Restrict uv_default_loop() to main thread

pull/5/head
SChernykh 3 years ago
parent dbf18c34a4
commit d8efb27374

@ -33,7 +33,7 @@ JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, C
{
m_readBuf[0] = '\0';
uv_tcp_init(uv_default_loop(), &m_socket);
uv_tcp_init(uv_default_loop_checked(), &m_socket);
uv_tcp_nodelay(&m_socket, 1);
sockaddr_in dest;

@ -51,6 +51,8 @@ public:
: m_writePos(0)
, m_readPos(0)
{
is_main_thread = true;
m_logFile.open(log_file_name, std::ios::app | std::ios::binary);
m_buf.resize(BUF_SIZE);

@ -43,6 +43,7 @@ namespace p2pool {
p2pool::p2pool(int argc, char* argv[])
: m_stopped(false)
, m_params(new Params(argc, argv))
, m_updateSeed(true)
{
if (!m_params->m_wallet.valid()) {
LOGERR(1, "Invalid wallet address. Try \"p2pool --help\".");
@ -58,6 +59,20 @@ p2pool::p2pool(int argc, char* argv[])
LOGWARN(1, "Mining to a stagenet wallet address");
}
int err = uv_async_init(uv_default_loop_checked(), &m_blockTemplateAsync, on_update_block_template);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
panic();
}
m_blockTemplateAsync.data = this;
err = uv_async_init(uv_default_loop_checked(), &m_stopAsync, on_stop);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
panic();
}
m_stopAsync.data = this;
uv_rwlock_init_checked(&m_mainchainLock);
MinerData d;
@ -71,6 +86,8 @@ p2pool::p2pool(int argc, char* argv[])
p2pool::~p2pool()
{
uv_close(reinterpret_cast<uv_handle_t*>(&m_blockTemplateAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_stopAsync), nullptr);
uv_rwlock_destroy(&m_mainchainLock);
delete m_sideChain;
@ -152,6 +169,7 @@ void p2pool::handle_miner_data(MinerData& data)
data.tx_backlog.clear();
m_minerData = data;
m_updateSeed = true;
update_median_timestamp();
LOGINFO(2,
@ -167,10 +185,12 @@ void p2pool::handle_miner_data(MinerData& data)
"\n---------------------------------------------------------------------------------------------------------------"
);
m_hasher->set_seed_async(m_minerData.seed_hash);
m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet);
stratum_on_block();
if (!is_main_thread) {
update_block_template_async();
}
else {
update_block_template();
}
}
static constexpr char BLOCK_FOUND[] = "\n\
@ -314,28 +334,20 @@ void p2pool::submit_sidechain_block(uint32_t template_id, uint32_t nonce, uint32
void p2pool::update_block_template_async()
{
uv_work_t* req = new uv_work_t{};
req->data = this;
const int err = uv_queue_work(uv_default_loop(), req,
[](uv_work_t* req)
{
bkg_jobs_tracker.start("p2pool::update_block_template_async");
p2pool* pool = reinterpret_cast<p2pool*>(req->data);
pool->m_blockTemplate->update(pool->m_minerData, *pool->m_mempool, &pool->m_params->m_wallet);
pool->stratum_on_block();
},
[](uv_work_t* req, int /*status*/)
{
delete req;
bkg_jobs_tracker.stop("p2pool::update_block_template_async");
});
const int err = uv_async_send(&m_blockTemplateAsync);
if (err) {
LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
}
}
void p2pool::update_block_template()
{
if (m_updateSeed) {
m_hasher->set_seed_async(m_minerData.seed_hash);
m_updateSeed = false;
}
m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet);
stratum_on_block();
}
void p2pool::download_block_headers(uint64_t current_height)
@ -622,13 +634,13 @@ static void on_signal(uv_signal_t* handle, int signum)
LOGINFO(1, "stopping");
uv_signal_stop(handle);
uv_stop(uv_default_loop());
uv_stop(uv_default_loop_checked());
}
static bool init_uv_threadpool()
{
static uv_work_t dummy;
return (uv_queue_work(uv_default_loop(), &dummy, [](uv_work_t*) {}, nullptr) == 0);
return (uv_queue_work(uv_default_loop_checked(), &dummy, [](uv_work_t*) {}, nullptr) == 0);
}
static bool init_signals()
@ -648,7 +660,7 @@ static bool init_signals()
static uv_signal_t signals[array_size(signal_names)];
for (size_t i = 0; i < array_size(signal_names); ++i) {
uv_signal_init(uv_default_loop(), &signals[i]);
uv_signal_init(uv_default_loop_checked(), &signals[i]);
const int rc = uv_signal_start(&signals[i], on_signal, signal_names[i]);
if (rc != 0) {
LOGERR(1, "failed to initialize signal, error " << rc);
@ -661,13 +673,8 @@ static bool init_signals()
void p2pool::stop()
{
uv_async_t asy;
uv_loop_t *loop = uv_default_loop();
/* use async handle to make sure event loops wake up and stop */
uv_async_init(loop, &asy, NULL);
uv_stop(loop);
uv_async_send(&asy);
uv_stop(uv_default_loop());
uv_async_send(&m_stopAsync);
}
int p2pool::run()
@ -690,7 +697,7 @@ int p2pool::run()
{
ZMQReader z(m_params->m_host, m_params->m_rpcPort, m_params->m_zmqPort, this);
get_miner_data();
const int rc = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
const int rc = uv_run(uv_default_loop_checked(), UV_RUN_DEFAULT);
LOGINFO(1, "uv_run exited, result = " << rc);
}

@ -62,7 +62,9 @@ public:
void submit_block(uint32_t template_id, uint32_t nonce, uint32_t extra_nonce) const;
void submit_sidechain_block(uint32_t template_id, uint32_t nonce, uint32_t extra_nonce);
void update_block_template_async();
void update_block_template();
void download_block_headers(uint64_t current_height);
@ -72,6 +74,9 @@ private:
p2pool(const p2pool&) = delete;
p2pool(p2pool&&) = delete;
static void on_update_block_template(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->update_block_template(); }
static void on_stop(uv_async_t*) {}
bool m_stopped;
Params* m_params;
@ -80,6 +85,7 @@ private:
RandomX_Hasher* m_hasher;
BlockTemplate* m_blockTemplate;
MinerData m_minerData;
bool m_updateSeed;
Mempool* m_mempool;
mutable uv_rwlock_t m_mainchainLock;
@ -103,6 +109,9 @@ private:
P2PServer* m_p2pServer = nullptr;
ConsoleCommands* m_consoleCommands;
uv_async_t m_blockTemplateAsync;
uv_async_t m_stopAsync;
};
} // namespace p2pool

