From 146d29b627bba504fa8184912178297132fb9cbc Mon Sep 17 00:00:00 2001 From: SChernykh Date: Fri, 10 Mar 2023 14:46:53 +0100 Subject: [PATCH] Don't let user connect to a node without ZMQ --- src/p2p_server.cpp | 6 ++++ src/p2pool.cpp | 20 ++++++++------ src/p2pool.h | 1 + src/zmq_reader.cpp | 69 +++++++++++++++++++++++----------------------- src/zmq_reader.h | 5 +++- 5 files changed, 58 insertions(+), 43 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 11d8d64..242d6d6 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -1153,6 +1153,12 @@ void P2PServer::check_zmq() return; } + if (!m_pool->zmq_running()) { + LOGERR(1, "ZMQ is not running, restarting it"); + m_pool->restart_zmq(); + return; + } + const uint64_t cur_time = seconds_since_epoch(); const uint64_t last_active = m_pool->zmq_last_active(); diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 5c066b4..e4b2351 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -735,14 +735,6 @@ void p2pool::download_block_headers(uint64_t current_height) if (parse_block_headers_range(data, size) == current_height - start_height) { update_median_timestamp(); if (m_serversStarted.exchange(1) == 0) { - try { - m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this); - } - catch (const std::exception& e) { - LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what()); - PANIC_STOP(); - } - m_stratumServer = new StratumServer(this); m_p2pServer = new P2PServer(this); #ifdef WITH_RANDOMX @@ -750,6 +742,13 @@ void p2pool::download_block_headers(uint64_t current_height) start_mining(m_params->m_minerThreads); } #endif + try { + m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this); + } + catch (const std::exception& e) { + LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what()); + PANIC_STOP(); + } api_update_network_stats(); get_miner_data(); } @@ -1535,6 +1534,11 @@ void p2pool::stop() } } +bool p2pool::zmq_running() const +{ + return m_ZMQReader && m_ZMQReader->is_running(); +} + void p2pool::restart_zmq() { // If p2pool is stopped, m_restartZMQAsync is most likely already closed diff --git a/src/p2pool.h b/src/p2pool.h index 7ba722f..e9a3e7a 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -95,6 +95,7 @@ public: void stop_mining(); #endif + bool zmq_running() const; uint64_t zmq_last_active() const { return m_zmqLastActive; } uint64_t start_time() const { return m_startTime; } void restart_zmq(); diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index 2aa59da..9868d21 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -58,10 +58,32 @@ ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::s throw zmq::error_t(EFSM); } + m_subscriber.set(zmq::sockopt::connect_timeout, 1000); + + if (!m_proxy.empty()) { + m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length())); + } + + std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort); + if (!connect(addr)) { + throw zmq::error_t(EFSM); + } + + m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer()); + + addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort); + if (!connect(addr)) { + throw zmq::error_t(EFSM); + } + + m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main"); + m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data"); + m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add"); + const int err = uv_thread_create(&m_worker, run_wrapper, this); if (err) { LOGERR(1, "failed to start ZMQ thread, error " << uv_err_name(err)); - throw zmq::error_t(EFSM); + throw zmq::error_t(EMTHREAD); } } @@ -69,7 +91,7 @@ ZMQReader::~ZMQReader() { LOGINFO(1, "stopping"); - m_finished.exchange(1); + m_finished.exchange(true); try { const char msg[] = "json-minimal-txpool_add:[]"; @@ -89,28 +111,12 @@ void ZMQReader::run_wrapper(void* arg) void ZMQReader::run() { - try { - if (!m_proxy.empty()) { - m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length())); - } - - std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort); - if (!connect(addr)) { - return; - } - - m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer()); - - addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort); - if (!connect(addr)) { - return; - } + m_threadRunning = true; + ON_SCOPE_LEAVE([this]() { m_threadRunning = false; }); - m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main"); - m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data"); - m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add"); + zmq_msg_t message = {}; - zmq_msg_t message; + try { int rc = zmq_msg_init(&message); if (rc != 0) { throw zmq::error_t(errno); @@ -130,12 +136,12 @@ void ZMQReader::run() parse(reinterpret_cast(zmq_msg_data(&message)), zmq_msg_size(&message)); } while (true); - - zmq_msg_close(&message); } catch (const std::exception& e) { LOGERR(1, "exception " << e.what()); } + + zmq_msg_close(&message); } bool ZMQReader::connect(const std::string& address) @@ -163,21 +169,16 @@ bool ZMQReader::connect(const std::string& address) s << "inproc://p2pool-connect-mon-" << id << '\0'; ++id; + using namespace std::chrono; + const auto start_time = steady_clock::now(); + monitor.init(m_subscriber, buf); m_subscriber.connect(address); - using namespace std::chrono; - steady_clock::time_point start_time = steady_clock::now(); - while (!monitor.connected && monitor.check_event(-1)) { - const steady_clock::time_point cur_time = steady_clock::now(); - const int64_t elapsed_time = duration_cast(cur_time - start_time).count(); - if (elapsed_time >= 3000) { + if (duration_cast(steady_clock::now() - start_time).count() >= 1000) { LOGERR(1, "failed to connect to " << address); - if (m_finished.load()) { - return false; - } - start_time = cur_time; + return false; } } diff --git a/src/zmq_reader.h b/src/zmq_reader.h index fad4769..646085e 100644 --- a/src/zmq_reader.h +++ b/src/zmq_reader.h @@ -27,6 +27,8 @@ public: ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler); ~ZMQReader(); + bool is_running() const { return m_threadRunning.load(); } + private: static void run_wrapper(void* arg); void run(); @@ -44,7 +46,8 @@ private: zmq::socket_t m_publisher{ m_context, ZMQ_PUB }; zmq::socket_t m_subscriber{ m_context, ZMQ_SUB }; uint16_t m_publisherPort = 37891; - std::atomic m_finished{ 0 }; + std::atomic m_finished{ false }; + std::atomic m_threadRunning{ false }; TxMempoolData m_tx; MinerData m_minerData;