7 #include "Metaserver.h"
9 #include "ServerInfo.h"
11 #include "EventService.h"
12 #include "Exceptions.h"
14 #include <Atlas/Objects/Operation.h>
15 #include <Atlas/Objects/RootEntity.h>
16 #include <sigc++/slot.h>
29 #define snprintf _snprintf
34 using namespace Atlas::Objects::Operation;
35 using Atlas::Objects::smart_dynamic_cast;
36 using Atlas::Objects::Root;
37 using Atlas::Objects::Entity::RootEntity;
// Forward declaration; the definition appears further down in this file.
// Stores `data` at `buffer` in network byte order (htonl), increases `size`
// by sizeof(uint32_t), and returns the address just past the written bytes.
41 char* pack_uint32(uint32_t data,
char* buffer,
unsigned int& size);
// Forward declaration; the definition appears further down in this file.
// Reads sizeof(uint32_t) bytes from `buffer`, converts them with ntohl into
// `dest`, and returns the address just past the bytes that were read.
43 char* unpack_uint32(uint32_t& dest,
char* buffer);
// Port/service string handed to the UDP resolver query together with the
// meta-server host name (m_metaHost).
45 const char* META_SERVER_PORT =
"8453";
// Command codes used when talking to the meta-server.
// CKEEP_ALIVE is the command packed and queued for sending right after the
// socket has been connected.
48 const uint32_t CKEEP_ALIVE = 2,
// LIST_RESP2: in the code of this file it is used as the tag passed to
// setupRecvData() when the packed server entries are awaited.
57 const uint32_t LIST_RESP2 = 999;
63 ObjectsDecoder(factories), m_meta(meta) {
// Decoder callback: hands every decoded Atlas object over to the Meta
// instance referenced by m_meta (ownership is passed on by move).
66 void objectArrived(Root obj)
override {
67 m_meta.objectArrived(std::move(obj));
// Constructs the meta-server client.
//  - io_service:  used for the resolver and the timer, and kept in m_io_service.
//  - metaServer:  host name of the meta-server, moved into m_metaHost.
//  - maxQueries:  requested upper bound on simultaneously active queries.
// The receive stream is bound to m_receive_buffer and the send stream to a
// freshly allocated streambuf owned by m_send_buffer.
71 Meta::Meta(boost::asio::io_service& io_service,
73 std::string metaServer,
74 unsigned int maxQueries) :
75 m_factories(new Atlas::Objects::Factories()),
76 m_io_service(io_service),
77 m_event_service(eventService),
80 m_metaHost(std::move(metaServer)),
81 m_maxActiveQueries(maxQueries),
83 m_resolver(io_service),
85 m_metaTimer(io_service),
86 m_receive_stream(&m_receive_buffer),
87 m_send_buffer(new boost::asio::streambuf()),
88 m_send_stream(m_send_buffer.get()),
// Clamp the number of simultaneous queries to FD_SETSIZE - 2.
// NOTE(review): presumably this keeps a couple of descriptors free for other
// sockets — confirm the intent of the "- 2".
95 unsigned int max_half_open = FD_SETSIZE;
96 if (m_maxActiveQueries > (max_half_open - 2)) {
97 m_maxActiveQueries = max_half_open - 2;
125 error() <<
"called queryServerByIndex with invalid server list";
129 if (index >= m_gameServers.size()) {
130 error() <<
"called queryServerByIndex with bad server index " << index;
134 if (m_gameServers[index].status == ServerInfo::QUERYING) {
135 warning() <<
"called queryServerByIndex on server already being queried";
139 internalQuery(index);
143 if (!m_activeQueries.empty()) {
144 warning() <<
"called meta::refresh() while doing another query, ignoring";
148 if (m_status ==
VALID) {
150 m_lastValidList = m_gameServers;
153 m_gameServers.clear();
160 m_activeQueries.clear();
165 if (!m_lastValidList.empty()) {
166 m_gameServers = m_lastValidList;
170 m_gameServers.clear();
172 m_nextQuery = m_gameServers.size();
176 if (index >= m_gameServers.size()) {
177 error() <<
"passed out-of-range index " << index <<
178 " to getInfoForServer";
179 throw BaseException(
"Out of bounds exception when getting server info.");
181 return m_gameServers[index];
186 return m_gameServers.size();
190 boost::asio::ip::udp::resolver::query query(m_metaHost, META_SERVER_PORT);
191 m_resolver.async_resolve(query,
192 [&](
const boost::system::error_code& ec, boost::asio::ip::udp::resolver::iterator iterator) {
193 if (!ec && iterator != boost::asio::ip::udp::resolver::iterator()) {
// Opens the UDP socket for IPv4 and asynchronously connects it to `endpoint`.
// The completion handler queues a CKEEP_ALIVE command, prepares to receive a
// command word and arms the timeout; a failure is reported via doFailure().
// NOTE(review): the handler captures with [&] although it runs asynchronously;
// the statements below only touch members of `this`, so an explicit [this]
// capture (as used in do_read) would state the intent more safely.
201 void Meta::connect(
const boost::asio::ip::udp::endpoint& endpoint) {
202 m_socket.open(boost::asio::ip::udp::v4());
203 m_socket.async_connect(endpoint, [&](boost::system::error_code ec) {
// Serialise CKEEP_ALIVE into m_data; dsz receives the number of bytes written.
208 unsigned int dsz = 0;
209 pack_uint32(CKEEP_ALIVE, m_data.data(), dsz);
// Push exactly those bytes into the send stream and flush it.
210 this->m_send_stream << std::string(m_data.data(), dsz) << std::flush;
// Expect a single command word as the next thing to receive.
212 this->setupRecvCmd();
215 this->startTimeout();
// Failure report; the message includes the meta-server host name.
217 this->doFailure(
"Couldn't open connection to metaserver " + this->m_metaHost);
223 if (m_socket.is_open()) {
226 m_metaTimer.cancel();
// (Re)arms the meta-server timer: any pending wait is cancelled, the expiry
// is set to 8 seconds from now and a new asynchronous wait is started.
229 void Meta::startTimeout() {
230 m_metaTimer.cancel();
231 m_metaTimer.expires_from_now(std::chrono::seconds(8));
232 m_metaTimer.async_wait([&](boost::system::error_code ec) {
// Starts an asynchronous receive on the socket, provided it is open.
// Up to DATA_BUFFER_SIZE bytes are received into space prepared in
// m_receive_buffer.
240 void Meta::do_read() {
241 if (m_socket.is_open()) {
242 m_socket.async_receive(m_receive_buffer.prepare(DATA_BUFFER_SIZE),
243 [
this](boost::system::error_code ec, std::size_t length) {
// Make the received bytes available on the input side of the buffer.
245 m_receive_buffer.commit(length);
// operation_aborted is not treated as a failure; any other error is
// reported through doFailure() with the error's message appended.
252 if (ec != boost::asio::error::operation_aborted) {
253 this->doFailure(std::string(
"Connection to the meta-server failed: ") + ec.message());
261 if (m_socket.is_open()) {
262 if (m_send_buffer->size() != 0) {
263 std::shared_ptr<boost::asio::streambuf> send_buffer(std::move(m_send_buffer));
264 m_send_buffer = std::make_unique<boost::asio::streambuf>();
265 m_send_stream.rdbuf(m_send_buffer.get());
266 m_socket.async_send(send_buffer->data(),
267 [&, send_buffer](boost::system::error_code ec, std::size_t length) {
269 send_buffer->consume(length);
271 if (ec != boost::asio::error::operation_aborted) {
272 this->doFailure(std::string(
"Connection to the meta-server failed: ") + ec.message());
280 void Meta::gotData() {
// Removes `query` from m_activeQueries and schedules its destruction on the
// main thread through the event service. Logs an error when `query` is not
// one of the active queries.
284 void Meta::deleteQuery(MetaQuery* query) {
// Locate the owning unique_ptr by comparing raw pointers.
285 auto I = std::find_if(m_activeQueries.begin(), m_activeQueries.end(), [&](
const std::unique_ptr<MetaQuery>& entry){return entry.get() == query;});
287 if (I != m_activeQueries.end()) {
// Release ownership before erasing so the object outlives the erase and can
// be deleted later by the deferred callback below.
288 auto containedQuery = I->release();
289 m_activeQueries.erase(I);
292 m_event_service.runOnMainThread([containedQuery]() {
293 delete containedQuery;
// Signal completion once no query is active and m_nextQuery has reached the
// end of the server list.
296 if (m_activeQueries.empty() && m_nextQuery == m_gameServers.size()) {
299 AllQueriesDone.emit();
302 error() <<
"Tried to delete meta server query which wasn't "
303 "among the active queries. This indicates an error "
304 "with the flow in Metaserver.";
309 if (m_bytesToRecv == 0) {
310 error() <<
"No bytes to receive when calling recv().";
314 m_receive_stream.peek();
315 std::streambuf* iobuf = m_receive_stream.rdbuf();
316 std::streamsize len = std::min(m_bytesToRecv, iobuf->in_avail());
318 iobuf->sgetn(m_dataPtr, len);
319 m_bytesToRecv -= len;
328 if (m_bytesToRecv > 0) {
329 error() <<
"Fragment data received by Meta::recv";
335 unpack_uint32(op, m_data.data());
342 if (m_bytesToRecv && m_receive_stream.rdbuf()->in_avail())
// Reacts to a command word `op` received from the meta-server: either sets up
// the receive of the command's payload or reports a failure.
346 void Meta::recvCmd(uint32_t op) {
// Expect one 32-bit word of payload, tagged HANDSHAKE.
349 setupRecvData(1, HANDSHAKE);
353 doFailure(
"Got list range error from Metaserver");
// Expect two 32-bit words of payload, tagged LIST_RESP.
357 setupRecvData(2, LIST_RESP);
361 doFailure(
"Unknown Meta server command");
// Processes the payload collected in m_data for the command held in m_gotCmd.
366 void Meta::processCmd() {
// Commands are only expected while the server list is being fetched; in any
// other state the command is logged and ignored.
367 if (m_status != GETTING_LIST) {
368 error() <<
"Command received when not expecting any. It will be ignored. The command was: " << m_gotCmd;
// Read the stamp from the start of the payload.
375 unpack_uint32(stamp, m_data.data());
// Build the reply: CLIENTSHAKE followed by the same stamp, then queue it on
// the send stream.
377 unsigned int dsz = 0;
378 m_dataPtr = pack_uint32(CLIENTSHAKE, m_data.data(), dsz);
379 pack_uint32(stamp, m_dataPtr, dsz);
381 m_send_stream << std::string(m_data.data(), dsz) << std::flush;
384 m_metaTimer.cancel();
// First payload word: total number of servers.
392 uint32_t total_servers;
393 m_dataPtr = unpack_uint32(total_servers, m_data.data());
// When entries from an earlier packet are already stored, warn if the
// advertised total differs from the one remembered in m_totalServers.
394 if (!m_gameServers.empty()) {
395 if (total_servers != m_totalServers) {
396 warning() <<
"Server total in new packet has changed. " << total_servers <<
":" << m_totalServers;
399 m_totalServers = total_servers;
// Second payload word is stored in m_packed ...
401 unpack_uint32(m_packed, m_dataPtr);
// ... and determines how many 32-bit words to receive next (tag LIST_RESP2).
408 setupRecvData(m_packed, LIST_RESP2);
// First batch of entries: no query may have been started yet, and the list
// can be pre-sized to the advertised total.
411 if (m_gameServers.empty()) {
413 assert(m_nextQuery == 0);
414 m_gameServers.reserve(m_totalServers);
// Entries are read as consecutive 32-bit words starting at the beginning of m_data.
420 m_dataPtr = m_data.data();
423 m_dataPtr = unpack_uint32(ip, m_dataPtr);
// Render the address as a dotted quad; the arguments shown go from bits 8-15
// up to bits 24-31 of `ip`.
426 snprintf(buf, 32,
"%u.%u.%u.%u",
428 (ip & 0x0000FF00) >> 8u,
429 (ip & 0x00FF0000) >> 16u,
430 (ip & 0xFF000000) >> 24u
434 m_gameServers.push_back(ServerInfo{buf});
// Fewer entries than advertised: request the next part of the list, using the
// number of servers collected so far as the base.
437 if (m_gameServers.size() < m_totalServers) {
439 listReq((
unsigned int) m_gameServers.size());
442 CompletedServerList.emit(m_totalServers);
// Message built for a command this function does not handle.
453 std::stringstream ss;
454 ss <<
"Unknown Meta server command: " << m_gotCmd;
// Queues a LIST_REQ command followed by `base` on the send stream.
// The caller in this file passes the number of servers collected so far.
460 void Meta::listReq(
unsigned int base) {
// dsz accumulates the number of bytes packed into m_data.
461 unsigned int dsz = 0;
462 char* _dataPtr = pack_uint32(LIST_REQ, m_data.data(), dsz);
463 pack_uint32(base, _dataPtr, dsz);
465 m_send_stream << std::string(m_data.data(), dsz) << std::flush;
// Prepares the receive state for a single 32-bit command word, to be written
// at the start of m_data.
472 void Meta::setupRecvCmd() {
474 m_bytesToRecv =
sizeof(uint32_t);
475 m_dataPtr = m_data.data();
// Prepares the receive state for a payload of `words` 32-bit words, to be
// written at the start of m_data.
// NOTE(review): `got` is presumably remembered as the command the payload
// belongs to (m_gotCmd) — confirm against the full function body.
478 void Meta::setupRecvData(
int words, uint32_t got) {
480 m_bytesToRecv = words *
sizeof(uint32_t);
481 m_dataPtr = m_data.data();
// Serialises one 32-bit value into `buffer` in network byte order.
//  data:   value to write (host byte order).
//  buffer: destination; sizeof(uint32_t) bytes are written to it.
//  size:   running byte count, increased by sizeof(uint32_t).
// Returns the address just past the written bytes, so calls can be chained.
488 char* pack_uint32(uint32_t data,
char* buffer,
unsigned int& size) {
// Convert to network byte order before copying the raw bytes out.
491 netorder = htonl(data);
// memcpy is used for the store, so `buffer` needs no particular alignment.
492 memcpy(buffer, &netorder,
sizeof(uint32_t));
493 size +=
sizeof(uint32_t);
494 return buffer +
sizeof(uint32_t);
// Deserialises one 32-bit value stored in network byte order at `buffer`.
//  dest:   receives the value converted to host byte order.
//  buffer: source; sizeof(uint32_t) bytes are read from it.
// Returns the address just past the bytes that were read.
499 char* unpack_uint32(uint32_t& dest,
char* buffer) {
// Copy the raw bytes first (no alignment requirement on `buffer`), then convert.
502 memcpy(&netorder, buffer,
sizeof(uint32_t));
503 dest = ntohl(netorder);
504 return buffer +
sizeof(uint32_t);
// Creates a MetaQuery for the server stored at `index` in m_gameServers.
// `index` must be a valid position in the list (asserted).
507 void Meta::internalQuery(
size_t index) {
508 assert(index < m_gameServers.size());
510 ServerInfo& sv = m_gameServers[index];
// The query is constructed with the server's host and its list index.
511 auto q = std::make_unique<MetaQuery>(m_io_service, *m_decoder, *
this, sv.host, index);
// A query that is neither connecting nor negotiating right after
// construction marks the server as INVALID.
512 if (q->getStatus() != BaseConnection::CONNECTING &&
513 q->getStatus() != BaseConnection::NEGOTIATE) {
515 sv.status = ServerInfo::INVALID;
// Hand ownership of the query to m_activeQueries and flag the server as being queried.
517 m_activeQueries.emplace_back(std::move(q));
518 sv.status = ServerInfo::QUERYING;
// Handles an Atlas object delivered by the decoder: it is expected to be an
// INFO operation answering one of the active queries. The matching server
// entry is updated and ReceivedServerInfo is emitted for it.
522 void Meta::objectArrived(Root obj) {
523 Info info = smart_dynamic_cast<Info>(obj);
524 if (!info.isValid()) {
525 error() <<
"Meta::objectArrived, failed to convert object to INFO op";
// Find the active query whose query number equals the reply's refno.
530 auto refno = info->getRefno();
531 QuerySet::iterator Q;
533 for (Q = m_activeQueries.begin(); Q != m_activeQueries.end(); ++Q)
534 if ((*Q)->getQueryNo() == refno)
break;
536 if (Q == m_activeQueries.end()) {
537 error() <<
"Couldn't locate query for meta-query reply";
// The first argument of the INFO op is expected to be the server entity.
// NOTE(review): front() on an empty argument list is undefined behaviour —
// confirm the args are known to be non-empty at this point.
541 RootEntity svr = smart_dynamic_cast<RootEntity>(info->getArgs().front());
542 if (!svr.isValid()) {
543 error() <<
"Query INFO argument object is broken";
// Guard against a query whose server index is outside the current list.
545 if ((*Q)->getServerIndex() >= m_gameServers.size()) {
546 error() <<
"Got server info with out of bounds index.";
548 ServerInfo& sv = m_gameServers[(*Q)->getServerIndex()];
// Fill in the server details and record the query's elapsed time as the ping.
550 sv.processServer(svr);
551 sv.ping = (int) (*Q)->getElapsed();
554 ReceivedServerInfo.emit(sv);
// The query is finished; remove it from the active set.
557 deleteQuery(Q->get());
562 void Meta::doFailure(
const std::string& msg) {
567 void Meta::dispatch() {
// Timeout handling for the meta-server connection: cancels the timer and
// reports the timeout through doFailure().
571 void Meta::metaTimeout() {
573 m_metaTimer.cancel();
576 doFailure(
"Connection to the meta-server timed out");
// Called when the query `q` has failed: the server it targeted (looked up by
// the query's server index) is marked INVALID.
579 void Meta::queryFailure(MetaQuery* q,
const std::string& msg) {
583 m_gameServers[q->getServerIndex()].status = ServerInfo::INVALID;
590 while ((m_activeQueries.size() < m_maxActiveQueries) && (m_nextQuery < m_gameServers.size())) {
591 internalQuery(m_nextQuery++);
// Called when the query `q` has timed out: the server it targeted (looked up
// by the query's server index) is marked TIMEOUT.
595 void Meta::queryTimeout(MetaQuery* q) {
596 m_gameServers[q->getServerIndex()].status = ServerInfo::TIMEOUT;
Handles polling of the IO system as well as making sure that registered handlers are run on the main ...