@ -130,7 +130,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed)
work->seed = seed;
work->req.data = work;
uv_queue_work(uv_default_loop(), &work->req,
uv_queue_work(uv_default_loop_checked(), &work->req,
[](uv_work_t* req)
{
bkg_jobs_tracker.start("RandomX_Hasher::set_seed_async");
@ -163,7 +163,7 @@ void RandomX_Hasher::set_old_seed_async(const hash& seed)
work->seed = seed;
work->req.data = work;
uv_queue_work(uv_default_loop(), &work->req,
uv_queue_work(uv_default_loop_checked(), &work->req,
[](uv_work_t* req)
{
bkg_jobs_tracker.start("RandomX_Hasher::set_old_seed_async");

@ -132,6 +132,19 @@ void uv_rwlock_init_checked(uv_rwlock_t* lock)
}
}
uv_loop_t* uv_default_loop_checked()
{
if (!is_main_thread) {
LOGERR(1, "uv_default_loop() can only be used by the main thread. Fix the code!");
#ifdef _WIN32
if (IsDebuggerPresent()) {
__debugbreak();
}
#endif
}
return uv_default_loop();
}
struct BackgroundJobTracker::Impl
{
Impl() { uv_mutex_init_checked(&m_lock); }
@ -235,5 +248,6 @@ void BackgroundJobTracker::print_status()
}
BackgroundJobTracker bkg_jobs_tracker;
thread_local bool is_main_thread = false;
} // namespace p2pool

@ -115,6 +115,7 @@ private:
};
extern BackgroundJobTracker bkg_jobs_tracker;
extern thread_local bool is_main_thread;
} // namespace p2pool

@ -56,5 +56,6 @@ typedef RWLock<true> WriteLock;
void uv_mutex_init_checked(uv_mutex_t* mutex);
void uv_rwlock_init_checked(uv_rwlock_t* lock);
uv_loop_t* uv_default_loop_checked();
} // namespace p2pool

Loading…
Cancel
Save