Refactored BackgroundJobTracker

pull/226/head
SChernykh 2 years ago
parent 74b5ab1798
commit 19414b061b

@ -420,13 +420,13 @@ void P2PServer::save_peer_list_async()
const int err = uv_queue_work(&m_loop, &work->req, const int err = uv_queue_work(&m_loop, &work->req,
[](uv_work_t* req) [](uv_work_t* req)
{ {
bkg_jobs_tracker.start("P2PServer::save_peer_list_async"); BACKGROUND_JOB_START(P2PServer::save_peer_list_async);
reinterpret_cast<Work*>(req->data)->server->save_peer_list(); reinterpret_cast<Work*>(req->data)->server->save_peer_list();
}, },
[](uv_work_t* req, int /*status*/) [](uv_work_t* req, int /*status*/)
{ {
delete reinterpret_cast<Work*>(req->data); delete reinterpret_cast<Work*>(req->data);
bkg_jobs_tracker.stop("P2PServer::save_peer_list_async"); BACKGROUND_JOB_STOP(P2PServer::save_peer_list_async);
}); });
if (err) { if (err) {
@ -995,13 +995,13 @@ void P2PServer::flush_cache()
const int err = uv_queue_work(&m_loop, &work->req, const int err = uv_queue_work(&m_loop, &work->req,
[](uv_work_t* req) [](uv_work_t* req)
{ {
bkg_jobs_tracker.start("P2PServer::flush_cache"); BACKGROUND_JOB_START(P2PServer::flush_cache);
reinterpret_cast<Work*>(req->data)->cache->flush(); reinterpret_cast<Work*>(req->data)->cache->flush();
}, },
[](uv_work_t* req, int) [](uv_work_t* req, int)
{ {
delete reinterpret_cast<Work*>(req->data); delete reinterpret_cast<Work*>(req->data);
bkg_jobs_tracker.stop("P2PServer::flush_cache"); BACKGROUND_JOB_STOP(P2PServer::flush_cache);
}); });
if (err) { if (err) {
@ -1534,7 +1534,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH
const int err = uv_queue_work(&server->m_loop, &work->req, const int err = uv_queue_work(&server->m_loop, &work->req,
[](uv_work_t* req) [](uv_work_t* req)
{ {
bkg_jobs_tracker.start("P2PServer::send_handshake_solution"); BACKGROUND_JOB_START(P2PServer::send_handshake_solution);
Work* work = reinterpret_cast<Work*>(req->data); Work* work = reinterpret_cast<Work*>(req->data);
const std::vector<uint8_t>& consensus_id = work->server->m_pool->side_chain().consensus_id(); const std::vector<uint8_t>& consensus_id = work->server->m_pool->side_chain().consensus_id();
@ -1591,7 +1591,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH
[work]() [work]()
{ {
delete work; delete work;
bkg_jobs_tracker.stop("P2PServer::send_handshake_solution"); BACKGROUND_JOB_STOP(P2PServer::send_handshake_solution);
}); });
// We might've been disconnected while working on the challenge, do nothing in this case // We might've been disconnected while working on the challenge, do nothing in this case
@ -2146,7 +2146,7 @@ bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block, u
const int err = uv_queue_work(&server->m_loop, &work->req, const int err = uv_queue_work(&server->m_loop, &work->req,
[](uv_work_t* req) [](uv_work_t* req)
{ {
bkg_jobs_tracker.start("P2PServer::handle_incoming_block_async"); BACKGROUND_JOB_START(P2PServer::handle_incoming_block_async);
Work* work = reinterpret_cast<Work*>(req->data); Work* work = reinterpret_cast<Work*>(req->data);
work->client->handle_incoming_block(work->server->m_pool, work->block, work->client_reset_counter, work->client_ip, work->missing_blocks); work->client->handle_incoming_block(work->server->m_pool, work->block, work->client_reset_counter, work->client_ip, work->missing_blocks);
}, },
@ -2155,7 +2155,7 @@ bool P2PServer::P2PClient::handle_incoming_block_async(const PoolBlock* block, u
Work* work = reinterpret_cast<Work*>(req->data); Work* work = reinterpret_cast<Work*>(req->data);
work->client->post_handle_incoming_block(work->client_reset_counter, work->missing_blocks); work->client->post_handle_incoming_block(work->client_reset_counter, work->missing_blocks);
delete work; delete work;
bkg_jobs_tracker.stop("P2PServer::handle_incoming_block_async"); BACKGROUND_JOB_STOP(P2PServer::handle_incoming_block_async);
}); });
if (err != 0) { if (err != 0) {

@ -141,7 +141,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed)
const int err = uv_queue_work(uv_default_loop_checked(), &work->req, const int err = uv_queue_work(uv_default_loop_checked(), &work->req,
[](uv_work_t* req) [](uv_work_t* req)
{ {
bkg_jobs_tracker.start("RandomX_Hasher::set_seed_async"); BACKGROUND_JOB_START(RandomX_Hasher::set_seed_async);
Work* work = reinterpret_cast<Work*>(req->data); Work* work = reinterpret_cast<Work*>(req->data);
if (!work->pool->stopped()) { if (!work->pool->stopped()) {
work->hasher->set_seed(work->seed); work->hasher->set_seed(work->seed);
@ -150,7 +150,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed)
[](uv_work_t* req, int) [](uv_work_t* req, int)
{ {
delete reinterpret_cast<Work*>(req->data); delete reinterpret_cast<Work*>(req->data);
bkg_jobs_tracker.stop("RandomX_Hasher::set_seed_async"); BACKGROUND_JOB_STOP(RandomX_Hasher::set_seed_async);
} }
); );

@ -806,7 +806,7 @@ void StratumServer::on_share_found(uv_work_t* req)
{ {
SubmittedShare* share = reinterpret_cast<SubmittedShare*>(req->data); SubmittedShare* share = reinterpret_cast<SubmittedShare*>(req->data);
if (share->m_highEnoughDifficulty) { if (share->m_highEnoughDifficulty) {
bkg_jobs_tracker.start("StratumServer::on_share_found"); BACKGROUND_JOB_START(StratumServer::on_share_found);
} }
StratumClient* client = share->m_client; StratumClient* client = share->m_client;
@ -891,7 +891,7 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
if (share->m_highEnoughDifficulty) { if (share->m_highEnoughDifficulty) {
const char* s = client->m_customUser; const char* s = client->m_customUser;
LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << share->m_mainchainHeight << ", diff " << share->m_sidechainDifficulty << ", client " << static_cast<char*>(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << share->m_effort << '%'); LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << share->m_mainchainHeight << ", diff " << share->m_sidechainDifficulty << ", client " << static_cast<char*>(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << share->m_effort << '%');
bkg_jobs_tracker.stop("StratumServer::on_share_found"); BACKGROUND_JOB_STOP(StratumServer::on_share_found);
} }
ON_SCOPE_LEAVE([share]() { share->m_server->m_submittedSharesPool.push_back(share); }); ON_SCOPE_LEAVE([share]() { share->m_server->m_submittedSharesPool.push_back(share); });

@ -287,21 +287,22 @@ struct BackgroundJobTracker::Impl
void wait() void wait()
{ {
uint64_t last_msg_time = 0;
do { do {
bool is_empty = true;
{ {
MutexLock lock(m_lock); MutexLock lock(m_lock);
is_empty = m_jobs.empty(); if (m_jobs.empty()) {
for (const auto& job : m_jobs) { return;
LOGINFO(1, "waiting for " << job.second << " \"" << job.first << "\" jobs to finish"); }
const uint64_t t = seconds_since_epoch();
if (t != last_msg_time) {
last_msg_time = t;
for (const auto& job : m_jobs) {
LOGINFO(1, "waiting for " << job.second << " \"" << job.first << "\" jobs to finish");
}
} }
} }
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (is_empty) {
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} while (1); } while (1);
} }
@ -323,8 +324,10 @@ struct BackgroundJobTracker::Impl
LOGINFO(0, "background jobs running:" << log::const_buf(buf, s.m_pos)); LOGINFO(0, "background jobs running:" << log::const_buf(buf, s.m_pos));
} }
struct Compare { FORCEINLINE bool operator()(const char* a, const char* b) const { return strcmp(a, b) < 0; } };
uv_mutex_t m_lock; uv_mutex_t m_lock;
std::map<std::string, int32_t> m_jobs; std::map<const char*, int32_t, Compare> m_jobs;
}; };
BackgroundJobTracker::BackgroundJobTracker() : m_impl(new Impl()) BackgroundJobTracker::BackgroundJobTracker() : m_impl(new Impl())
@ -336,12 +339,12 @@ BackgroundJobTracker::~BackgroundJobTracker()
delete m_impl; delete m_impl;
} }
void BackgroundJobTracker::start(const char* name) void BackgroundJobTracker::start_internal(const char* name)
{ {
m_impl->start(name); m_impl->start(name);
} }
void BackgroundJobTracker::stop(const char* name) void BackgroundJobTracker::stop_internal(const char* name)
{ {
m_impl->stop(name); m_impl->stop(name);
} }

@ -156,18 +156,25 @@ public:
BackgroundJobTracker(); BackgroundJobTracker();
~BackgroundJobTracker(); ~BackgroundJobTracker();
void start(const char* name); template<size_t N> FORCEINLINE void start(const char (&name)[N]) { start_internal(name); }
void stop(const char* name); template<size_t N> FORCEINLINE void stop (const char (&name)[N]) { stop_internal (name); }
void wait(); void wait();
void print_status(); void print_status();
private: private:
void start_internal(const char* name);
void stop_internal(const char* name);
struct Impl; struct Impl;
Impl* m_impl; Impl* m_impl;
}; };
extern BackgroundJobTracker bkg_jobs_tracker; extern BackgroundJobTracker bkg_jobs_tracker;
#define BACKGROUND_JOB_START(x) do { bkg_jobs_tracker.start(#x); } while (0)
#define BACKGROUND_JOB_STOP(x) do { bkg_jobs_tracker.stop(#x); } while (0)
void set_main_thread(); void set_main_thread();
bool is_main_thread(); bool is_main_thread();

Loading…
Cancel
Save