From 196b27f3b21bcf8096ecee1e3b7d089ded1fa30f Mon Sep 17 00:00:00 2001 From: SChernykh Date: Tue, 18 Apr 2023 15:38:24 +0200 Subject: [PATCH] Console: read commands via localhost TCP --- src/console_commands.cpp | 143 +++++++++++++++++------------------ src/console_commands.h | 33 ++++---- src/tcp_server.h | 7 +- src/tcp_server.inl | 159 +++++++++++++++++++++++---------------- 4 files changed, 188 insertions(+), 154 deletions(-) diff --git a/src/console_commands.cpp b/src/console_commands.cpp index 9e90032..93a26b6 100644 --- a/src/console_commands.cpp +++ b/src/console_commands.cpp @@ -28,38 +28,46 @@ static constexpr char log_category_prefix[] = "ConsoleCommands "; +static constexpr int DEFAULT_BACKLOG = 1; + +#include "tcp_server.inl" + namespace p2pool { ConsoleCommands::ConsoleCommands(p2pool* pool) - : m_pool(pool) - , m_loop{} - , m_shutdownAsync{} + : TCPServer(ConsoleClient::allocate) + , m_pool(pool) , m_tty{} , m_stdin_pipe{} , m_stdin_handle(nullptr) - , m_loopThread{} , m_readBuf{} , m_readBufInUse(false) { - const uv_handle_type stdin_type = uv_guess_handle(0); - LOGINFO(3, "uv_guess_handle returned " << static_cast(stdin_type)); - if (stdin_type != UV_TTY && stdin_type != UV_NAMED_PIPE) { - LOGERR(1, "tty or named pipe is not available"); - throw std::exception(); + std::random_device rd; + + for (int i = 0; i < 10; ++i) { + if (start_listening(false, "127.0.0.1", 49152 + (rd() % 16384))) { + break; + } } - int err = uv_loop_init(&m_loop); - if (err) { - LOGERR(1, "failed to create event loop, error " << uv_err_name(err)); + if (m_listenPort < 0) { + LOGERR(1, "failed to listen on TCP port"); throw std::exception(); } - err = uv_async_init(&m_loop, &m_shutdownAsync, on_shutdown); + int err = uv_thread_create(&m_loopThread, loop, this); if (err) { - LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err)); + PANIC_STOP(); + } + + const uv_handle_type stdin_type = uv_guess_handle(0); + LOGINFO(3, "uv_guess_handle returned " << static_cast(stdin_type)); + if (stdin_type != UV_TTY && stdin_type != UV_NAMED_PIPE) { + LOGERR(1, "tty or named pipe is not available"); throw std::exception(); } - m_shutdownAsync.data = this; if (stdin_type == UV_TTY) { LOGINFO(3, "processing stdin as UV_TTY"); @@ -91,19 +99,16 @@ ConsoleCommands::ConsoleCommands(p2pool* pool) LOGERR(1, "uv_read_start failed, error " << uv_err_name(err)); throw std::exception(); } - - err = uv_thread_create(&m_loopThread, loop, this); - if (err) { - LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err)); - throw std::exception(); - } } ConsoleCommands::~ConsoleCommands() { - uv_async_send(&m_shutdownAsync); - uv_thread_join(&m_loopThread); - LOGINFO(1, "stopped"); + shutdown_tcp(); +} + +void ConsoleCommands::on_shutdown() +{ + uv_close(reinterpret_cast(m_stdin_handle), nullptr); } typedef struct strconst { @@ -273,46 +278,7 @@ void ConsoleCommands::stdinReadCallback(uv_stream_t* stream, ssize_t nread, cons ConsoleCommands* pThis = static_cast(stream->data); if (nread > 0) { - std::string& command = pThis->m_command; - command.append(buf->base, nread); - - do { - size_t k = command.find_first_of("\r\n"); - if (k == std::string::npos) { - break; - } - command[k] = '\0'; - - cmd* c = cmds; - for (; c->name.len; ++c) { - if (!strncmp(command.c_str(), c->name.str, c->name.len)) { - const char* args = (c->name.len + 1 <= k) ? (command.c_str() + c->name.len + 1) : ""; - - // Skip spaces - while ((args[0] == ' ') || (args[0] == '\t')) { - ++args; - } - - // Check if an argument is required - if (strlen(c->arg) && !strlen(args)) { - LOGWARN(0, c->name.str << " requires arguments"); - do_help(nullptr, nullptr); - break; - } - - c->func(pThis->m_pool, args); - break; - } - } - - if (!c->name.len) { - LOGWARN(0, "Unknown command " << command.c_str()); - do_help(nullptr, nullptr); - } - - k = command.find_first_not_of("\r\n", k + 1); - command.erase(0, k); - } while (true); + pThis->process_input(pThis->m_command, buf->base, static_cast(nread)); } else if (nread < 0) { LOGWARN(4, "read error " << uv_err_name(static_cast(nread))); @@ -321,23 +287,48 @@ void ConsoleCommands::stdinReadCallback(uv_stream_t* stream, ssize_t nread, cons pThis->m_readBufInUse = false; } -void ConsoleCommands::loop(void* data) + +void ConsoleCommands::process_input(std::string& command, char* data, uint32_t size) { - LOGINFO(1, "event loop started"); + command.append(data, size); - ConsoleCommands* pThis = static_cast(data); + do { + size_t k = command.find_first_of("\r\n"); + if (k == std::string::npos) { + break; + } + command[k] = '\0'; - int err = uv_run(&pThis->m_loop, UV_RUN_DEFAULT); - if (err) { - LOGWARN(1, "uv_run returned " << err); - } + cmd* c = cmds; + for (; c->name.len; ++c) { + if (!strncmp(command.c_str(), c->name.str, c->name.len)) { + const char* args = (c->name.len + 1 <= k) ? (command.c_str() + c->name.len + 1) : ""; - err = uv_loop_close(&pThis->m_loop); - if (err) { - LOGWARN(1, "uv_loop_close returned error " << uv_err_name(err)); - } + // Skip spaces + while ((args[0] == ' ') || (args[0] == '\t')) { + ++args; + } + + // Check if an argument is required + if (strlen(c->arg) && !strlen(args)) { + LOGWARN(0, c->name.str << " requires arguments"); + do_help(nullptr, nullptr); + break; + } + + c->func(m_pool, args); + break; + } + } + + if (!c->name.len) { + LOGWARN(0, "Unknown command " << command.c_str()); + do_help(nullptr, nullptr); + } - LOGINFO(1, "event loop stopped"); + k = command.find_first_not_of("\r\n", k + 1); + command.erase(0, k); + } while (true); } } // namespace p2pool diff --git a/src/console_commands.h b/src/console_commands.h index def59d7..a79e41d 100644 --- a/src/console_commands.h +++ b/src/console_commands.h @@ -18,43 +18,50 @@ #pragma once #include "uv_util.h" +#include "tcp_server.h" namespace p2pool { class p2pool; -class ConsoleCommands : public nocopy_nomove +class ConsoleCommands : public TCPServer<256, 256> { public: explicit ConsoleCommands(p2pool* pool); ~ConsoleCommands(); + struct ConsoleClient : public Client + { + ~ConsoleClient() {} + + static Client* allocate() { return new ConsoleClient(); } + + size_t size() const override { return sizeof(ConsoleClient); } + + bool on_connect() override { return true; }; + bool on_read(char* data, uint32_t size) override { static_cast(m_owner)->process_input(m_command, data, size); return true; }; + + std::string m_command; + }; + + void on_shutdown() override; + private: p2pool* m_pool; - uv_loop_t m_loop; - uv_async_t m_shutdownAsync; uv_tty_t m_tty; uv_pipe_t m_stdin_pipe; uv_stream_t* m_stdin_handle; - uv_thread_t m_loopThread; char m_readBuf[64]; bool m_readBufInUse; std::string m_command; - static void loop(void* data); - - static void on_shutdown(uv_async_t* async) - { - ConsoleCommands* pThis = reinterpret_cast(async->data); - uv_close(reinterpret_cast(&pThis->m_shutdownAsync), nullptr); - uv_close(reinterpret_cast(pThis->m_stdin_handle), nullptr); - } - static void allocCallback(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void stdinReadCallback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); + + void process_input(std::string& command, char* data, uint32_t size); }; } // namespace p2pool diff --git a/src/tcp_server.h b/src/tcp_server.h index 3e84c79..0bd0447 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -138,7 +138,6 @@ public: FORCEINLINE bool send(Client* client, T&& callback) { return send_internal(client, SendCallback(std::move(callback))); } private: - static void loop(void* data); static void on_new_connection(uv_stream_t* server, int status); static void on_connection_close(uv_handle_t* handle); static void on_connection_error(uv_handle_t* handle); @@ -156,10 +155,14 @@ private: std::vector m_listenSockets6; std::vector m_listenSockets; - uv_thread_t m_loopThread; protected: + uv_thread_t m_loopThread; + + static void loop(void* data); + void start_listening(const std::string& listen_addresses, bool upnp); + bool start_listening(bool is_v6, const std::string& ip, int port, std::string address = std::string()); #ifdef WITH_UPNP int m_portMapping; diff --git a/src/tcp_server.inl b/src/tcp_server.inl index a054cc8..514d839 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -131,82 +131,115 @@ void TCPServer::parse_address_list(const std::str } template -void TCPServer::start_listening(const std::string& listen_addresses, bool upnp) +bool TCPServer::start_listening(bool is_v6, const std::string& ip, int port, std::string address) { - if (listen_addresses.empty()) { - LOGERR(1, "listen address not set"); - PANIC_STOP(); + if ((m_listenPort >= 0) && (m_listenPort != port)) { + LOGERR(1, "all sockets must be listening on the same port number, fix the command line"); + return false; } - parse_address_list(listen_addresses, - [this](bool is_v6, const std::string& address, const std::string& ip, int port) - { - if (m_listenPort < 0) { - m_listenPort = port; - } - else if (m_listenPort != port) { - LOGERR(1, "all sockets must be listening on the same port number, fix the command line"); - PANIC_STOP(); - } + uv_tcp_t* socket = new uv_tcp_t(); - uv_tcp_t* socket = new uv_tcp_t(); + int err = uv_tcp_init(&m_loop, socket); + if (err) { + LOGERR(1, "failed to create tcp server handle, error " << uv_err_name(err)); + delete socket; + return false; + } + socket->data = this; - if (is_v6) { - m_listenSockets6.push_back(socket); - } - else { - m_listenSockets.push_back(socket); - } + ON_SCOPE_LEAVE([is_v6, this, socket]() + { + const std::vector& v = is_v6 ? m_listenSockets6 : m_listenSockets; + if (std::find(v.begin(), v.end(), socket) == v.end()) { + uv_close(reinterpret_cast(socket), [](uv_handle_t* h) { delete reinterpret_cast(h); }); + } + }); - int err = uv_tcp_init(&m_loop, socket); - if (err) { - LOGERR(1, "failed to create tcp server handle, error " << uv_err_name(err)); - PANIC_STOP(); - } - socket->data = this; + err = uv_tcp_nodelay(socket, 1); + if (err) { + LOGERR(1, "failed to set tcp_nodelay on tcp server handle, error " << uv_err_name(err)); + return false; + } - err = uv_tcp_nodelay(socket, 1); - if (err) { - LOGERR(1, "failed to set tcp_nodelay on tcp server handle, error " << uv_err_name(err)); - PANIC_STOP(); - } + if (is_v6) { + if (address.empty()) { + char buf[64] = {}; + log::Stream s(buf); + s << '[' << ip << "]:" << port; + address = buf; + } - if (is_v6) { - sockaddr_in6 addr6; - err = uv_ip6_addr(ip.c_str(), port, &addr6); - if (err) { - LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err)); - PANIC_STOP(); - } + sockaddr_in6 addr6; + err = uv_ip6_addr(ip.c_str(), port, &addr6); + if (err) { + LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err)); + return false; + } - err = uv_tcp_bind(socket, reinterpret_cast(&addr6), UV_TCP_IPV6ONLY); - if (err) { - LOGERR(1, "failed to bind tcp server IPv6 socket " << address << ", error " << uv_err_name(err)); - PANIC_STOP(); - } - } - else { - sockaddr_in addr; - err = uv_ip4_addr(ip.c_str(), port, &addr); - if (err) { - LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err)); - PANIC_STOP(); - } + err = uv_tcp_bind(socket, reinterpret_cast(&addr6), UV_TCP_IPV6ONLY); + if (err) { + LOGERR(1, "failed to bind tcp server IPv6 socket " << address << ", error " << uv_err_name(err)); + return false; + } + } + else { + if (address.empty()) { + char buf[64] = {}; + log::Stream s(buf); + s << ip << ':' << port; + address = buf; + } - err = uv_tcp_bind(socket, reinterpret_cast(&addr), 0); - if (err) { - LOGERR(1, "failed to bind tcp server IPv4 socket " << address << ", error " << uv_err_name(err)); - PANIC_STOP(); - } - } + sockaddr_in addr; + err = uv_ip4_addr(ip.c_str(), port, &addr); + if (err) { + LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err)); + return false; + } + + err = uv_tcp_bind(socket, reinterpret_cast(&addr), 0); + if (err) { + LOGERR(1, "failed to bind tcp server IPv4 socket " << address << ", error " << uv_err_name(err)); + return false; + } + } + + err = uv_listen(reinterpret_cast(socket), DEFAULT_BACKLOG, on_new_connection); + if (err) { + LOGERR(1, "failed to listen on tcp server socket " << address << ", error " << uv_err_name(err)); + return false; + } + + if (is_v6) { + m_listenSockets6.push_back(socket); + } + else { + m_listenSockets.push_back(socket); + } + + if (m_listenPort < 0) { + m_listenPort = port; + } - err = uv_listen(reinterpret_cast(socket), DEFAULT_BACKLOG, on_new_connection); - if (err) { - LOGERR(1, "failed to listen on tcp server socket " << address << ", error " << uv_err_name(err)); + LOGINFO(1, "listening on " << log::Gray() << address); + return true; +} + +template +void TCPServer::start_listening(const std::string& listen_addresses, bool upnp) +{ + if (listen_addresses.empty()) { + LOGERR(1, "listen address not set"); + PANIC_STOP(); + } + + parse_address_list(listen_addresses, + [this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port) + { + if (!start_listening(is_v6, ip, port)) { PANIC_STOP(); } - - LOGINFO(1, "listening on " << log::Gray() << address); }); #ifdef WITH_UPNP