diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index 2b14e74..c912dfc 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -19,6 +19,7 @@ #include "zmq_reader.h" #include "json_parsers.h" #include +#include static constexpr char log_category_prefix[] = "ZMQReader "; @@ -88,10 +89,14 @@ void ZMQReader::run() char addr[32]; snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort); - while (!connect(addr, m_zmqPort)) { if (m_finished.load()) return; } + if (!connect(addr)) { + return; + } snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort); - while (!connect(addr, m_publisherPort)) { if (m_finished.load()) return; } + if (!connect(addr)) { + return; + } m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main"); m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data"); @@ -126,7 +131,7 @@ void ZMQReader::run() } } -bool ZMQReader::connect(const char* address, uint32_t id) +bool ZMQReader::connect(const char* address) { struct ConnectMonitor : public zmq::monitor_t { @@ -139,19 +144,33 @@ bool ZMQReader::connect(const char* address, uint32_t id) bool connected = false; } monitor; - char buf[32]; - snprintf(buf, sizeof(buf), "inproc://connect-mon-%u", id); + static uint64_t id = 0; + + if (!id) { + std::random_device rd; + id = (static_cast(rd()) << 32) | static_cast(rd()); + } + + char buf[log::Stream::BUF_SIZE + 1]; + log::Stream s(buf); + s << "inproc://p2pool-connect-mon-" << id << '\0'; + ++id; + monitor.init(m_subscriber, buf); m_subscriber.connect(address); using namespace std::chrono; - const system_clock::time_point start_time = system_clock::now(); + system_clock::time_point start_time = system_clock::now(); while (!monitor.connected && monitor.check_event(-1)) { - const int64_t elapsed_time = duration_cast(system_clock::now() - start_time).count(); + const system_clock::time_point cur_time = system_clock::now(); + const int64_t elapsed_time = duration_cast(cur_time - start_time).count(); if (elapsed_time >= 3000) { LOGERR(1, "failed to connect to " << address); - return false; + if (m_finished.load()) { + return false; + } + start_time = cur_time; } } diff --git a/src/zmq_reader.h b/src/zmq_reader.h index e894175..c1ab74d 100644 --- a/src/zmq_reader.h +++ b/src/zmq_reader.h @@ -30,7 +30,7 @@ public: private: static void run_wrapper(void* arg); void run(); - bool connect(const char* address, uint32_t id); + bool connect(const char* address); void parse(char* data, size_t size);