From 4ec0fe8d96f66a8f074672e9eb0748a24b1874fa Mon Sep 17 00:00:00 2001 From: SChernykh Date: Tue, 5 Jul 2022 14:29:41 +0200 Subject: [PATCH] Fixed JSON RPC handling during shutdown --- src/json_rpc_request.cpp | 6 +++++- src/p2pool.cpp | 30 ++++++++++++++++++++++++------ src/uv_util.h | 27 +++++++++++++++++++++++++-- 3 files changed, 54 insertions(+), 9 deletions(-) diff --git a/src/json_rpc_request.cpp b/src/json_rpc_request.cpp index 5a7325c..f7d4a92 100644 --- a/src/json_rpc_request.cpp +++ b/src/json_rpc_request.cpp @@ -365,7 +365,7 @@ void Call(const std::string& address, int port, const std::string& req, const st loop = uv_default_loop(); } - CallOnLoop(loop, + const bool result = CallOnLoop(loop, [=]() { try { @@ -376,6 +376,10 @@ void Call(const std::string& address, int port, const std::string& req, const st (*close_cb)(msg, strlen(msg)); } }); + + if (!result) { + LOGERR(1, "JSON RPC \"" << req << "\" failed"); + } } } // namespace JSONRPCRequest diff --git a/src/p2pool.cpp b/src/p2pool.cpp index e2145b8..dd2f231 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -775,8 +775,10 @@ void p2pool::get_info() { if (size > 0) { LOGWARN(1, "get_info RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - get_info(); + if (!m_stopped) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + get_info(); + } } }); } @@ -820,6 +822,10 @@ void p2pool::load_found_blocks() void p2pool::parse_get_info_rpc(const char* data, size_t size) { + if (m_stopped) { + return; + } + rapidjson::Document doc; doc.Parse(data, size); @@ -881,14 +887,20 @@ void p2pool::get_version() { if (size > 0) { LOGWARN(1, "get_version RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - get_version(); + if (!m_stopped) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + get_version(); + } } }); } void p2pool::parse_get_version_rpc(const char* data, size_t size) { + if (m_stopped) { + return; + } + rapidjson::Document doc; doc.Parse(data, size); @@ -945,8 +957,10 @@ void p2pool::get_miner_data() { if (size > 0) { LOGWARN(1, "get_miner_data RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - get_miner_data(); + if (!m_stopped) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + get_miner_data(); + } } else { m_getMinerDataPending = false; @@ -956,6 +970,10 @@ void p2pool::get_miner_data() void p2pool::parse_get_miner_data_rpc(const char* data, size_t size) { + if (m_stopped) { + return; + } + hash h; keccak(reinterpret_cast(data), static_cast(size), h.h, HASH_SIZE); if (h == m_getMinerDataHash) { diff --git a/src/uv_util.h b/src/uv_util.h index 2d509be..d1f4fbe 100644 --- a/src/uv_util.h +++ b/src/uv_util.h @@ -137,9 +137,12 @@ struct UV_LoopUserData UV_LoopUserData* GetLoopUserData(uv_loop_t* loop, bool create = true); template -void CallOnLoop(uv_loop_t* loop, T&& callback) +bool CallOnLoop(uv_loop_t* loop, T&& callback) { UV_LoopUserData* data = GetLoopUserData(loop, false); + if (!data) { + return false; + } UV_LoopCallbackBase* cb = new UV_LoopCallback(std::move(callback)); { @@ -147,7 +150,27 @@ void CallOnLoop(uv_loop_t* loop, T&& callback) data->m_callbacks.push_back(cb); } - uv_async_send(data->m_async); + if (uv_async_send(data->m_async) == 0) { + return true; + } + + // Clean up after uv_async_send error + bool found = false; + { + MutexLock lock(data->m_callbacksLock); + + auto it = std::find(data->m_callbacks.begin(), data->m_callbacks.end(), cb); + if (it != data->m_callbacks.end()) { + found = true; + data->m_callbacks.erase(it); + } + } + + if (found) { + delete cb; + } + + return false; } } // namespace p2pool