diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index d0ba668..2431e86 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -70,7 +70,7 @@ jobs: run: | cd external/src/curl autoreconf -fi - ./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares + ./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares make -j$(nproc) - name: Build libuv @@ -139,7 +139,7 @@ jobs: run: | cd external/src/curl autoreconf -fi - ./configure --host=aarch64-linux-gnu --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares + ./configure --host=aarch64-linux-gnu --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares make -j$(nproc) - name: Build libuv @@ -195,7 +195,7 @@ jobs: run: | cd external/src/curl autoreconf -fi - ./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares + ./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares make -j$(nproc) - name: Build libuv @@ -307,7 +307,7 @@ jobs: run: | cd external/src/curl autoreconf -fi - ./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares + ./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares make -j3 - name: Build libuv diff --git a/.github/workflows/test-sync.yml b/.github/workflows/test-sync.yml index df11315..e22e081 100644 --- a/.github/workflows/test-sync.yml +++ b/.github/workflows/test-sync.yml @@ -30,7 +30,7 @@ jobs: timeout-minutes: 15 run: | cd build - ./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4 + ./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6 - name: Archive p2pool.log uses: actions/upload-artifact@v2 @@ -62,7 +62,7 @@ jobs: timeout-minutes: 15 run: | cd build - ./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4 + ./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6 - name: Archive p2pool.log uses: actions/upload-artifact@v2 @@ -94,7 +94,7 @@ jobs: timeout-minutes: 15 run: | cd build/Release - ./p2pool.exe --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4 + ./p2pool.exe --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6 - name: Archive p2pool.log uses: actions/upload-artifact@v2 diff --git a/docs/COMMAND_LINE.MD b/docs/COMMAND_LINE.MD index 5d5f315..4c50dcd 100644 --- a/docs/COMMAND_LINE.MD +++ b/docs/COMMAND_LINE.MD @@ -23,6 +23,7 @@ --mini Connect to p2pool-mini sidechain. Note that it will also change default p2p port from 37889 to 37888 --no-autodiff Disable automatic difficulty adjustment for miners connected to stratum --rpc-login Specify username[:password] required for Monero RPC server +--socks5 Specify IP:port of a SOCKS5 proxy to use for outgoing connections ``` ### Example command line diff --git a/src/block_template.cpp b/src/block_template.cpp index 530303d..29ee396 100644 --- a/src/block_template.cpp +++ b/src/block_template.cpp @@ -910,11 +910,12 @@ void BlockTemplate::calc_merkle_tree_main_branch() } } -bool BlockTemplate::get_difficulties(const uint32_t template_id, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const +bool BlockTemplate::get_difficulties(const uint32_t template_id, uint64_t& height, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const { ReadLock lock(m_lock); if (template_id == m_templateId) { + height = m_height; mainchain_difficulty = m_difficulty; sidechain_difficulty = m_poolBlockTemplate->m_difficulty; return true; @@ -923,7 +924,7 @@ bool BlockTemplate::get_difficulties(const uint32_t template_id, difficulty_type const BlockTemplate* old = m_oldTemplates[template_id % array_size(&BlockTemplate::m_oldTemplates)]; if (old && (template_id == old->m_templateId)) { - return old->get_difficulties(template_id, mainchain_difficulty, sidechain_difficulty); + return old->get_difficulties(template_id, height, mainchain_difficulty, sidechain_difficulty); } return false; diff --git a/src/block_template.h b/src/block_template.h index d85b029..d651330 100644 --- a/src/block_template.h +++ b/src/block_template.h @@ -40,7 +40,7 @@ public: void update(const MinerData& data, const Mempool& mempool, Wallet* miner_wallet); - bool get_difficulties(const uint32_t template_id, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const; + bool get_difficulties(const uint32_t template_id, uint64_t& height, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const; uint32_t get_hashing_blob(const uint32_t template_id, uint32_t extra_nonce, uint8_t (&blob)[128], uint64_t& height, difficulty_type& difficulty, difficulty_type& sidechain_difficulty, hash& seed_hash, size_t& nonce_offset) const; uint32_t get_hashing_blob(uint32_t extra_nonce, uint8_t (&blob)[128], uint64_t& height, difficulty_type& difficulty, difficulty_type& sidechain_difficulty, hash& seed_hash, size_t& nonce_offset, uint32_t& template_id) const; diff --git a/src/console_commands.cpp b/src/console_commands.cpp index 821f0a0..47ae8a2 100644 --- a/src/console_commands.cpp +++ b/src/console_commands.cpp @@ -161,21 +161,21 @@ static void do_loglevel(p2pool * /* m_pool */, const char *args) static void do_addpeers(p2pool *m_pool, const char *args) { if (m_pool->p2p_server()) { - m_pool->p2p_server()->connect_to_peers(args); + m_pool->p2p_server()->connect_to_peers_async(args); } } static void do_droppeers(p2pool *m_pool, const char * /* args */) { if (m_pool->p2p_server()) { - m_pool->p2p_server()->drop_connections(); + m_pool->p2p_server()->drop_connections_async(); } } static void do_showpeers(p2pool* m_pool, const char* /* args */) { if (m_pool->p2p_server()) { - m_pool->p2p_server()->show_peers(); + m_pool->p2p_server()->show_peers_async(); } } diff --git a/src/json_rpc_request.cpp b/src/json_rpc_request.cpp index 96d19ed..bf0c25c 100644 --- a/src/json_rpc_request.cpp +++ b/src/json_rpc_request.cpp @@ -27,7 +27,7 @@ namespace JSONRPCRequest { struct CurlContext { - CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop); + CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop); ~CurlContext(); static int socket_func(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp) @@ -73,7 +73,6 @@ struct CurlContext std::string m_url; std::string m_req; - std::string m_auth; std::vector m_response; std::string m_error; @@ -81,7 +80,7 @@ struct CurlContext curl_slist* m_headers; }; -CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop) +CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop) : m_callback(cb) , m_closeCallback(close_cb) , m_loop(loop) @@ -90,7 +89,6 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string , m_multiHandle(nullptr) , m_handle(nullptr) , m_req(req) - , m_auth(auth) , m_headers(nullptr) { m_pollHandles.reserve(2); @@ -176,19 +174,31 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string curl_easy_setopt_checked(m_handle, CURLOPT_WRITEFUNCTION, write_func); curl_easy_setopt_checked(m_handle, CURLOPT_WRITEDATA, this); + const int timeout = proxy.empty() ? 1 : 5; + curl_easy_setopt_checked(m_handle, CURLOPT_URL, m_url.c_str()); curl_easy_setopt_checked(m_handle, CURLOPT_POSTFIELDS, m_req.c_str()); - curl_easy_setopt_checked(m_handle, CURLOPT_CONNECTTIMEOUT, 1); - curl_easy_setopt_checked(m_handle, CURLOPT_TIMEOUT, 10); + curl_easy_setopt_checked(m_handle, CURLOPT_CONNECTTIMEOUT, timeout); + curl_easy_setopt_checked(m_handle, CURLOPT_TIMEOUT, timeout * 10); m_headers = curl_slist_append(m_headers, "Content-Type: application/json"); if (m_headers) { curl_easy_setopt_checked(m_handle, CURLOPT_HTTPHEADER, m_headers); } - if (!m_auth.empty()) { + if (!auth.empty()) { curl_easy_setopt_checked(m_handle, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST | CURLAUTH_ONLY); - curl_easy_setopt_checked(m_handle, CURLOPT_USERPWD, m_auth.c_str()); + curl_easy_setopt_checked(m_handle, CURLOPT_USERPWD, auth.c_str()); + } + + if (!proxy.empty()) { + if (is_localhost(address)) { + LOGINFO(5, "not using proxy to connect to localhost address " << log::Gray() << address); + } + else { + curl_easy_setopt_checked(m_handle, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5_HOSTNAME); + curl_easy_setopt_checked(m_handle, CURLOPT_PROXY, proxy.c_str()); + } } CURLMcode curl_err = curl_multi_add_handle(m_multiHandle, m_handle); @@ -443,7 +453,7 @@ void CurlContext::shutdown() } } -void Call(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop) +void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop) { if (!loop) { loop = uv_default_loop(); @@ -453,7 +463,7 @@ void Call(const std::string& address, int port, const std::string& req, const st [=]() { try { - new CurlContext(address, port, req, auth, cb, close_cb, loop); + new CurlContext(address, port, req, auth, proxy, cb, close_cb, loop); } catch (const std::exception& e) { const char* msg = e.what(); diff --git a/src/json_rpc_request.h b/src/json_rpc_request.h index 70f6b90..e93ae8f 100644 --- a/src/json_rpc_request.h +++ b/src/json_rpc_request.h @@ -37,12 +37,12 @@ private: T m_cb; }; -void Call(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop); +void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop); template -FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr) +FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr) { - Call(address, port, req, auth, new Callback(std::move(cb)), new Callback(std::move(close_cb)), loop); + Call(address, port, req, auth, proxy, new Callback(std::move(cb)), new Callback(std::move(close_cb)), loop); } } // namespace JSONRPCRequest diff --git a/src/main.cpp b/src/main.cpp index 1094874..efeed01 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -48,6 +48,7 @@ void p2pool_usage() "--mini Connect to p2pool-mini sidechain. Note that it will also change default p2p port from %d to %d\n" "--no-autodiff Disable automatic difficulty adjustment for miners connected to stratum\n" "--rpc-login Specify username[:password] required for Monero RPC server\n" + "--socks5 Specify IP:port of a SOCKS5 proxy to use for outgoing connections\n" "--help Show this help message\n\n" "Example command line:\n\n" "%s --host 127.0.0.1 --rpc-port 18081 --zmq-port 18083 --wallet YOUR_WALLET_ADDRESS --stratum 0.0.0.0:%d --p2p 0.0.0.0:%d\n\n", diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 1400443..564b588 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -68,6 +68,19 @@ P2PServer::P2PServer(p2pool* pool) const Params& params = pool->params(); + if (!params.m_socks5Proxy.empty()) { + parse_address_list(params.m_socks5Proxy, + [this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port) + { + if (!str_to_ip(is_v6, ip.c_str(), m_socks5ProxyIP)) { + panic(); + } + m_socks5ProxyV6 = is_v6; + m_socks5ProxyPort = port; + }); + m_socks5Proxy = params.m_socks5Proxy; + } + set_max_outgoing_peers(params.m_maxOutgoingPeers); set_max_incoming_peers(params.m_maxIncomingPeers); @@ -77,6 +90,7 @@ P2PServer::P2PServer(p2pool* pool) uv_mutex_init_checked(&m_broadcastLock); uv_mutex_init_checked(&m_missingBlockRequestsLock); uv_rwlock_init_checked(&m_cachedBlocksLock); + uv_mutex_init_checked(&m_connectToPeersLock); int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast); if (err) { @@ -86,6 +100,20 @@ P2PServer::P2PServer(p2pool* pool) m_broadcastAsync.data = this; m_broadcastQueue.reserve(2); + err = uv_async_init(&m_loop, &m_connectToPeersAsync, on_connect_to_peers); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + panic(); + } + m_connectToPeersAsync.data = this; + + err = uv_async_init(&m_loop, &m_showPeersAsync, on_show_peers); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + panic(); + } + m_showPeersAsync.data = this; + err = uv_timer_init(&m_loop, &m_timer); if (err) { LOGERR(1, "failed to create timer, error " << uv_err_name(err)); @@ -114,6 +142,8 @@ P2PServer::~P2PServer() uv_timer_stop(&m_timer); uv_close(reinterpret_cast(&m_timer), nullptr); uv_close(reinterpret_cast(&m_broadcastAsync), nullptr); + uv_close(reinterpret_cast(&m_connectToPeersAsync), nullptr); + uv_close(reinterpret_cast(&m_showPeersAsync), nullptr); shutdown_tcp(); @@ -126,6 +156,8 @@ P2PServer::~P2PServer() clear_cached_blocks(); uv_rwlock_destroy(&m_cachedBlocksLock); + uv_mutex_destroy(&m_connectToPeersLock); + delete m_block; delete m_cache; @@ -179,12 +211,42 @@ void P2PServer::store_in_cache(const PoolBlock& block) } } +void P2PServer::connect_to_peers_async(const char* peer_list) +{ + { + MutexLock lock(m_connectToPeersLock); + if (!m_connectToPeersData.empty()) { + m_connectToPeersData.append(1, ','); + } + m_connectToPeersData.append(peer_list); + } + + if (!uv_is_closing(reinterpret_cast(&m_connectToPeersAsync))) { + uv_async_send(&m_connectToPeersAsync); + } +} + +void P2PServer::on_connect_to_peers(uv_async_t* handle) +{ + P2PServer* server = reinterpret_cast(handle->data); + + std::string peer_list; + { + MutexLock lock(server->m_connectToPeersLock); + peer_list = std::move(server->m_connectToPeersData); + } + + if (!peer_list.empty()) { + server->connect_to_peers(peer_list); + } +} + void P2PServer::connect_to_peers(const std::string& peer_list) { parse_address_list(peer_list, [this](bool is_v6, const std::string& /*address*/, std::string ip, int port) { - if (resolve_host(ip, is_v6)) { + if (!m_socks5Proxy.empty() || resolve_host(ip, is_v6)) { connect_to_peer(is_v6, ip.c_str(), port); } }); @@ -219,7 +281,7 @@ void P2PServer::update_peer_connections() connected_clients.reserve(m_numConnections); for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { const int timeout = client->m_handshakeComplete ? 300 : 10; - if (cur_time >= client->m_lastAlive + timeout) { + if ((cur_time >= client->m_lastAlive + timeout) && (client->m_socks5ProxyState == Client::Socks5ProxyState::Default)) { const uint64_t idle_time = static_cast(cur_time - client->m_lastAlive); LOGWARN(5, "peer " << static_cast(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting"); client->close(); @@ -507,29 +569,10 @@ void P2PServer::load_peer_list() [this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port) { Peer p; - if (is_v6) { - sockaddr_in6 addr6; - const int err = uv_ip6_addr(ip.c_str(), port, &addr6); - if (err) { - LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err)); - return; - } - p.m_isV6 = true; - memcpy(p.m_addr.data, &addr6.sin6_addr, sizeof(in6_addr)); - } - else { - sockaddr_in addr4; - const int err = uv_ip4_addr(ip.c_str(), port, &addr4); - if (err) { - LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err)); - return; - } - p.m_isV6 = false; - p.m_addr = {}; - p.m_addr.data[10] = 0xFF; - p.m_addr.data[11] = 0xFF; - memcpy(p.m_addr.data + 12, &addr4.sin_addr, sizeof(in_addr)); + if (!str_to_ip(is_v6, ip.c_str(), p.m_addr)) { + return; } + p.m_isV6 = is_v6; bool already_added = false; for (const Peer& peer : m_peerList) { @@ -555,7 +598,7 @@ void P2PServer::load_monerod_peer_list() { const Params& params = m_pool->params(); - JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin, + JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin, m_socks5Proxy, [this](const char* data, size_t size) { #define ERR_STR "/get_peer_list RPC request returned invalid JSON " @@ -604,27 +647,10 @@ void P2PServer::load_monerod_peer_list() Peer p; p.m_lastSeen = last_seen; + p.m_isV6 = (strchr(ip, ':') != 0); - if (strchr(ip, ':')) { - sockaddr_in6 addr6; - const int err = uv_ip6_addr(ip, port, &addr6); - if (err) { - continue; - } - p.m_isV6 = true; - memcpy(p.m_addr.data, &addr6.sin6_addr, sizeof(in6_addr)); - } - else { - sockaddr_in addr4; - const int err = uv_ip4_addr(ip, port, &addr4); - if (err) { - continue; - } - p.m_isV6 = false; - p.m_addr = {}; - p.m_addr.data[10] = 0xFF; - p.m_addr.data[11] = 0xFF; - memcpy(p.m_addr.data + 12, &addr4.sin_addr, sizeof(in_addr)); + if (!str_to_ip(p.m_isV6, ip, p.m_addr)) { + continue; } p.m_port = port; @@ -635,8 +661,8 @@ void P2PServer::load_monerod_peer_list() } } - // Put recently active peers first in the list - std::sort(m_peerListMonero.begin(), m_peerListMonero.end(), [](const Peer& a, const Peer& b) { return a.m_lastSeen > b.m_lastSeen; }); + // Put recently active peers last in the list (it will be scanned backwards) + std::sort(m_peerListMonero.begin(), m_peerListMonero.end(), [](const Peer& a, const Peer& b) { return a.m_lastSeen < b.m_lastSeen; }); LOGINFO(4, "monerod peer list loaded (" << m_peerListMonero.size() << " peers)"); }, @@ -793,7 +819,7 @@ void P2PServer::on_broadcast() MutexLock lock(m_clientsListLock); for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { - if (!client->m_handshakeComplete || !client->m_handshakeSolutionSent) { + if (!client->is_good()) { continue; } @@ -875,6 +901,13 @@ void P2PServer::print_status() ); } +void P2PServer::show_peers_async() +{ + if (!uv_is_closing(reinterpret_cast(&m_showPeersAsync))) { + uv_async_send(&m_showPeersAsync); + } +} + void P2PServer::show_peers() { MutexLock lock(m_clientsListLock); @@ -1133,12 +1166,14 @@ bool P2PServer::P2PClient::on_connect() return false; } - // Don't allow multiple connections to/from the same IP + // Don't allow multiple connections to/from the same IP (except localhost) // server->m_clientsListLock is already locked here - for (P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { - if ((client != this) && (client->m_addr == m_addr)) { - LOGINFO(5, "peer " << static_cast(m_addrString) << " is already connected as " << static_cast(client->m_addrString)); - return false; + if (!m_addr.is_localhost()) { + for (P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { + if ((client != this) && (client->m_addr == m_addr)) { + LOGINFO(5, "peer " << static_cast(m_addrString) << " is already connected as " << static_cast(client->m_addrString)); + return false; + } } } @@ -1891,7 +1926,7 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) uint32_t n = 0; for (P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { - if ((client->m_listenPort < 0) || (client->m_addr == m_addr)) { + if (!client->is_good() || (client->m_addr == m_addr)) { continue; } diff --git a/src/p2p_server.h b/src/p2p_server.h index 0119834..f562257 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -51,6 +51,7 @@ public: void clear_cached_blocks(); void store_in_cache(const PoolBlock& block); + void connect_to_peers_async(const char* peer_list); void connect_to_peers(const std::string& peer_list); void on_connect_failed(bool is_v6, const raw_ip& ip, int port) override; @@ -134,7 +135,7 @@ public: uint64_t get_peerId() const { return m_peerId; } void print_status() override; - void show_peers(); + void show_peers_async(); size_t peer_list_size() const { MutexLock lock(m_peerListLock); return m_peerList.size(); } uint32_t max_outgoing_peers() const { return m_maxOutgoingPeers; } @@ -221,6 +222,17 @@ private: static void on_broadcast(uv_async_t* handle) { reinterpret_cast(handle->data)->on_broadcast(); } void on_broadcast(); + + uv_mutex_t m_connectToPeersLock; + uv_async_t m_connectToPeersAsync; + std::string m_connectToPeersData; + + static void on_connect_to_peers(uv_async_t* handle); + + uv_async_t m_showPeersAsync; + + static void on_show_peers(uv_async_t* handle) { reinterpret_cast(handle->data)->show_peers(); } + void show_peers(); }; } // namespace p2pool diff --git a/src/p2pool.cpp b/src/p2pool.cpp index c411270..29a9e03 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -65,7 +65,7 @@ p2pool::p2pool(int argc, char* argv[]) } bool is_v6; - if (!resolve_host(m_params->m_host, is_v6)) { + if (m_params->m_socks5Proxy.empty() && !resolve_host(m_params->m_host, is_v6)) { LOGERR(1, "resolve_host failed for " << m_params->m_host); throw std::exception(); } @@ -322,7 +322,7 @@ void p2pool::handle_miner_data(MinerData& data) log::Stream s(buf); s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0"; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, + JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy, [this, h](const char* data, size_t size) { ChainMain block; @@ -457,6 +457,8 @@ void p2pool::submit_block_async(const std::vector& blob) } } +bool init_signals(p2pool* pool, bool init); + void p2pool::on_stop(uv_async_t* async) { p2pool* pool = reinterpret_cast(async->data); @@ -470,9 +472,11 @@ void p2pool::on_stop(uv_async_t* async) uv_close(reinterpret_cast(&pool->m_stopAsync), nullptr); uv_close(reinterpret_cast(&pool->m_restartZMQAsync), nullptr); + init_signals(pool, false); + uv_loop_t* loop = uv_default_loop_checked(); delete GetLoopUserData(loop, false); - uv_stop(loop); + loop->data = nullptr; } void p2pool::submit_block() const @@ -532,7 +536,7 @@ void p2pool::submit_block() const } request.append("\"]}"); - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin, + JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin, m_params->m_socks5Proxy, [height, diff, template_id, nonce, extra_nonce, is_external](const char* data, size_t size) { rapidjson::Document doc; @@ -648,7 +652,7 @@ void p2pool::download_block_headers(uint64_t current_height) s.m_pos = 0; s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << height << "}}\0"; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, + JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy, [this, prev_seed_height, height](const char* data, size_t size) { ChainMain block; @@ -677,14 +681,14 @@ void p2pool::download_block_headers(uint64_t current_height) s.m_pos = 0; s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_headers_range\",\"params\":{\"start_height\":" << start_height << ",\"end_height\":" << current_height - 1 << "}}\0"; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, + JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy, [this, start_height, current_height](const char* data, size_t size) { if (parse_block_headers_range(data, size) == current_height - start_height) { update_median_timestamp(); if (m_serversStarted.exchange(1) == 0) { try { - m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this); + m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this); } catch (const std::exception& e) { LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what()); @@ -785,7 +789,7 @@ void p2pool::stratum_on_block() void p2pool::get_info() { - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin, + JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy, [this](const char* data, size_t size) { parse_get_info_rpc(data, size); @@ -897,7 +901,7 @@ void p2pool::parse_get_info_rpc(const char* data, size_t size) void p2pool::get_version() { - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin, + JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy, [this](const char* data, size_t size) { parse_get_version_rpc(data, size); @@ -967,7 +971,7 @@ void p2pool::get_miner_data() { m_getMinerDataPending = true; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin, + JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy, [this](const char* data, size_t size) { parse_get_miner_data_rpc(data, size); @@ -1427,7 +1431,7 @@ static bool init_uv_threadpool() return (uv_queue_work(uv_default_loop_checked(), &dummy, [](uv_work_t*) {}, nullptr) == 0); } -static bool init_signals(p2pool* pool) +bool init_signals(p2pool* pool, bool init) { #ifdef SIGPIPE signal(SIGPIPE, SIG_IGN); @@ -1447,6 +1451,14 @@ static bool init_signals(p2pool* pool) static uv_signal_t signals[array_size(signal_names)]; + if (!init) { + for (size_t i = 0; i < array_size(signals); ++i) { + uv_signal_stop(&signals[i]); + uv_close(reinterpret_cast(&signals[i]), nullptr); + } + return true; + } + for (size_t i = 0; i < array_size(signal_names); ++i) { uv_signal_init(uv_default_loop_checked(), &signals[i]); signals[i].data = pool; @@ -1488,7 +1500,7 @@ void p2pool::restart_zmq() m_ZMQReader = nullptr; try { - m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this); + m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this); m_zmqLastActive = seconds_since_epoch(); } catch (const std::exception& e) { @@ -1498,7 +1510,7 @@ void p2pool::restart_zmq() int p2pool::run() { - if (!m_params->ok()) { + if (!m_params->valid()) { LOGERR(1, "Invalid or missing command line. Try \"p2pool --help\"."); return 1; } @@ -1508,7 +1520,7 @@ int p2pool::run() return 1; } - if (!init_signals(this)) { + if (!init_signals(this, true)) { LOGERR(1, "failed to initialize signal handlers"); return 1; } diff --git a/src/p2pool.h b/src/p2pool.h index 6e044f1..3638271 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -142,7 +142,7 @@ private: void get_miner_data(); void parse_get_miner_data_rpc(const char* data, size_t size); - bool parse_block_header(const char* data, size_t size, ChainMain& result); + bool parse_block_header(const char* data, size_t size, ChainMain& c); uint32_t parse_block_headers_range(const char* data, size_t size); void api_update_network_stats(); diff --git a/src/params.cpp b/src/params.cpp index aa34ab4..02a9432 100644 --- a/src/params.cpp +++ b/src/params.cpp @@ -135,6 +135,11 @@ Params::Params(int argc, char* argv[]) ok = true; } + if ((strcmp(argv[i], "--socks5") == 0) && (i + 1 < argc)) { + m_socks5Proxy = argv[++i]; + ok = true; + } + if (!ok) { fprintf(stderr, "Unknown command line parameter %s\n\n", argv[i]); p2pool_usage(); @@ -153,7 +158,7 @@ Params::Params(int argc, char* argv[]) } } -bool Params::ok() const +bool Params::valid() const { return !m_host.empty() && m_rpcPort && m_zmqPort && m_wallet.valid(); } diff --git a/src/params.h b/src/params.h index 158b53d..62c17a7 100644 --- a/src/params.h +++ b/src/params.h @@ -25,7 +25,7 @@ struct Params { Params(int argc, char* argv[]); - bool ok() const; + bool valid() const; std::string m_host = "127.0.0.1"; uint32_t m_rpcPort = 18081; @@ -50,6 +50,7 @@ struct Params bool m_mini = false; bool m_autoDiff = true; std::string m_rpcLogin; + std::string m_socks5Proxy; }; } // namespace p2pool diff --git a/src/pow_hash.cpp b/src/pow_hash.cpp index 3a97ce8..ab5ff12 100644 --- a/src/pow_hash.cpp +++ b/src/pow_hash.cpp @@ -438,7 +438,7 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h const Params& params = m_pool->params(); - JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin, + JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin, params.m_socks5Proxy, [&result, &h](const char* data, size_t size) { rapidjson::Document doc; diff --git a/src/pow_hash.h b/src/pow_hash.h index eb2bb66..14ad8ab 100644 --- a/src/pow_hash.h +++ b/src/pow_hash.h @@ -100,7 +100,7 @@ public: explicit RandomX_Hasher_RPC(p2pool* pool); ~RandomX_Hasher_RPC(); - bool calculate(const void* data, size_t size, uint64_t height, const hash& seed, hash& result) override; + bool calculate(const void* data_ptr, size_t size, uint64_t height, const hash& seed, hash& h) override; private: static void loop(void* data); diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index 5be4c24..4663cff 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -361,9 +361,10 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo if (found) { BlockTemplate& block = m_pool->block_template(); + uint64_t height; difficulty_type mainchain_diff, sidechain_diff; - if (!block.get_difficulties(template_id, mainchain_diff, sidechain_diff)) { + if (!block.get_difficulties(template_id, height, mainchain_diff, sidechain_diff)) { LOGWARN(4, "client " << static_cast(client->m_addrString) << " got a stale share"); return send(client, [id](void* buf, size_t buf_size) @@ -408,6 +409,8 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo share->m_target = target; share->m_resultHash = resultHash; share->m_sidechainDifficulty = sidechain_diff; + share->m_mainchainHeight = height; + share->m_effort = -1.0; share->m_timestamp = seconds_since_epoch(); uint64_t rem; @@ -852,14 +855,12 @@ void StratumServer::on_share_found(uv_work_t* req) const uint64_t n = server->m_cumulativeHashes + hashes; const double diff = sidechain_difficulty.to_double(); - const double effort = static_cast(n - server->m_cumulativeHashesAtLastShare) * 100.0 / diff; + share->m_effort = static_cast(n - server->m_cumulativeHashesAtLastShare) * 100.0 / diff; server->m_cumulativeHashesAtLastShare = n; server->m_cumulativeFoundSharesDiff += diff; ++server->m_totalFoundShares; - const char* s = client->m_customUser; - LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << height << ", diff " << sidechain_difficulty << ", client " << static_cast(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << effort << '%'); pool->submit_sidechain_block(share->m_templateId, share->m_nonce, share->m_extraNonce); } @@ -882,13 +883,16 @@ void StratumServer::on_share_found(uv_work_t* req) void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/) { SubmittedShare* share = reinterpret_cast(req->data); + StratumClient* client = share->m_client; + if (share->m_highEnoughDifficulty) { + const char* s = client->m_customUser; + LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << share->m_mainchainHeight << ", diff " << share->m_sidechainDifficulty << ", client " << static_cast(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << share->m_effort << '%'); bkg_jobs_tracker.stop("StratumServer::on_share_found"); } ON_SCOPE_LEAVE([share]() { share->m_server->m_submittedSharesPool.push_back(share); }); - StratumClient* client = share->m_client; StratumServer* server = share->m_server; const bool bad_share = (share->m_result == SubmittedShare::Result::LOW_DIFF) || (share->m_result == SubmittedShare::Result::INVALID_POW); diff --git a/src/stratum_server.h b/src/stratum_server.h index 7e83af9..28099c0 100644 --- a/src/stratum_server.h +++ b/src/stratum_server.h @@ -139,6 +139,8 @@ private: uint64_t m_target; hash m_resultHash; difficulty_type m_sidechainDifficulty; + uint64_t m_mainchainHeight; + double m_effort; uint64_t m_timestamp; uint64_t m_hashes; bool m_highEnoughDifficulty; diff --git a/src/tcp_server.h b/src/tcp_server.h index 82fd624..b0673d3 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -36,7 +36,7 @@ public: bool connect_to_peer(bool is_v6, const char* ip, int port); - void drop_connections() { uv_async_send(&m_dropConnectionsAsync); } + void drop_connections_async() { uv_async_send(&m_dropConnectionsAsync); } void shutdown_tcp(); virtual void print_status(); @@ -45,7 +45,7 @@ public: int listen_port() const { return m_listenPort; } bool connect_to_peer(bool is_v6, const raw_ip& ip, int port); - virtual void on_connect_failed(bool is_v6, const raw_ip& ip, int port); + virtual void on_connect_failed(bool /*is_v6*/, const raw_ip& /*ip*/, int /*port*/) {} void ban(const raw_ip& ip, uint64_t seconds); virtual void print_bans(); @@ -58,6 +58,7 @@ public: virtual void reset(); virtual bool on_connect() = 0; virtual bool on_read(char* data, uint32_t size) = 0; + bool on_proxy_handshake(char* data, uint32_t size); virtual void on_read_failed(int /*err*/) {} virtual void on_disconnected() {} @@ -68,7 +69,7 @@ public: void close(); void ban(uint64_t seconds); - void init_addr_string(bool is_v6, const sockaddr_storage* peer_addr); + void init_addr_string(); alignas(8) char m_readBuf[READ_BUF_SIZE]; @@ -88,7 +89,13 @@ public: raw_ip m_addr; int m_port; - char m_addrString[64]; + char m_addrString[72]; + + enum class Socks5ProxyState { + Default, + MethodSelectionSent, + ConnectRequestSent, + } m_socks5ProxyState; std::atomic m_resetCounter; }; @@ -100,7 +107,6 @@ public: std::vector m_data; }; - uv_mutex_t m_writeBuffersLock; std::vector m_writeBuffers; struct SendCallbackBase @@ -131,9 +137,9 @@ private: static void on_connection_error(uv_handle_t* handle); static void on_connect(uv_connect_t* req, int status); void on_new_client(uv_stream_t* server); - void on_new_client_nolock(uv_stream_t* server, Client* client); + void on_new_client(uv_stream_t* server, Client* client); - bool connect_to_peer_nolock(Client* client, bool is_v6, const sockaddr* addr); + bool connect_to_peer(Client* client); bool send_internal(Client* client, SendCallbackBase&& callback); @@ -148,6 +154,11 @@ private: protected: void start_listening(const std::string& listen_addresses); + std::string m_socks5Proxy; + bool m_socks5ProxyV6; + raw_ip m_socks5ProxyIP; + int m_socks5ProxyPort; + std::atomic m_finished; int m_listenPort; @@ -165,7 +176,6 @@ protected: bool is_banned(const raw_ip& ip); - uv_mutex_t m_pendingConnectionsLock; unordered_set m_pendingConnections; uv_async_t m_dropConnectionsAsync; diff --git a/src/tcp_server.inl b/src/tcp_server.inl index 9b82ab6..95ca1a3 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -25,6 +25,9 @@ template TCPServer::TCPServer(allocate_client_callback allocate_new_client) : m_allocateNewClient(allocate_new_client) , m_loopThread{} + , m_socks5ProxyV6(false) + , m_socks5ProxyIP{} + , m_socks5ProxyPort(-1) , m_finished(0) , m_listenPort(-1) , m_loop{} @@ -57,18 +60,6 @@ TCPServer::TCPServer(allocate_client_callback all uv_mutex_init_checked(&m_clientsListLock); uv_mutex_init_checked(&m_bansLock); - uv_mutex_init_checked(&m_pendingConnectionsLock); - uv_mutex_init_checked(&m_writeBuffersLock); - - m_writeBuffers.resize(DEFAULT_BACKLOG); - for (size_t i = 0; i < m_writeBuffers.size(); ++i) { - m_writeBuffers[i] = new WriteBuf(); - } - - m_preallocatedClients.reserve(DEFAULT_BACKLOG); - for (int i = 0; i < DEFAULT_BACKLOG; ++i) { - m_preallocatedClients.emplace_back(m_allocateNewClient()); - } m_connectedClientsList = m_allocateNewClient(); m_connectedClientsList->m_next = m_connectedClientsList; @@ -228,8 +219,6 @@ bool TCPServer::connect_to_peer(bool is_v6, const return false; } - MutexLock lock(m_clientsListLock); - if (m_finished.load()) { return false; } @@ -247,48 +236,27 @@ bool TCPServer::connect_to_peer(bool is_v6, const client->m_owner = this; client->m_port = port; + client->m_isV6 = is_v6; - log::Stream s(client->m_addrString); + if (!str_to_ip(is_v6, ip, client->m_addr)) { + m_preallocatedClients.push_back(client); + return false; + } - sockaddr_storage addr; + log::Stream s(client->m_addrString); if (is_v6) { - sockaddr_in6* addr6 = reinterpret_cast(&addr); - const int err = uv_ip6_addr(ip, port, addr6); - if (err) { - LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err)); - m_preallocatedClients.push_back(client); - return false; - } - - memcpy(client->m_addr.data, &addr6->sin6_addr, sizeof(in6_addr)); - s << '[' << ip << "]:" << port << '\0'; } else { - sockaddr_in* addr4 = reinterpret_cast(&addr); - const int err = uv_ip4_addr(ip, port, addr4); - if (err) { - LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err)); - m_preallocatedClients.push_back(client); - return false; - } - - client->m_addr = {}; - client->m_addr.data[10] = 0xFF; - client->m_addr.data[11] = 0xFF; - memcpy(client->m_addr.data + 12, &addr4->sin_addr, sizeof(in_addr)); - s << ip << ':' << port << '\0'; } - return connect_to_peer_nolock(client, is_v6, reinterpret_cast(&addr)); + return connect_to_peer(client); } template bool TCPServer::connect_to_peer(bool is_v6, const raw_ip& ip, int port) { - MutexLock lock(m_clientsListLock); - if (m_finished.load()) { return false; } @@ -307,29 +275,10 @@ bool TCPServer::connect_to_peer(bool is_v6, const client->m_owner = this; client->m_addr = ip; client->m_port = port; + client->m_isV6 = is_v6; + client->init_addr_string(); - sockaddr_storage addr{}; - - if (is_v6) { - sockaddr_in6* addr6 = reinterpret_cast(&addr); - addr6->sin6_family = AF_INET6; - memcpy(&addr6->sin6_addr, ip.data, sizeof(in6_addr)); - addr6->sin6_port = htons(static_cast(port)); - } - else { - sockaddr_in* addr4 = reinterpret_cast(&addr); - addr4->sin_family = AF_INET; - memcpy(&addr4->sin_addr, ip.data + 12, sizeof(in_addr)); - addr4->sin_port = htons(static_cast(port)); - } - - client->init_addr_string(is_v6, &addr); - return connect_to_peer_nolock(client, is_v6, reinterpret_cast(&addr)); -} - -template -void TCPServer::on_connect_failed(bool, const raw_ip&, int) -{ + return connect_to_peer(client); } template @@ -356,7 +305,7 @@ bool TCPServer::is_banned(const raw_ip& ip) } template -bool TCPServer::connect_to_peer_nolock(Client* client, bool is_v6, const sockaddr* addr) +bool TCPServer::connect_to_peer(Client* client) { if (is_banned(client->m_addr)) { LOGINFO(5, "peer " << log::Gray() << static_cast(client->m_addrString) << log::NoColor() << " is banned, not connecting to it"); @@ -364,8 +313,6 @@ bool TCPServer::connect_to_peer_nolock(Client* cl return false; } - client->m_isV6 = is_v6; - int err = uv_tcp_init(&m_loop, &client->m_socket); if (err) { LOGERR(1, "failed to create tcp client handle, error " << uv_err_name(err)); @@ -381,8 +328,6 @@ bool TCPServer::connect_to_peer_nolock(Client* cl return false; } - MutexLock lock(m_pendingConnectionsLock); - if (!m_pendingConnections.insert(client->m_addr).second) { LOGINFO(6, "there is already a pending connection to this IP, not connecting to " << log::Gray() << static_cast(client->m_addrString)); uv_close(reinterpret_cast(&client->m_socket), on_connection_error); @@ -391,9 +336,40 @@ bool TCPServer::connect_to_peer_nolock(Client* cl uv_connect_t* connect_request = reinterpret_cast(client->m_readBuf); memset(connect_request, 0, sizeof(uv_connect_t)); - connect_request->data = client; - err = uv_tcp_connect(connect_request, &client->m_socket, addr, on_connect); + + sockaddr_storage addr{}; + + if (m_socks5Proxy.empty()) { + if (client->m_isV6) { + sockaddr_in6* addr6 = reinterpret_cast(&addr); + addr6->sin6_family = AF_INET6; + memcpy(&addr6->sin6_addr, client->m_addr.data, sizeof(in6_addr)); + addr6->sin6_port = htons(static_cast(client->m_port)); + } + else { + sockaddr_in* addr4 = reinterpret_cast(&addr); + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, client->m_addr.data + 12, sizeof(in_addr)); + addr4->sin_port = htons(static_cast(client->m_port)); + } + } + else { + if (m_socks5ProxyV6) { + sockaddr_in6* addr6 = reinterpret_cast(&addr); + addr6->sin6_family = AF_INET6; + memcpy(&addr6->sin6_addr, m_socks5ProxyIP.data, sizeof(in6_addr)); + addr6->sin6_port = htons(static_cast(m_socks5ProxyPort)); + } + else { + sockaddr_in* addr4 = reinterpret_cast(&addr); + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, m_socks5ProxyIP.data + 12, sizeof(in_addr)); + addr4->sin_port = htons(static_cast(m_socks5ProxyPort)); + } + } + + err = uv_tcp_connect(connect_request, &client->m_socket, reinterpret_cast(&addr), on_connect); if (err) { LOGERR(1, "failed to initiate tcp connection to " << static_cast(client->m_addrString) << ", error " << uv_err_name(err)); m_pendingConnections.erase(client->m_addr); @@ -429,15 +405,16 @@ void TCPServer::close_sockets(bool listen_sockets } } - MutexLock lock(m_clientsListLock); - size_t numClosed = 0; + { + MutexLock lock(m_clientsListLock); - for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) { - uv_handle_t* h = reinterpret_cast(&c->m_socket); - if (!uv_is_closing(h)) { - uv_close(h, on_connection_close); - ++numClosed; + for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) { + uv_handle_t* h = reinterpret_cast(&c->m_socket); + if (!uv_is_closing(h)) { + uv_close(h, on_connection_close); + ++numClosed; + } } } @@ -485,21 +462,8 @@ void TCPServer::shutdown_tcp() uv_thread_join(&m_loopThread); - for (Client* c : m_preallocatedClients) { - delete c; - } - uv_mutex_destroy(&m_clientsListLock); uv_mutex_destroy(&m_bansLock); - uv_mutex_destroy(&m_pendingConnectionsLock); - - { - MutexLock lock(m_writeBuffersLock); - for (WriteBuf* buf : m_writeBuffers) { - delete buf; - } - } - uv_mutex_destroy(&m_writeBuffersLock); LOGINFO(1, "stopped"); } @@ -561,17 +525,13 @@ bool TCPServer::send_internal(Client* client, Sen return true; } - WriteBuf* buf = nullptr; + WriteBuf* buf; - { - MutexLock lock(m_writeBuffersLock); - if (!m_writeBuffers.empty()) { - buf = m_writeBuffers.back(); - m_writeBuffers.pop_back(); - } + if (!m_writeBuffers.empty()) { + buf = m_writeBuffers.back(); + m_writeBuffers.pop_back(); } - - if (!buf) { + else { buf = new WriteBuf(); } @@ -586,10 +546,7 @@ bool TCPServer::send_internal(Client* client, Sen if (bytes_written == 0) { LOGWARN(1, "send callback wrote 0 bytes, nothing to do"); - { - MutexLock lock(m_writeBuffersLock); - m_writeBuffers.push_back(buf); - } + m_writeBuffers.push_back(buf); return true; } @@ -604,11 +561,8 @@ bool TCPServer::send_internal(Client* client, Sen const int err = uv_write(&buf->m_write, reinterpret_cast(&client->m_socket), bufs, 1, Client::on_write); if (err) { - { - MutexLock lock(m_writeBuffersLock); - m_writeBuffers.push_back(buf); - } LOGWARN(1, "failed to start writing data to client connection " << static_cast(client->m_addrString) << ", error " << uv_err_name(err)); + m_writeBuffers.push_back(buf); return false; } @@ -621,8 +575,27 @@ void TCPServer::loop(void* data) LOGINFO(1, "event loop started"); server_event_loop_thread = true; TCPServer* server = static_cast(data); + + server->m_writeBuffers.resize(DEFAULT_BACKLOG); + server->m_preallocatedClients.reserve(DEFAULT_BACKLOG); + for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) { + server->m_writeBuffers[i] = new WriteBuf(); + server->m_preallocatedClients.emplace_back(server->m_allocateNewClient()); + } + uv_run(&server->m_loop, UV_RUN_DEFAULT); uv_loop_close(&server->m_loop); + + for (WriteBuf* buf : server->m_writeBuffers) { + delete buf; + } + server->m_writeBuffers.clear(); + + for (Client* c : server->m_preallocatedClients) { + delete c; + } + server->m_preallocatedClients.clear(); + LOGINFO(1, "event loop stopped"); server->m_loopStopped = true; } @@ -647,10 +620,6 @@ void TCPServer::on_new_connection(uv_stream_t* se template void TCPServer::on_connection_close(uv_handle_t* handle) { - if (!server_event_loop_thread) { - LOGERR(1, "on_connection_close called from another thread, this is not thread safe"); - } - Client* client = static_cast(handle->data); TCPServer* owner = client->m_owner; @@ -685,10 +654,7 @@ template void TCPServer::on_connection_error(uv_handle_t* handle) { Client* client = reinterpret_cast(handle->data); - TCPServer* server = client->m_owner; - - MutexLock lock(server->m_clientsListLock); - server->m_preallocatedClients.push_back(client); + client->m_owner->m_preallocatedClients.push_back(client); } template @@ -701,12 +667,7 @@ void TCPServer::on_connect(uv_connect_t* req, int return; } - { - MutexLock lock(server->m_pendingConnectionsLock); - server->m_pendingConnections.erase(client->m_addr); - } - - MutexLock lock(server->m_clientsListLock); + server->m_pendingConnections.erase(client->m_addr); if (status) { if (status == UV_ETIMEDOUT) { @@ -720,14 +681,12 @@ void TCPServer::on_connect(uv_connect_t* req, int return; } - server->on_new_client_nolock(nullptr, client); + server->on_new_client(nullptr, client); } template void TCPServer::on_new_client(uv_stream_t* server) { - MutexLock lock(m_clientsListLock); - if (m_finished.load()) { return; } @@ -766,62 +725,53 @@ void TCPServer::on_new_client(uv_stream_t* server return; } - on_new_client_nolock(server, client); + on_new_client(server, client); } template -void TCPServer::on_new_client_nolock(uv_stream_t* server, Client* client) +void TCPServer::on_new_client(uv_stream_t* server, Client* client) { + MutexLock lock(m_clientsListLock); + client->m_prev = m_connectedClientsList; client->m_next = m_connectedClientsList->m_next; m_connectedClientsList->m_next->m_prev = client; m_connectedClientsList->m_next = client; ++m_numConnections; - client->m_isIncoming = false; - sockaddr_storage peer_addr; - int peer_addr_len = static_cast(sizeof(peer_addr)); - int err = uv_tcp_getpeername(&client->m_socket, reinterpret_cast(&peer_addr), &peer_addr_len); - if (err) { - LOGERR(1, "failed to get IP address of the client connection, error " << uv_err_name(err)); - client->close(); - return; - } + client->m_isIncoming = (server != nullptr); - bool is_v6; - if (server) { - is_v6 = (std::find(m_listenSockets6.begin(), m_listenSockets6.end(), reinterpret_cast(server)) != m_listenSockets6.end()); - client->m_isV6 = is_v6; - } - else { - is_v6 = client->m_isV6; - } + if (client->m_isIncoming) { + client->m_isV6 = (std::find(m_listenSockets6.begin(), m_listenSockets6.end(), reinterpret_cast(server)) != m_listenSockets6.end()); - if (is_v6) { - memcpy(client->m_addr.data, &reinterpret_cast(&peer_addr)->sin6_addr, sizeof(in6_addr)); - client->m_port = ntohs(reinterpret_cast(&peer_addr)->sin6_port); - } - else { - client->m_addr = {}; - client->m_addr.data[10] = 0xFF; - client->m_addr.data[11] = 0xFF; - memcpy(client->m_addr.data + 12, &reinterpret_cast(&peer_addr)->sin_addr, sizeof(in_addr)); - client->m_port = ntohs(reinterpret_cast(&peer_addr)->sin_port); - } + sockaddr_storage peer_addr; + int peer_addr_len = static_cast(sizeof(peer_addr)); + int err = uv_tcp_getpeername(&client->m_socket, reinterpret_cast(&peer_addr), &peer_addr_len); + if (err) { + LOGERR(1, "failed to get IP address of the client connection, error " << uv_err_name(err)); + client->close(); + return; + } - client->init_addr_string(is_v6, &peer_addr); + if (client->m_isV6) { + memcpy(client->m_addr.data, &reinterpret_cast(&peer_addr)->sin6_addr, sizeof(in6_addr)); + client->m_port = ntohs(reinterpret_cast(&peer_addr)->sin6_port); + } + else { + client->m_addr = {}; + client->m_addr.data[10] = 0xFF; + client->m_addr.data[11] = 0xFF; + memcpy(client->m_addr.data + 12, &reinterpret_cast(&peer_addr)->sin_addr, sizeof(in_addr)); + client->m_port = ntohs(reinterpret_cast(&peer_addr)->sin_port); + } - if (server) { - LOGINFO(5, "new connection from " << log::Gray() << static_cast(client->m_addrString)); - client->m_isIncoming = true; + client->init_addr_string(); ++m_numIncomingConnections; } - else { - LOGINFO(5, "new connection to " << log::Gray() << static_cast(client->m_addrString)); - client->m_isIncoming = false; - } + + LOGINFO(5, "new connection " << (client->m_isIncoming ? "from " : "to ") << log::Gray() << static_cast(client->m_addrString)); if (is_banned(client->m_addr)) { LOGINFO(5, "peer " << log::Gray() << static_cast(client->m_addrString) << log::NoColor() << " is banned, disconnecting"); @@ -829,12 +779,44 @@ void TCPServer::on_new_client_nolock(uv_stream_t* return; } - if (client->m_owner->m_finished.load() || !client->on_connect()) { + TCPServer* owner = client->m_owner; + + if (owner->m_finished.load()) { client->close(); return; } - err = uv_read_start(reinterpret_cast(&client->m_socket), Client::on_alloc, Client::on_read); + if (owner->m_socks5Proxy.empty()) { + if (!client->on_connect()) { + client->close(); + return; + } + } + else { + const bool result = owner->send(client, + [](void* buf, size_t buf_size) -> size_t + { + if (buf_size < 3) { + return 0; + } + + uint8_t* p = reinterpret_cast(buf); + p[0] = 5; // Protocol version (SOCKS5) + p[1] = 1; // NMETHODS + p[2] = 0; // Method 0 (no authentication) + + return 3; + }); + + if (result) { + client->m_socks5ProxyState = Client::Socks5ProxyState::MethodSelectionSent; + } + else { + client->close(); + } + } + + const int err = uv_read_start(reinterpret_cast(&client->m_socket), Client::on_alloc, Client::on_read); if (err) { LOGERR(1, "failed to start reading from client connection, error " << uv_err_name(err)); client->close(); @@ -855,6 +837,7 @@ TCPServer::Client::Client() , m_addr{} , m_port(0) , m_addrString{} + , m_socks5ProxyState(Socks5ProxyState::Default) , m_resetCounter{ 0 } { m_readBuf[0] = '\0'; @@ -878,6 +861,7 @@ void TCPServer::Client::reset() m_addr = {}; m_port = -1; m_addrString[0] = '\0'; + m_socks5ProxyState = Socks5ProxyState::Default; } template @@ -907,34 +891,158 @@ void TCPServer::Client::on_alloc(uv_handle_t* han template void TCPServer::Client::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - Client* pThis = static_cast(stream->data); - pThis->m_readBufInUse = false; + Client* client = static_cast(stream->data); + client->m_readBufInUse = false; - if (pThis->m_isClosing) { - LOGWARN(5, "client " << static_cast(pThis->m_addrString) << " is being disconnected but data received from it, nread = " << nread << ". Ignoring it."); + if (client->m_isClosing) { + LOGWARN(5, "client " << static_cast(client->m_addrString) << " is being disconnected but data received from it, nread = " << nread << ". Ignoring it."); return; } if (nread > 0) { - if (pThis->m_owner && !pThis->m_owner->m_finished.load()) { - if (!pThis->on_read(buf->base, static_cast(nread))) { - pThis->close(); + if (client->m_owner && !client->m_owner->m_finished.load()) { + if (client->m_socks5ProxyState == Socks5ProxyState::Default) { + if (!client->on_read(buf->base, static_cast(nread))) { + client->close(); + } + } + else if (!client->on_proxy_handshake(buf->base, static_cast(nread))) { + client->close(); } } } else if (nread < 0) { if (nread != UV_EOF) { const int err = static_cast(nread); - LOGWARN(5, "client " << static_cast(pThis->m_addrString) << " failed to read response, err = " << uv_err_name(err)); - pThis->on_read_failed(err); + LOGWARN(5, "client " << static_cast(client->m_addrString) << " failed to read response, err = " << uv_err_name(err)); + client->on_read_failed(err); } else { - pThis->on_disconnected(); + client->on_disconnected(); } - pThis->close(); + client->close(); } } +template +bool TCPServer::Client::on_proxy_handshake(char* data, uint32_t size) +{ + if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) { + LOGERR(1, "peer " << static_cast(m_addrString) << " invalid data pointer or size in on_read()"); + return false; + } + m_numRead += size; + + uint32_t n = 0; + + switch (m_socks5ProxyState) { + case Socks5ProxyState::MethodSelectionSent: + if (m_numRead >= 2) { + if ((m_readBuf[0] != 5) && (m_readBuf[1] != 0)) { + LOGWARN(5, "SOCKS5 proxy returned an invalid METHOD selection message"); + return false; + } + n = 2; + + const bool result = m_owner->send(this, + [this](void* buf, size_t buf_size) -> size_t + { + if (buf_size < 20) { + return 0; + } + + uint8_t* p = reinterpret_cast(buf); + p[0] = 5; // Protocol version (SOCKS5) + p[1] = 1; // CONNECT + p[2] = 0; // RESERVED + if (m_isV6) { + p[3] = 4; // ATYP + memcpy(p + 4, m_addr.data, 16); + p[20] = static_cast(m_port >> 8); + p[21] = static_cast(m_port & 0xFF); + } + else { + p[3] = 1; // ATYP + memcpy(p + 4, m_addr.data + 12, 4); + p[8] = static_cast(m_port >> 8); + p[9] = static_cast(m_port & 0xFF); + } + + return m_isV6 ? 22 : 10; + }); + + if (result) { + m_socks5ProxyState = Socks5ProxyState::ConnectRequestSent; + } + else { + close(); + } + } + break; + + case Socks5ProxyState::ConnectRequestSent: + if (m_numRead >= 4) { + uint8_t* p = reinterpret_cast(m_readBuf); + if ((p[0] != 5) && (p[1] != 0) && p[2] != 0) { + LOGWARN(5, "SOCKS5 proxy returned an invalid reply to CONNECT"); + return false; + } + + switch (p[3]) { + case 1: + if (m_numRead >= 10) { + m_socks5ProxyState = Socks5ProxyState::Default; + n = 10; + } + break; + case 3: + if (m_numRead >= 5) { + const uint32_t len = p[4]; + if (m_numRead >= 7 + len) { + m_socks5ProxyState = Socks5ProxyState::Default; + n = 7 + len; + } + } + break; + case 4: + if (m_numRead >= 22) { + m_socks5ProxyState = Socks5ProxyState::Default; + n = 22; + } + break; + } + } + break; + + default: + return false; + } + + // Move the possible unfinished message to the beginning of m_readBuf to free up more space for reading + if (n > 0) { + m_numRead -= n; + if (m_numRead > 0) { + memmove(m_readBuf, m_readBuf + n, m_numRead); + } + } + + if (m_socks5ProxyState == Socks5ProxyState::Default) { + if (!on_connect()) { + return false; + } + + if (m_numRead > 0) { + const uint32_t nread = m_numRead; + m_numRead = 0; + if (!on_read(m_readBuf, nread)) { + return false; + } + } + } + + return true; +} + template void TCPServer::Client::on_write(uv_write_t* req, int status) { @@ -943,7 +1051,6 @@ void TCPServer::Client::on_write(uv_write_t* req, TCPServer* server = client->m_owner; if (server) { - MutexLock lock(server->m_writeBuffersLock); server->m_writeBuffers.push_back(buf); } @@ -982,16 +1089,16 @@ void TCPServer::Client::ban(uint64_t seconds) } template -void TCPServer::Client::init_addr_string(bool is_v6, const sockaddr_storage* peer_addr) +void TCPServer::Client::init_addr_string() { const char* addr_str; char addr_str_buf[64]; - if (is_v6) { - addr_str = inet_ntop(AF_INET6, &reinterpret_cast(peer_addr)->sin6_addr, addr_str_buf, sizeof(addr_str_buf)); + if (m_isV6) { + addr_str = inet_ntop(AF_INET6, m_addr.data, addr_str_buf, sizeof(addr_str_buf)); } else { - addr_str = inet_ntop(AF_INET, &reinterpret_cast(peer_addr)->sin_addr, addr_str_buf, sizeof(addr_str_buf)); + addr_str = inet_ntop(AF_INET, m_addr.data + 12, addr_str_buf, sizeof(addr_str_buf)); } if (addr_str) { @@ -1001,11 +1108,11 @@ void TCPServer::Client::init_addr_string(bool is_ } log::Stream s(m_addrString); - if (is_v6) { - s << '[' << log::const_buf(addr_str, n) << "]:" << ntohs(reinterpret_cast(peer_addr)->sin6_port) << '\0'; + if (m_isV6) { + s << '[' << log::const_buf(addr_str, n) << "]:" << m_port << '\0'; } else { - s << log::const_buf(addr_str, n) << ':' << ntohs(reinterpret_cast(peer_addr)->sin_port) << '\0'; + s << log::const_buf(addr_str, n) << ':' << m_port << '\0'; } } } diff --git a/src/util.cpp b/src/util.cpp index 40d6d6c..baec96c 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -432,6 +432,57 @@ NOINLINE uint64_t bsr_reference(uint64_t x) return bsr8_table.data[y >> 24] - n0 - n1 - n2; } +bool str_to_ip(bool is_v6, const char* ip, raw_ip& result) +{ + sockaddr_storage addr; + + if (is_v6) { + sockaddr_in6* addr6 = reinterpret_cast(&addr); + const int err = uv_ip6_addr(ip, 0, addr6); + if (err) { + LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err)); + return false; + } + memcpy(result.data, &addr6->sin6_addr, sizeof(in6_addr)); + } + else { + sockaddr_in* addr4 = reinterpret_cast(&addr); + const int err = uv_ip4_addr(ip, 0, addr4); + if (err) { + LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err)); + return false; + } + result = {}; + result.data[10] = 0xFF; + result.data[11] = 0xFF; + memcpy(result.data + 12, &addr4->sin_addr, sizeof(in_addr)); + } + + return true; +} + +bool is_localhost(const std::string& host) +{ + if (host.empty()) { + return false; + } + + if (host.compare("localhost") == 0) { + return true; + } + + if (host.find_first_not_of("0123456789.:") != std::string::npos) { + return false; + } + + raw_ip addr; + if (!str_to_ip(host.find(':') != std::string::npos, host.c_str(), addr)) { + return false; + } + + return addr.is_localhost(); +} + UV_LoopUserData* GetLoopUserData(uv_loop_t* loop, bool create) { UV_LoopUserData* data = reinterpret_cast(loop->data); diff --git a/src/util.h b/src/util.h index b861ca1..defdd7d 100644 --- a/src/util.h +++ b/src/util.h @@ -219,6 +219,9 @@ FORCEINLINE uint64_t bsr(uint64_t x) #define bsr bsr_reference #endif +bool str_to_ip(bool is_v6, const char* ip, raw_ip& result); +bool is_localhost(const std::string& host); + } // namespace p2pool void memory_tracking_start(); diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index 14b39f7..919d94e 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -24,14 +24,20 @@ static constexpr char log_category_prefix[] = "ZMQReader "; namespace p2pool { -ZMQReader::ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandler* handler) +ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler) : m_address(address) , m_zmqPort(zmq_port) + , m_proxy(proxy) , m_handler(handler) , m_tx() , m_minerData() , m_chainmainData() { + if (!m_proxy.empty() && is_localhost(address)) { + LOGINFO(5, "not using proxy to connect to localhost address " << log::Gray() << address); + m_proxy.clear(); + } + for (uint32_t i = m_publisherPort; i < std::numeric_limits::max(); ++i) { try { m_publisherPort = 0; @@ -84,14 +90,18 @@ void ZMQReader::run_wrapper(void* arg) void ZMQReader::run() { try { - char addr[32]; + if (!m_proxy.empty()) { + m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length())); + } - snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort); + std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort); if (!connect(addr)) { return; } - snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort); + m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer()); + + addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort); if (!connect(addr)) { return; } @@ -128,7 +138,7 @@ void ZMQReader::run() } } -bool ZMQReader::connect(const char* address) +bool ZMQReader::connect(const std::string& address) { struct ConnectMonitor : public zmq::monitor_t { diff --git a/src/zmq_reader.h b/src/zmq_reader.h index 0db1547..8aed572 100644 --- a/src/zmq_reader.h +++ b/src/zmq_reader.h @@ -24,18 +24,19 @@ namespace p2pool { class ZMQReader { public: - ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandler* handler); + ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler); ~ZMQReader(); private: static void run_wrapper(void* arg); void run(); - bool connect(const char* address); + bool connect(const std::string& address); void parse(char* data, size_t size); - const char* m_address; + std::string m_address; uint32_t m_zmqPort; + std::string m_proxy; MinerCallbackHandler* m_handler; uv_thread_t m_worker{};