From dcb822f8124fed33fbe49f9ef27a3fe92eb1d48b Mon Sep 17 00:00:00 2001 From: SChernykh Date: Tue, 5 Jul 2022 22:08:18 +0200 Subject: [PATCH] ConsoleCommands: read from stdin using libuv --- CMakeLists.txt | 7 -- src/console_commands.cpp | 165 ++++++++++++++++++++++----------------- src/console_commands.h | 24 +++++- 3 files changed, 113 insertions(+), 83 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e157dd..66719e3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,14 +145,7 @@ endif() add_definitions(/DZMQ_STATIC) -include(CheckSymbolExists) - set(CMAKE_REQUIRED_FLAGS "${GENERAL_FLAGS}") -check_symbol_exists(pthread_cancel pthread.h HAVE_PTHREAD_CANCEL) - -if (HAVE_PTHREAD_CANCEL) - add_definitions(/DHAVE_PTHREAD_CANCEL) -endif() include(CheckCXXSourceCompiles) diff --git a/src/console_commands.cpp b/src/console_commands.cpp index b50f378..d92bedc 100644 --- a/src/console_commands.cpp +++ b/src/console_commands.cpp @@ -27,35 +27,52 @@ #include "side_chain.h" #include -#ifdef HAVE_PTHREAD_CANCEL -#include -#endif - static constexpr char log_category_prefix[] = "ConsoleCommands "; namespace p2pool { -bool ConsoleCommands::stopped = false; - ConsoleCommands::ConsoleCommands(p2pool* pool) : m_pool(pool) + , m_loop{} + , m_shutdownAsync{} + , m_tty{} + , m_loopThread{} + , m_readBuf{} + , m_readBufInUse(false) { - m_worker = new std::thread(&ConsoleCommands::run, this); -} + int err = uv_loop_init(&m_loop); + if (err) { + LOGERR(1, "failed to create event loop, error " << uv_err_name(err)); + panic(); + } -ConsoleCommands::~ConsoleCommands() -{ - stopped = true; + err = uv_async_init(&m_loop, &m_shutdownAsync, on_shutdown); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + panic(); + } + m_shutdownAsync.data = this; -#ifdef _WIN32 - TerminateThread(reinterpret_cast(m_worker->native_handle()), 0); -#elif defined HAVE_PTHREAD_CANCEL - pthread_cancel(m_worker->native_handle()); -#endif + err = uv_tty_init(&m_loop, &m_tty, 0, 1); + if (err) { + LOGERR(1, "uv_tty_init failed, error " << uv_err_name(err)); + panic(); + } + m_tty.data = this; - m_worker->join(); - delete m_worker; + uv_read_start(reinterpret_cast(&m_tty), allocCallback, stdinReadCallback); + err = uv_thread_create(&m_loopThread, loop, this); + if (err) { + LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err)); + panic(); + } +} + +ConsoleCommands::~ConsoleCommands() +{ + uv_async_send(&m_shutdownAsync); + uv_thread_join(&m_loopThread); LOGINFO(1, "stopped"); } @@ -67,7 +84,7 @@ typedef struct strconst { #define STRCONST(x) {x, sizeof(x)-1} #define STRCNULL {NULL, 0} -typedef int (cmdfunc)(p2pool *pool, const char *args); +typedef void (cmdfunc)(p2pool *pool, const char *args); typedef struct cmd { strconst name; @@ -101,16 +118,15 @@ static cmd cmds[] = { { STRCNULL, NULL, NULL, NULL } }; -static int do_help(p2pool * /* m_pool */, const char * /* args */) +static void do_help(p2pool * /* m_pool */, const char * /* args */) { LOGINFO(0, "List of commands"); for (int i = 0; cmds[i].name.len; ++i) { LOGINFO(0, cmds[i].name.str << " " << cmds[i].arg << "\t" << cmds[i].descr); } - return 0; } -static int do_status(p2pool *m_pool, const char * /* args */) +static void do_status(p2pool *m_pool, const char * /* args */) { m_pool->side_chain().print_status(); if (m_pool->stratum_server()) { @@ -123,51 +139,45 @@ static int do_status(p2pool *m_pool, const char * /* args */) m_pool->print_miner_status(); #endif bkg_jobs_tracker.print_status(); - return 0; } -static int do_loglevel(p2pool * /* m_pool */, const char *args) +static void do_loglevel(p2pool * /* m_pool */, const char *args) { int level = strtol(args, nullptr, 10); level = std::min(std::max(level, 0), log::MAX_GLOBAL_LOG_LEVEL); log::GLOBAL_LOG_LEVEL = level; LOGINFO(0, "log level set to " << level); - return 0; } -static int do_addpeers(p2pool *m_pool, const char *args) +static void do_addpeers(p2pool *m_pool, const char *args) { if (m_pool->p2p_server()) { m_pool->p2p_server()->connect_to_peers(args); } - return 0; } -static int do_droppeers(p2pool *m_pool, const char * /* args */) +static void do_droppeers(p2pool *m_pool, const char * /* args */) { if (m_pool->p2p_server()) { m_pool->p2p_server()->drop_connections(); } - return 0; } -static int do_showpeers(p2pool* m_pool, const char* /* args */) +static void do_showpeers(p2pool* m_pool, const char* /* args */) { if (m_pool->p2p_server()) { m_pool->p2p_server()->show_peers(); } - return 0; } -static int do_showworkers(p2pool* m_pool, const char* /* args */) +static void do_showworkers(p2pool* m_pool, const char* /* args */) { if (m_pool->stratum_server()) { m_pool->stratum_server()->show_workers(); } - return 0; } -static int do_showbans(p2pool* m_pool, const char* /* args */) +static void do_showbans(p2pool* m_pool, const char* /* args */) { if (m_pool->stratum_server()) { m_pool->stratum_server()->print_bans(); @@ -175,87 +185,98 @@ static int do_showbans(p2pool* m_pool, const char* /* args */) if (m_pool->p2p_server()) { m_pool->p2p_server()->print_bans(); } - return 0; } -static int do_outpeers(p2pool* m_pool, const char* args) +static void do_outpeers(p2pool* m_pool, const char* args) { if (m_pool->p2p_server()) { m_pool->p2p_server()->set_max_outgoing_peers(strtoul(args, nullptr, 10)); LOGINFO(0, "max outgoing peers set to " << m_pool->p2p_server()->max_outgoing_peers()); } - return 0; } -static int do_inpeers(p2pool* m_pool, const char* args) +static void do_inpeers(p2pool* m_pool, const char* args) { if (m_pool->p2p_server()) { m_pool->p2p_server()->set_max_incoming_peers(strtoul(args, nullptr, 10)); LOGINFO(0, "max incoming peers set to " << m_pool->p2p_server()->max_incoming_peers()); } - return 0; } #ifdef WITH_RANDOMX -static int do_start_mining(p2pool* m_pool, const char* args) +static void do_start_mining(p2pool* m_pool, const char* args) { uint32_t threads = strtoul(args, nullptr, 10); threads = std::min(std::max(threads, 1u), 64u); m_pool->start_mining(threads); - return 0; } -static int do_stop_mining(p2pool* m_pool, const char* /*args*/) +static void do_stop_mining(p2pool* m_pool, const char* /*args*/) { m_pool->stop_mining(); - return 0; } #endif -static int do_exit(p2pool *m_pool, const char * /* args */) +static void do_exit(p2pool *m_pool, const char * /* args */) { bkg_jobs_tracker.wait(); m_pool->stop(); - - return 1; } -void ConsoleCommands::run() +void ConsoleCommands::allocCallback(uv_handle_t* handle, size_t /*suggested_size*/, uv_buf_t* buf) { - LOGINFO(1, "started"); - - std::string command; - command.reserve(1024); + ConsoleCommands* pThis = static_cast(handle->data); - do { - std::getline(std::cin, command); + if (pThis->m_readBufInUse) { + buf->len = 0; + buf->base = nullptr; + return; + } - if (std::cin.eof()) { - LOGINFO(1, "EOF, stopping"); - return; - } + buf->len = sizeof(pThis->m_readBuf); + buf->base = pThis->m_readBuf; + pThis->m_readBufInUse = true; +} - if (stopped) { - LOGINFO(1, "stopping"); - return; - } +void ConsoleCommands::stdinReadCallback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) +{ + ConsoleCommands* pThis = static_cast(stream->data); + + if (nread > 0) { + for (size_t i = 0; i < static_cast(nread); ++i) { + if ((buf->base[i] == '\r') || (buf->base[i] == '\n')) { + buf->base[i] = '\0'; + + cmd* c = cmds; + for (; c->name.len; ++c) { + if (!strncmp(buf->base, c->name.str, c->name.len)) { + const char* args = (c->name.len + 1 <= i) ? (buf->base + c->name.len + 1) : ""; + c->func(pThis->m_pool, args); + break; + } + } - cmd* c = cmds; - for (; c->name.len; ++c) { - if (!strncmp(command.c_str(), c->name.str, c->name.len)) { - const char *args = command.c_str() + c->name.len + 1; - if (c->func(m_pool, args)) { - LOGINFO(1, "exit requested, stopping"); - return; + if (!c->name.len) { + LOGWARN(0, "Unknown command " << buf->base); } break; } } + } + else if (nread < 0) { + LOGWARN(4, "read error " << uv_err_name(static_cast(nread))); + } - if (!c->name.len) { - LOGWARN(0, "Unknown command " << command); - } - } while (true); + pThis->m_readBufInUse = false; +} + +void ConsoleCommands::loop(void* data) +{ + LOGINFO(1, "event loop started"); + ConsoleCommands* pThis = static_cast(data); + uv_run(&pThis->m_loop, UV_RUN_DEFAULT); + uv_loop_close(&pThis->m_loop); + LOGINFO(1, "event loop stopped"); } } // namespace p2pool diff --git a/src/console_commands.h b/src/console_commands.h index 65d9da2..825225d 100644 --- a/src/console_commands.h +++ b/src/console_commands.h @@ -17,7 +17,7 @@ #pragma once -#include +#include "uv_util.h" namespace p2pool { @@ -31,10 +31,26 @@ public: private: p2pool* m_pool; - std::thread* m_worker; - static bool stopped; - void run(); + uv_loop_t m_loop; + uv_async_t m_shutdownAsync; + uv_tty_t m_tty; + uv_thread_t m_loopThread; + + char m_readBuf[64]; + bool m_readBufInUse; + + static void loop(void* data); + + static void on_shutdown(uv_async_t* async) + { + ConsoleCommands* pThis = reinterpret_cast(async->data); + uv_close(reinterpret_cast(&pThis->m_shutdownAsync), nullptr); + uv_close(reinterpret_cast(&pThis->m_tty), nullptr); + } + + static void allocCallback(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); + static void stdinReadCallback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); }; } // namespace p2pool