Restart ZMQ connection if it looks dead

pull/166/head
SChernykh 2 years ago
parent c3c326c680
commit 6c6ef1c1b8

@ -1029,6 +1029,7 @@ void P2PServer::check_zmq()
if (cur_time >= last_active + 300) {
const uint64_t dt = static_cast<uint64_t>(cur_time - last_active);
LOGERR(1, "no ZMQ messages received from monerod in the last " << dt << " seconds, check your monerod/p2pool/network/firewall setup!!!");
m_pool->restart_zmq();
}
}

@ -108,6 +108,13 @@ p2pool::p2pool(int argc, char* argv[])
}
m_stopAsync.data = this;
err = uv_async_init(uv_default_loop_checked(), &m_restartZMQAsync, on_restart_zmq);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
panic();
}
m_restartZMQAsync.data = this;
uv_rwlock_init_checked(&m_mainchainLock);
uv_rwlock_init_checked(&m_minerDataLock);
uv_mutex_init_checked(&m_foundBlocksLock);
@ -438,6 +445,7 @@ void p2pool::on_stop(uv_async_t* async)
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_submitBlockAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_blockTemplateAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_stopAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_restartZMQAsync), nullptr);
uv_stop(uv_default_loop());
}
@ -632,7 +640,14 @@ void p2pool::download_block_headers(uint64_t current_height)
if (parse_block_headers_range(data, size) == current_height - start_height) {
update_median_timestamp();
if (m_serversStarted.exchange(1) == 0) {
m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this);
try {
m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this);
}
catch (const std::exception& e) {
LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what());
panic();
}
m_stratumServer = new StratumServer(this);
m_p2pServer = new P2PServer(this);
#ifdef WITH_RANDOMX
@ -1376,6 +1391,27 @@ void p2pool::stop()
uv_async_send(&m_stopAsync);
}
void p2pool::restart_zmq()
{
if (!is_main_thread()) {
uv_async_send(&m_restartZMQAsync);
return;
}
get_miner_data();
delete m_ZMQReader;
m_ZMQReader = nullptr;
try {
m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this);
m_zmqLastActive = seconds_since_epoch();
}
catch (const std::exception& e) {
LOGERR(1, "Couldn't restart ZMQ reader: exception " << e.what());
}
}
int p2pool::run()
{
if (!m_params->ok()) {

@ -95,6 +95,7 @@ public:
uint64_t zmq_last_active() const { return m_zmqLastActive; }
uint64_t start_time() const { return m_startTime; }
void restart_zmq();
private:
p2pool(const p2pool&) = delete;
@ -103,6 +104,7 @@ private:
static void on_submit_block(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->submit_block(); }
static void on_update_block_template(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->update_block_template(); }
static void on_stop(uv_async_t*);
static void on_restart_zmq(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->restart_zmq(); }
void submit_block() const;
@ -197,6 +199,7 @@ private:
std::atomic<uint64_t> m_zmqLastActive;
uint64_t m_startTime;
uv_async_t m_restartZMQAsync;
ZMQReader* m_ZMQReader = nullptr;
};

@ -49,13 +49,13 @@ ZMQReader::ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandle
if (!m_publisherPort) {
LOGERR(1, "failed to to bind ZMQ publisher port, aborting");
panic();
throw zmq::error_t(EFSM);
}
const int err = uv_thread_create(&m_worker, run_wrapper, this);
if (err) {
LOGERR(1, "failed to start ZMQ thread, error " << uv_err_name(err));
panic();
throw zmq::error_t(EFSM);
}
}
@ -71,8 +71,7 @@ ZMQReader::~ZMQReader()
uv_thread_join(&m_worker);
}
catch (const std::exception& e) {
LOGERR(1, "exception " << e.what() << ", aborting");
panic();
LOGERR(1, "exception " << e.what());
}
}
@ -104,7 +103,7 @@ void ZMQReader::run()
zmq_msg_t message;
int rc = zmq_msg_init(&message);
if (rc != 0) {
throw zmq::error_t();
throw zmq::error_t(errno);
}
LOGINFO(1, "worker thread ready");
@ -112,7 +111,7 @@ void ZMQReader::run()
do {
rc = zmq_msg_recv(&message, m_subscriber, 0);
if (rc < 0) {
throw zmq::error_t();
throw zmq::error_t(errno);
}
if (m_finished.load()) {
@ -125,8 +124,7 @@ void ZMQReader::run()
zmq_msg_close(&message);
}
catch (const std::exception& e) {
LOGERR(1, "exception " << e.what() << ", aborting");
panic();
LOGERR(1, "exception " << e.what());
}
}
@ -196,7 +194,7 @@ void ZMQReader::parse(char* data, size_t size)
using namespace rapidjson;
Document doc;
if (doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(value, end - value).HasParseError()) {
if (doc.Parse<kParseCommentsFlag | kParseTrailingCommasFlag>(value, end - value).HasParseError()) {
LOGWARN(1, "ZeroMQ message failed to parse, skipping it");
return;
}

Loading…
Cancel
Save