From 19414b061b5a73a444ce0ee0c03fec2053cbc924 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Fri, 4 Nov 2022 10:14:49 +0100 Subject: [PATCH] Refactored BackgroundJobTracker --- src/p2p_server.cpp | 16 ++++++++-------- src/pow_hash.cpp | 4 ++-- src/stratum_server.cpp | 4 ++-- src/util.cpp | 29 ++++++++++++++++------------- src/util.h | 11 +++++++++-- 5 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 2022bc7..dbae60f 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -420,13 +420,13 @@ void P2PServer::save_peer_list_async() const int err = uv_queue_work(&m_loop, &work->req, [](uv_work_t* req) { - bkg_jobs_tracker.start("P2PServer::save_peer_list_async"); + BACKGROUND_JOB_START(P2PServer::save_peer_list_async); reinterpret_cast(req->data)->server->save_peer_list(); }, [](uv_work_t* req, int /*status*/) { delete reinterpret_cast(req->data); - bkg_jobs_tracker.stop("P2PServer::save_peer_list_async"); + BACKGROUND_JOB_STOP(P2PServer::save_peer_list_async); }); if (err) { @@ -995,13 +995,13 @@ void P2PServer::flush_cache() const int err = uv_queue_work(&m_loop, &work->req, [](uv_work_t* req) { - bkg_jobs_tracker.start("P2PServer::flush_cache"); + BACKGROUND_JOB_START(P2PServer::flush_cache); reinterpret_cast(req->data)->cache->flush(); }, [](uv_work_t* req, int) { delete reinterpret_cast(req->data); - bkg_jobs_tracker.stop("P2PServer::flush_cache"); + BACKGROUND_JOB_STOP(P2PServer::flush_cache); }); if (err) { @@ -1534,7 +1534,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH const int err = uv_queue_work(&server->m_loop, &work->req, [](uv_work_t* req) { - bkg_jobs_tracker.start("P2PServer::send_handshake_solution"); + BACKGROUND_JOB_START(P2PServer::send_handshake_solution); Work* work = reinterpret_cast(req->data); const std::vector& consensus_id = work->server->m_pool->side_chain().consensus_id(); @@ -1591,7 +1591,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH [work]() { delete work; - bkg_jobs_tracker.stop("P2PServer::send_handshake_solution"); + BACKGROUND_JOB_STOP(P2PServer::send_handshake_solution); }); // We might've been disconnected while working on the challenge, do nothing in this case @@ -2146,7 +2146,7 @@ bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block, u const int err = uv_queue_work(&server->m_loop, &work->req, [](uv_work_t* req) { - bkg_jobs_tracker.start("P2PServer::handle_incoming_block_async"); + BACKGROUND_JOB_START(P2PServer::handle_incoming_block_async); Work* work = reinterpret_cast(req->data); work->client->handle_incoming_block(work->server->m_pool, work->block, work->client_reset_counter, work->client_ip, work->missing_blocks); }, @@ -2155,7 +2155,7 @@ bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block, u Work* work = reinterpret_cast(req->data); work->client->post_handle_incoming_block(work->client_reset_counter, work->missing_blocks); delete work; - bkg_jobs_tracker.stop("P2PServer::handle_incoming_block_async"); + BACKGROUND_JOB_STOP(P2PServer::handle_incoming_block_async); }); if (err != 0) { diff --git a/src/pow_hash.cpp b/src/pow_hash.cpp index 3657506..071c7f8 100644 --- a/src/pow_hash.cpp +++ b/src/pow_hash.cpp @@ -141,7 +141,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed) const int err = uv_queue_work(uv_default_loop_checked(), &work->req, [](uv_work_t* req) { - bkg_jobs_tracker.start("RandomX_Hasher::set_seed_async"); + BACKGROUND_JOB_START(RandomX_Hasher::set_seed_async); Work* work = reinterpret_cast(req->data); if (!work->pool->stopped()) { work->hasher->set_seed(work->seed); @@ -150,7 +150,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed) [](uv_work_t* req, int) { delete reinterpret_cast(req->data); - bkg_jobs_tracker.stop("RandomX_Hasher::set_seed_async"); + BACKGROUND_JOB_STOP(RandomX_Hasher::set_seed_async); } ); diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index a3bcec3..4f41878 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -806,7 +806,7 @@ void StratumServer::on_share_found(uv_work_t* req) { SubmittedShare* share = reinterpret_cast(req->data); if (share->m_highEnoughDifficulty) { - bkg_jobs_tracker.start("StratumServer::on_share_found"); + BACKGROUND_JOB_START(StratumServer::on_share_found); } StratumClient* client = share->m_client; @@ -891,7 +891,7 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/) if (share->m_highEnoughDifficulty) { const char* s = client->m_customUser; LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << share->m_mainchainHeight << ", diff " << share->m_sidechainDifficulty << ", client " << static_cast(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << share->m_effort << '%'); - bkg_jobs_tracker.stop("StratumServer::on_share_found"); + BACKGROUND_JOB_STOP(StratumServer::on_share_found); } ON_SCOPE_LEAVE([share]() { share->m_server->m_submittedSharesPool.push_back(share); }); diff --git a/src/util.cpp b/src/util.cpp index d239032..2693364 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -287,21 +287,22 @@ struct BackgroundJobTracker::Impl void wait() { + uint64_t last_msg_time = 0; do { - bool is_empty = true; { MutexLock lock(m_lock); - is_empty = m_jobs.empty(); - for (const auto& job : m_jobs) { - LOGINFO(1, "waiting for " << job.second << " \"" << job.first << "\" jobs to finish"); + if (m_jobs.empty()) { + return; + } + const uint64_t t = seconds_since_epoch(); + if (t != last_msg_time) { + last_msg_time = t; + for (const auto& job : m_jobs) { + LOGINFO(1, "waiting for " << job.second << " \"" << job.first << "\" jobs to finish"); + } } } - - if (is_empty) { - return; - } - - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } while (1); } @@ -323,8 +324,10 @@ struct BackgroundJobTracker::Impl LOGINFO(0, "background jobs running:" << log::const_buf(buf, s.m_pos)); } + struct Compare { FORCEINLINE bool operator()(const char* a, const char* b) const { return strcmp(a, b) < 0; } }; + uv_mutex_t m_lock; - std::map m_jobs; + std::map m_jobs; }; BackgroundJobTracker::BackgroundJobTracker() : m_impl(new Impl()) @@ -336,12 +339,12 @@ BackgroundJobTracker::~BackgroundJobTracker() delete m_impl; } -void BackgroundJobTracker::start(const char* name) +void BackgroundJobTracker::start_internal(const char* name) { m_impl->start(name); } -void BackgroundJobTracker::stop(const char* name) +void BackgroundJobTracker::stop_internal(const char* name) { m_impl->stop(name); } diff --git a/src/util.h b/src/util.h index d1ce64a..3896014 100644 --- a/src/util.h +++ b/src/util.h @@ -156,18 +156,25 @@ public: BackgroundJobTracker(); ~BackgroundJobTracker(); - void start(const char* name); - void stop(const char* name); + template FORCEINLINE void start(const char (&name)[N]) { start_internal(name); } + template FORCEINLINE void stop (const char (&name)[N]) { stop_internal (name); } + void wait(); void print_status(); private: + void start_internal(const char* name); + void stop_internal(const char* name); + struct Impl; Impl* m_impl; }; extern BackgroundJobTracker bkg_jobs_tracker; +#define BACKGROUND_JOB_START(x) do { bkg_jobs_tracker.start(#x); } while (0) +#define BACKGROUND_JOB_STOP(x) do { bkg_jobs_tracker.stop(#x); } while (0) + void set_main_thread(); bool is_main_thread();