7#include <Atlas/Net/Stream.h>
9#include <Atlas/Codecs/XML.h>
10#include <Atlas/Codecs/Packed.h>
16#define Debug(prg) { if (debug_flag) { prg } }
18static const bool debug_flag =
false;
20static std::string get_line(std::string &s,
char ch)
23 int n = (int) s.find(ch);
26 out.assign(s, 0, (
unsigned long) n);
33static inline std::string get_line(std::string &s1,
char ch, std::string &s2)
35 s2 = get_line(s1, ch);
40namespace Atlas {
namespace Net {
42NegotiateHelper::NegotiateHelper(std::list<std::string> & names) :
47bool NegotiateHelper::get(std::string &buf,
const std::string & header)
54 if(buf.find(
'\n') == 0)
60 if(get_line(buf,
'\n', s).empty())
63 if(get_line(s,
' ', h) == header)
67 Debug( std::cout <<
" got: " << s << std::endl; )
70 Debug( std::cerr <<
"Unknown pattern " << h << std::endl; )
75void NegotiateHelper::put(std::string &buf,
const std::string & header)
97StreamConnect::StreamConnect(std::string name, std::istream& inStream, std::ostream& outStream) :
98 m_state(SERVER_GREETING), m_outName(std::move(name)), m_inStream(inStream), m_outStream(outStream),
99 m_codecHelper(m_inCodecs), m_filterHelper(m_inFilters),
100 m_canPacked(true), m_canXML(true), m_canBach(true),m_canGzip(true), m_canBzip2(true)
104void StreamConnect::poll()
106 Debug( std::cout <<
"** Client(" << m_state <<
") : " << m_inStream.rdbuf()->in_avail() << std::endl; )
110 std::streamsize count;
111 while ((count = m_inStream.rdbuf()->in_avail()) > 0) {
112 for (
int i = 0 ; i < count; ++i) {
113 m_buf += (char) m_inStream.rdbuf()->sbumpc();
117 if(m_state == SERVER_GREETING)
121 if (!m_buf.empty() && !get_line(m_buf,
'\n', m_inName).empty())
123 Debug( std::cout <<
"server: " << m_inName << std::endl; )
124 m_state = CLIENT_GREETING;
128 if(m_state == CLIENT_GREETING)
132 m_outStream <<
"ATLAS " << m_outName << std::endl;
133 m_state = CLIENT_CODECS;
136 if (m_state == CLIENT_CODECS)
139 m_codecHelper.put(out,
"ICAN");
140 m_outStream << out << std::flush;
141 m_state = SERVER_CODECS;
144 if(m_state == SERVER_CODECS)
146 if (m_codecHelper.get(m_buf,
"IWILL"))
148 processServerCodecs();
154 if (m_state == CLIENT_FILTERS)
157 m_filterHelper.put(out,
"ICAN");
158 m_socket << out << std::flush;
162 if (m_state == SERVER_FILTERS)
164 if (m_filterHelper.get(m_buf,
"IWILL"))
166 processServerFilters();
173Atlas::Negotiate::State StreamConnect::getState()
177 if (m_canPacked || m_canXML || m_canBach)
182 else if (m_inStream || m_outStream)
190std::unique_ptr<Atlas::Codec> StreamConnect::getCodec(
Atlas::Bridge & bridge)
192 if (m_canPacked) {
return std::make_unique<Atlas::Codecs::Packed>(m_inStream, m_outStream, bridge); }
193 if (m_canXML) {
return std::make_unique<Atlas::Codecs::XML>(m_inStream, m_outStream, bridge); }
194 if (m_canBach) {
return std::make_unique<Atlas::Codecs::Bach>(m_inStream, m_outStream, bridge); }
198void StreamConnect::processServerCodecs()
200 for (
auto& codec : m_inCodecs)
202 if (codec ==
"XML") { m_canXML =
true; }
203 if (codec ==
"Packed") { m_canPacked =
true; }
204 if (codec ==
"Bach") { m_canBach =
true; }
208void StreamConnect::processServerFilters()
210 for (
auto& filter : m_inFilters)
212 if (filter ==
"Gzip") { m_canGzip =
true; }
213 if (filter ==
"Bzip2") { m_canBzip2 =
true; }
218void StreamConnect::processClientCodecs()
220 std::list<std::string>::const_iterator j;
222 for (j = m_inCodecs.begin(); j != m_inCodecs.end(); ++j)
228void StreamConnect::processClientFilters()
230 std::list<std::string>::const_iterator j;
232 for (j = m_inFilters.begin(); j != m_inFilters.end(); ++j)
240StreamAccept::StreamAccept(std::string name, std::istream& inStream, std::ostream& outStream) :
241 m_state(SERVER_GREETING),
242 m_outName(std::move(name)),
243 m_inStream(inStream),
244 m_outStream(outStream),
245 m_codecHelper(m_inCodecs),
246 m_filterHelper(m_inFilters),
255void StreamAccept::poll()
257 Debug( std::cout <<
"** Server(" << m_state <<
") : " << std::endl; )
259 if (m_state == SERVER_GREETING)
263 m_outStream <<
"ATLAS " << m_outName << std::endl;
264 m_state = CLIENT_GREETING;
265 Debug( std::cout <<
"server now in state " << m_state << std::endl; )
268 std::streamsize count;
269 while ((count = m_inStream.rdbuf()->in_avail()) > 0) {
270 for (
int i = 0 ; i < count; ++i) {
271 m_buf += (char) m_inStream.rdbuf()->sbumpc();
275 if (m_state == CLIENT_GREETING)
278 if (!m_buf.empty() && !get_line(m_buf,
'\n', m_inName).empty())
280 Debug(std::cout <<
"client: " << m_inName << std::endl; )
281 m_state = CLIENT_CODECS;
285 if (m_state == CLIENT_CODECS)
287 if (m_codecHelper.get(m_buf,
"ICAN"))
289 m_state = SERVER_CODECS;
290 Debug(std::cout <<
"server now in state " << m_state << std::endl;)
292 processClientCodecs();
295 if (m_state == SERVER_CODECS)
297 if (m_canPacked) { m_outStream <<
"IWILL Packed\n"; }
298 else if (m_canXML) { m_outStream <<
"IWILL XML\n"; }
299 else if (m_canBach) { m_outStream <<
"IWILL Bach\n"; }
300 m_outStream << std::endl;
306 if(m_state == CLIENT_FILTERS)
308 if (m_filterHelper.get(m_buf,
"ICAN"))
312 processClientFilters();
315 if (m_state == SERVER_FILTERS)
320 m_outStream << std::endl;
326Atlas::Negotiate::State StreamAccept::getState()
330 if (m_canPacked || m_canXML || m_canBach)
335 std::cout <<
"done, but no codec" << std::endl;
337 else if (m_inStream || m_outStream)
355 if (m_canPacked) {
return std::make_unique<Atlas::Codecs::Packed>(m_inStream, m_outStream, bridge); }
356 if (m_canXML) {
return std::make_unique<Atlas::Codecs::XML>(m_inStream, m_outStream, bridge); }
357 if (m_canBach) {
return std::make_unique<Atlas::Codecs::Bach>(m_inStream, m_outStream, bridge); }
362void StreamAccept::processServerCodecs()
364 FactoryCodecs::iterator i;
365 list<std::string>::iterator j;
367 FactoryCodecs *myCodecs = Factory<Codec<std::iostream> >::factories();
369 for (i = myCodecs->begin(); i != myCodecs->end(); ++i)
371 for (j = m_inCodecs.begin(); j != m_inCodecs.end(); ++j)
373 if ((*i)->getName() == *j)
375 outCodecs.push_back(*i);
382void StreamAccept::processServerFilters()
384 FactoryFilters::iterator i;
385 list<std::string>::iterator j;
387 FactoryFilters *myFilters = Factory<Filter>::factories();
389 for (i = myFilters->begin(); i != myFilters->end(); ++i)
391 for (j = m_inFilters.begin(); j != m_inFilters.end(); ++j)
393 if ((*i)->getName() == *j)
395 outFilters.push_back(*i);
402void StreamAccept::processClientCodecs()
404 for (
auto& codec : m_inCodecs)
406 if (codec ==
"XML") { m_canXML =
true; }
407 if (codec ==
"Packed") { m_canPacked =
true; }
408 if (codec ==
"Bach") { m_canBach =
true; }
412void StreamAccept::processClientFilters()
414 for (
auto& filter : m_inFilters)
416 if (filter ==
"Gzip") { m_canGzip =
true; }
417 if (filter ==
"Bzip2") { m_canBzip2 =
true; }
std::unique_ptr< Atlas::Codec > getCodec(Atlas::Bridge &) override
FIXME We should pass in the Bridge here, not at construction time.