SOCKS5 proxy support

pull/206/head
SChernykh 2 years ago
parent 229b07391b
commit 86156f3fec

@ -70,7 +70,7 @@ jobs:
run: |
cd external/src/curl
autoreconf -fi
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
make -j$(nproc)
- name: Build libuv
@ -139,7 +139,7 @@ jobs:
run: |
cd external/src/curl
autoreconf -fi
./configure --host=aarch64-linux-gnu --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
./configure --host=aarch64-linux-gnu --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
make -j$(nproc)
- name: Build libuv
@ -195,7 +195,7 @@ jobs:
run: |
cd external/src/curl
autoreconf -fi
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
make -j$(nproc)
- name: Build libuv
@ -307,7 +307,7 @@ jobs:
run: |
cd external/src/curl
autoreconf -fi
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
make -j3
- name: Build libuv

@ -30,7 +30,7 @@ jobs:
timeout-minutes: 15
run: |
cd build
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6
- name: Archive p2pool.log
uses: actions/upload-artifact@v2
@ -62,7 +62,7 @@ jobs:
timeout-minutes: 15
run: |
cd build
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6
- name: Archive p2pool.log
uses: actions/upload-artifact@v2
@ -94,7 +94,7 @@ jobs:
timeout-minutes: 15
run: |
cd build/Release
./p2pool.exe --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4
./p2pool.exe --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6
- name: Archive p2pool.log
uses: actions/upload-artifact@v2

@ -23,6 +23,7 @@
--mini Connect to p2pool-mini sidechain. Note that it will also change default p2p port from 37889 to 37888
--no-autodiff Disable automatic difficulty adjustment for miners connected to stratum
--rpc-login Specify username[:password] required for Monero RPC server
--socks5 Specify IP:port of a SOCKS5 proxy to use for outgoing connections
```
### Example command line

@ -910,11 +910,12 @@ void BlockTemplate::calc_merkle_tree_main_branch()
}
}
bool BlockTemplate::get_difficulties(const uint32_t template_id, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const
bool BlockTemplate::get_difficulties(const uint32_t template_id, uint64_t& height, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const
{
ReadLock lock(m_lock);
if (template_id == m_templateId) {
height = m_height;
mainchain_difficulty = m_difficulty;
sidechain_difficulty = m_poolBlockTemplate->m_difficulty;
return true;
@ -923,7 +924,7 @@ bool BlockTemplate::get_difficulties(const uint32_t template_id, difficulty_type
const BlockTemplate* old = m_oldTemplates[template_id % array_size(&BlockTemplate::m_oldTemplates)];
if (old && (template_id == old->m_templateId)) {
return old->get_difficulties(template_id, mainchain_difficulty, sidechain_difficulty);
return old->get_difficulties(template_id, height, mainchain_difficulty, sidechain_difficulty);
}
return false;

@ -40,7 +40,7 @@ public:
void update(const MinerData& data, const Mempool& mempool, Wallet* miner_wallet);
bool get_difficulties(const uint32_t template_id, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const;
bool get_difficulties(const uint32_t template_id, uint64_t& height, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const;
uint32_t get_hashing_blob(const uint32_t template_id, uint32_t extra_nonce, uint8_t (&blob)[128], uint64_t& height, difficulty_type& difficulty, difficulty_type& sidechain_difficulty, hash& seed_hash, size_t& nonce_offset) const;
uint32_t get_hashing_blob(uint32_t extra_nonce, uint8_t (&blob)[128], uint64_t& height, difficulty_type& difficulty, difficulty_type& sidechain_difficulty, hash& seed_hash, size_t& nonce_offset, uint32_t& template_id) const;

@ -161,21 +161,21 @@ static void do_loglevel(p2pool * /* m_pool */, const char *args)
static void do_addpeers(p2pool *m_pool, const char *args)
{
if (m_pool->p2p_server()) {
m_pool->p2p_server()->connect_to_peers(args);
m_pool->p2p_server()->connect_to_peers_async(args);
}
}
static void do_droppeers(p2pool *m_pool, const char * /* args */)
{
if (m_pool->p2p_server()) {
m_pool->p2p_server()->drop_connections();
m_pool->p2p_server()->drop_connections_async();
}
}
static void do_showpeers(p2pool* m_pool, const char* /* args */)
{
if (m_pool->p2p_server()) {
m_pool->p2p_server()->show_peers();
m_pool->p2p_server()->show_peers_async();
}
}

@ -27,7 +27,7 @@ namespace JSONRPCRequest {
struct CurlContext
{
CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
~CurlContext();
static int socket_func(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp)
@ -73,7 +73,6 @@ struct CurlContext
std::string m_url;
std::string m_req;
std::string m_auth;
std::vector<char> m_response;
std::string m_error;
@ -81,7 +80,7 @@ struct CurlContext
curl_slist* m_headers;
};
CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
: m_callback(cb)
, m_closeCallback(close_cb)
, m_loop(loop)
@ -90,7 +89,6 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string
, m_multiHandle(nullptr)
, m_handle(nullptr)
, m_req(req)
, m_auth(auth)
, m_headers(nullptr)
{
m_pollHandles.reserve(2);
@ -176,19 +174,31 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string
curl_easy_setopt_checked(m_handle, CURLOPT_WRITEFUNCTION, write_func);
curl_easy_setopt_checked(m_handle, CURLOPT_WRITEDATA, this);
const int timeout = proxy.empty() ? 1 : 5;
curl_easy_setopt_checked(m_handle, CURLOPT_URL, m_url.c_str());
curl_easy_setopt_checked(m_handle, CURLOPT_POSTFIELDS, m_req.c_str());
curl_easy_setopt_checked(m_handle, CURLOPT_CONNECTTIMEOUT, 1);
curl_easy_setopt_checked(m_handle, CURLOPT_TIMEOUT, 10);
curl_easy_setopt_checked(m_handle, CURLOPT_CONNECTTIMEOUT, timeout);
curl_easy_setopt_checked(m_handle, CURLOPT_TIMEOUT, timeout * 10);
m_headers = curl_slist_append(m_headers, "Content-Type: application/json");
if (m_headers) {
curl_easy_setopt_checked(m_handle, CURLOPT_HTTPHEADER, m_headers);
}
if (!m_auth.empty()) {
if (!auth.empty()) {
curl_easy_setopt_checked(m_handle, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST | CURLAUTH_ONLY);
curl_easy_setopt_checked(m_handle, CURLOPT_USERPWD, m_auth.c_str());
curl_easy_setopt_checked(m_handle, CURLOPT_USERPWD, auth.c_str());
}
if (!proxy.empty()) {
if (is_localhost(address)) {
LOGINFO(5, "not using proxy to connect to localhost address " << log::Gray() << address);
}
else {
curl_easy_setopt_checked(m_handle, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5_HOSTNAME);
curl_easy_setopt_checked(m_handle, CURLOPT_PROXY, proxy.c_str());
}
}
CURLMcode curl_err = curl_multi_add_handle(m_multiHandle, m_handle);
@ -443,7 +453,7 @@ void CurlContext::shutdown()
}
}
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
{
if (!loop) {
loop = uv_default_loop();
@ -453,7 +463,7 @@ void Call(const std::string& address, int port, const std::string& req, const st
[=]()
{
try {
new CurlContext(address, port, req, auth, cb, close_cb, loop);
new CurlContext(address, port, req, auth, proxy, cb, close_cb, loop);
}
catch (const std::exception& e) {
const char* msg = e.what();

@ -37,12 +37,12 @@ private:
T m_cb;
};
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
template<typename T, typename U>
FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr)
FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr)
{
Call(address, port, req, auth, new Callback<T>(std::move(cb)), new Callback<U>(std::move(close_cb)), loop);
Call(address, port, req, auth, proxy, new Callback<T>(std::move(cb)), new Callback<U>(std::move(close_cb)), loop);
}
} // namespace JSONRPCRequest

@ -48,6 +48,7 @@ void p2pool_usage()
"--mini Connect to p2pool-mini sidechain. Note that it will also change default p2p port from %d to %d\n"
"--no-autodiff Disable automatic difficulty adjustment for miners connected to stratum\n"
"--rpc-login Specify username[:password] required for Monero RPC server\n"
"--socks5 Specify IP:port of a SOCKS5 proxy to use for outgoing connections\n"
"--help Show this help message\n\n"
"Example command line:\n\n"
"%s --host 127.0.0.1 --rpc-port 18081 --zmq-port 18083 --wallet YOUR_WALLET_ADDRESS --stratum 0.0.0.0:%d --p2p 0.0.0.0:%d\n\n",

@ -68,6 +68,19 @@ P2PServer::P2PServer(p2pool* pool)
const Params& params = pool->params();
if (!params.m_socks5Proxy.empty()) {
parse_address_list(params.m_socks5Proxy,
[this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port)
{
if (!str_to_ip(is_v6, ip.c_str(), m_socks5ProxyIP)) {
panic();
}
m_socks5ProxyV6 = is_v6;
m_socks5ProxyPort = port;
});
m_socks5Proxy = params.m_socks5Proxy;
}
set_max_outgoing_peers(params.m_maxOutgoingPeers);
set_max_incoming_peers(params.m_maxIncomingPeers);
@ -77,6 +90,7 @@ P2PServer::P2PServer(p2pool* pool)
uv_mutex_init_checked(&m_broadcastLock);
uv_mutex_init_checked(&m_missingBlockRequestsLock);
uv_rwlock_init_checked(&m_cachedBlocksLock);
uv_mutex_init_checked(&m_connectToPeersLock);
int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast);
if (err) {
@ -86,6 +100,20 @@ P2PServer::P2PServer(p2pool* pool)
m_broadcastAsync.data = this;
m_broadcastQueue.reserve(2);
err = uv_async_init(&m_loop, &m_connectToPeersAsync, on_connect_to_peers);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
panic();
}
m_connectToPeersAsync.data = this;
err = uv_async_init(&m_loop, &m_showPeersAsync, on_show_peers);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
panic();
}
m_showPeersAsync.data = this;
err = uv_timer_init(&m_loop, &m_timer);
if (err) {
LOGERR(1, "failed to create timer, error " << uv_err_name(err));
@ -114,6 +142,8 @@ P2PServer::~P2PServer()
uv_timer_stop(&m_timer);
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync), nullptr);
shutdown_tcp();
@ -126,6 +156,8 @@ P2PServer::~P2PServer()
clear_cached_blocks();
uv_rwlock_destroy(&m_cachedBlocksLock);
uv_mutex_destroy(&m_connectToPeersLock);
delete m_block;
delete m_cache;
@ -179,12 +211,42 @@ void P2PServer::store_in_cache(const PoolBlock& block)
}
}
void P2PServer::connect_to_peers_async(const char* peer_list)
{
{
MutexLock lock(m_connectToPeersLock);
if (!m_connectToPeersData.empty()) {
m_connectToPeersData.append(1, ',');
}
m_connectToPeersData.append(peer_list);
}
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync))) {
uv_async_send(&m_connectToPeersAsync);
}
}
void P2PServer::on_connect_to_peers(uv_async_t* handle)
{
P2PServer* server = reinterpret_cast<P2PServer*>(handle->data);
std::string peer_list;
{
MutexLock lock(server->m_connectToPeersLock);
peer_list = std::move(server->m_connectToPeersData);
}
if (!peer_list.empty()) {
server->connect_to_peers(peer_list);
}
}
void P2PServer::connect_to_peers(const std::string& peer_list)
{
parse_address_list(peer_list,
[this](bool is_v6, const std::string& /*address*/, std::string ip, int port)
{
if (resolve_host(ip, is_v6)) {
if (!m_socks5Proxy.empty() || resolve_host(ip, is_v6)) {
connect_to_peer(is_v6, ip.c_str(), port);
}
});
@ -219,7 +281,7 @@ void P2PServer::update_peer_connections()
connected_clients.reserve(m_numConnections);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
const int timeout = client->m_handshakeComplete ? 300 : 10;
if (cur_time >= client->m_lastAlive + timeout) {
if ((cur_time >= client->m_lastAlive + timeout) && (client->m_socks5ProxyState == Client::Socks5ProxyState::Default)) {
const uint64_t idle_time = static_cast<uint64_t>(cur_time - client->m_lastAlive);
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting");
client->close();
@ -507,29 +569,10 @@ void P2PServer::load_peer_list()
[this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port)
{
Peer p;
if (is_v6) {
sockaddr_in6 addr6;
const int err = uv_ip6_addr(ip.c_str(), port, &addr6);
if (err) {
LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err));
return;
}
p.m_isV6 = true;
memcpy(p.m_addr.data, &addr6.sin6_addr, sizeof(in6_addr));
}
else {
sockaddr_in addr4;
const int err = uv_ip4_addr(ip.c_str(), port, &addr4);
if (err) {
LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err));
return;
}
p.m_isV6 = false;
p.m_addr = {};
p.m_addr.data[10] = 0xFF;
p.m_addr.data[11] = 0xFF;
memcpy(p.m_addr.data + 12, &addr4.sin_addr, sizeof(in_addr));
if (!str_to_ip(is_v6, ip.c_str(), p.m_addr)) {
return;
}
p.m_isV6 = is_v6;
bool already_added = false;
for (const Peer& peer : m_peerList) {
@ -555,7 +598,7 @@ void P2PServer::load_monerod_peer_list()
{
const Params& params = m_pool->params();
JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin,
JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin, m_socks5Proxy,
[this](const char* data, size_t size)
{
#define ERR_STR "/get_peer_list RPC request returned invalid JSON "
@ -604,27 +647,10 @@ void P2PServer::load_monerod_peer_list()
Peer p;
p.m_lastSeen = last_seen;
p.m_isV6 = (strchr(ip, ':') != 0);
if (strchr(ip, ':')) {
sockaddr_in6 addr6;
const int err = uv_ip6_addr(ip, port, &addr6);
if (err) {
continue;
}
p.m_isV6 = true;
memcpy(p.m_addr.data, &addr6.sin6_addr, sizeof(in6_addr));
}
else {
sockaddr_in addr4;
const int err = uv_ip4_addr(ip, port, &addr4);
if (err) {
continue;
}
p.m_isV6 = false;
p.m_addr = {};
p.m_addr.data[10] = 0xFF;
p.m_addr.data[11] = 0xFF;
memcpy(p.m_addr.data + 12, &addr4.sin_addr, sizeof(in_addr));
if (!str_to_ip(p.m_isV6, ip, p.m_addr)) {
continue;
}
p.m_port = port;
@ -635,8 +661,8 @@ void P2PServer::load_monerod_peer_list()
}
}
// Put recently active peers first in the list
std::sort(m_peerListMonero.begin(), m_peerListMonero.end(), [](const Peer& a, const Peer& b) { return a.m_lastSeen > b.m_lastSeen; });
// Put recently active peers last in the list (it will be scanned backwards)
std::sort(m_peerListMonero.begin(), m_peerListMonero.end(), [](const Peer& a, const Peer& b) { return a.m_lastSeen < b.m_lastSeen; });
LOGINFO(4, "monerod peer list loaded (" << m_peerListMonero.size() << " peers)");
},
@ -793,7 +819,7 @@ void P2PServer::on_broadcast()
MutexLock lock(m_clientsListLock);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (!client->m_handshakeComplete || !client->m_handshakeSolutionSent) {
if (!client->is_good()) {
continue;
}
@ -875,6 +901,13 @@ void P2PServer::print_status()
);
}
void P2PServer::show_peers_async()
{
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync))) {
uv_async_send(&m_showPeersAsync);
}
}
void P2PServer::show_peers()
{
MutexLock lock(m_clientsListLock);
@ -1133,12 +1166,14 @@ bool P2PServer::P2PClient::on_connect()
return false;
}
// Don't allow multiple connections to/from the same IP
// Don't allow multiple connections to/from the same IP (except localhost)
// server->m_clientsListLock is already locked here
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if ((client != this) && (client->m_addr == m_addr)) {
LOGINFO(5, "peer " << static_cast<char*>(m_addrString) << " is already connected as " << static_cast<char*>(client->m_addrString));
return false;
if (!m_addr.is_localhost()) {
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if ((client != this) && (client->m_addr == m_addr)) {
LOGINFO(5, "peer " << static_cast<char*>(m_addrString) << " is already connected as " << static_cast<char*>(client->m_addrString));
return false;
}
}
}
@ -1891,7 +1926,7 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
uint32_t n = 0;
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if ((client->m_listenPort < 0) || (client->m_addr == m_addr)) {
if (!client->is_good() || (client->m_addr == m_addr)) {
continue;
}

@ -51,6 +51,7 @@ public:
void clear_cached_blocks();
void store_in_cache(const PoolBlock& block);
void connect_to_peers_async(const char* peer_list);
void connect_to_peers(const std::string& peer_list);
void on_connect_failed(bool is_v6, const raw_ip& ip, int port) override;
@ -134,7 +135,7 @@ public:
uint64_t get_peerId() const { return m_peerId; }
void print_status() override;
void show_peers();
void show_peers_async();
size_t peer_list_size() const { MutexLock lock(m_peerListLock); return m_peerList.size(); }
uint32_t max_outgoing_peers() const { return m_maxOutgoingPeers; }
@ -221,6 +222,17 @@ private:
static void on_broadcast(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->on_broadcast(); }
void on_broadcast();
uv_mutex_t m_connectToPeersLock;
uv_async_t m_connectToPeersAsync;
std::string m_connectToPeersData;
static void on_connect_to_peers(uv_async_t* handle);
uv_async_t m_showPeersAsync;
static void on_show_peers(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->show_peers(); }
void show_peers();
};
} // namespace p2pool

@ -65,7 +65,7 @@ p2pool::p2pool(int argc, char* argv[])
}
bool is_v6;
if (!resolve_host(m_params->m_host, is_v6)) {
if (m_params->m_socks5Proxy.empty() && !resolve_host(m_params->m_host, is_v6)) {
LOGERR(1, "resolve_host failed for " << m_params->m_host);
throw std::exception();
}
@ -322,7 +322,7 @@ void p2pool::handle_miner_data(MinerData& data)
log::Stream s(buf);
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0";
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin,
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this, h](const char* data, size_t size)
{
ChainMain block;
@ -457,6 +457,8 @@ void p2pool::submit_block_async(const std::vector<uint8_t>& blob)
}
}
bool init_signals(p2pool* pool, bool init);
void p2pool::on_stop(uv_async_t* async)
{
p2pool* pool = reinterpret_cast<p2pool*>(async->data);
@ -470,9 +472,11 @@ void p2pool::on_stop(uv_async_t* async)
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_stopAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_restartZMQAsync), nullptr);
init_signals(pool, false);
uv_loop_t* loop = uv_default_loop_checked();
delete GetLoopUserData(loop, false);
uv_stop(loop);
loop->data = nullptr;
}
void p2pool::submit_block() const
@ -532,7 +536,7 @@ void p2pool::submit_block() const
}
request.append("\"]}");
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin,
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin, m_params->m_socks5Proxy,
[height, diff, template_id, nonce, extra_nonce, is_external](const char* data, size_t size)
{
rapidjson::Document doc;
@ -648,7 +652,7 @@ void p2pool::download_block_headers(uint64_t current_height)
s.m_pos = 0;
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << height << "}}\0";
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin,
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this, prev_seed_height, height](const char* data, size_t size)
{
ChainMain block;
@ -677,14 +681,14 @@ void p2pool::download_block_headers(uint64_t current_height)
s.m_pos = 0;
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_headers_range\",\"params\":{\"start_height\":" << start_height << ",\"end_height\":" << current_height - 1 << "}}\0";
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin,
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this, start_height, current_height](const char* data, size_t size)
{
if (parse_block_headers_range(data, size) == current_height - start_height) {
update_median_timestamp();
if (m_serversStarted.exchange(1) == 0) {
try {
m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this);
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
}
catch (const std::exception& e) {
LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what());
@ -785,7 +789,7 @@ void p2pool::stratum_on_block()
void p2pool::get_info()
{
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin,
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this](const char* data, size_t size)
{
parse_get_info_rpc(data, size);
@ -897,7 +901,7 @@ void p2pool::parse_get_info_rpc(const char* data, size_t size)
void p2pool::get_version()
{
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin,
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this](const char* data, size_t size)
{
parse_get_version_rpc(data, size);
@ -967,7 +971,7 @@ void p2pool::get_miner_data()
{
m_getMinerDataPending = true;
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin,
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this](const char* data, size_t size)
{
parse_get_miner_data_rpc(data, size);
@ -1427,7 +1431,7 @@ static bool init_uv_threadpool()
return (uv_queue_work(uv_default_loop_checked(), &dummy, [](uv_work_t*) {}, nullptr) == 0);
}
static bool init_signals(p2pool* pool)
bool init_signals(p2pool* pool, bool init)
{
#ifdef SIGPIPE
signal(SIGPIPE, SIG_IGN);
@ -1447,6 +1451,14 @@ static bool init_signals(p2pool* pool)
static uv_signal_t signals[array_size(signal_names)];
if (!init) {
for (size_t i = 0; i < array_size(signals); ++i) {
uv_signal_stop(&signals[i]);
uv_close(reinterpret_cast<uv_handle_t*>(&signals[i]), nullptr);
}
return true;
}
for (size_t i = 0; i < array_size(signal_names); ++i) {
uv_signal_init(uv_default_loop_checked(), &signals[i]);
signals[i].data = pool;
@ -1488,7 +1500,7 @@ void p2pool::restart_zmq()
m_ZMQReader = nullptr;
try {
m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this);
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
m_zmqLastActive = seconds_since_epoch();
}
catch (const std::exception& e) {
@ -1498,7 +1510,7 @@ void p2pool::restart_zmq()
int p2pool::run()
{
if (!m_params->ok()) {
if (!m_params->valid()) {
LOGERR(1, "Invalid or missing command line. Try \"p2pool --help\".");
return 1;
}
@ -1508,7 +1520,7 @@ int p2pool::run()
return 1;
}
if (!init_signals(this)) {
if (!init_signals(this, true)) {
LOGERR(1, "failed to initialize signal handlers");
return 1;
}

@ -142,7 +142,7 @@ private:
void get_miner_data();
void parse_get_miner_data_rpc(const char* data, size_t size);
bool parse_block_header(const char* data, size_t size, ChainMain& result);
bool parse_block_header(const char* data, size_t size, ChainMain& c);
uint32_t parse_block_headers_range(const char* data, size_t size);
void api_update_network_stats();

@ -135,6 +135,11 @@ Params::Params(int argc, char* argv[])
ok = true;
}
if ((strcmp(argv[i], "--socks5") == 0) && (i + 1 < argc)) {
m_socks5Proxy = argv[++i];
ok = true;
}
if (!ok) {
fprintf(stderr, "Unknown command line parameter %s\n\n", argv[i]);
p2pool_usage();
@ -153,7 +158,7 @@ Params::Params(int argc, char* argv[])
}
}
bool Params::ok() const
bool Params::valid() const
{
return !m_host.empty() && m_rpcPort && m_zmqPort && m_wallet.valid();
}

@ -25,7 +25,7 @@ struct Params
{
Params(int argc, char* argv[]);
bool ok() const;
bool valid() const;
std::string m_host = "127.0.0.1";
uint32_t m_rpcPort = 18081;
@ -50,6 +50,7 @@ struct Params
bool m_mini = false;
bool m_autoDiff = true;
std::string m_rpcLogin;
std::string m_socks5Proxy;
};
} // namespace p2pool

@ -438,7 +438,7 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h
const Params& params = m_pool->params();
JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin,
JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin, params.m_socks5Proxy,
[&result, &h](const char* data, size_t size)
{
rapidjson::Document doc;

@ -100,7 +100,7 @@ public:
explicit RandomX_Hasher_RPC(p2pool* pool);
~RandomX_Hasher_RPC();
bool calculate(const void* data, size_t size, uint64_t height, const hash& seed, hash& result) override;
bool calculate(const void* data_ptr, size_t size, uint64_t height, const hash& seed, hash& h) override;
private:
static void loop(void* data);

@ -361,9 +361,10 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo
if (found) {
BlockTemplate& block = m_pool->block_template();
uint64_t height;
difficulty_type mainchain_diff, sidechain_diff;
if (!block.get_difficulties(template_id, mainchain_diff, sidechain_diff)) {
if (!block.get_difficulties(template_id, height, mainchain_diff, sidechain_diff)) {
LOGWARN(4, "client " << static_cast<char*>(client->m_addrString) << " got a stale share");
return send(client,
[id](void* buf, size_t buf_size)
@ -408,6 +409,8 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo
share->m_target = target;
share->m_resultHash = resultHash;
share->m_sidechainDifficulty = sidechain_diff;
share->m_mainchainHeight = height;
share->m_effort = -1.0;
share->m_timestamp = seconds_since_epoch();
uint64_t rem;
@ -852,14 +855,12 @@ void StratumServer::on_share_found(uv_work_t* req)
const uint64_t n = server->m_cumulativeHashes + hashes;
const double diff = sidechain_difficulty.to_double();
const double effort = static_cast<double>(n - server->m_cumulativeHashesAtLastShare) * 100.0 / diff;
share->m_effort = static_cast<double>(n - server->m_cumulativeHashesAtLastShare) * 100.0 / diff;
server->m_cumulativeHashesAtLastShare = n;
server->m_cumulativeFoundSharesDiff += diff;
++server->m_totalFoundShares;
const char* s = client->m_customUser;
LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << height << ", diff " << sidechain_difficulty << ", client " << static_cast<char*>(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << effort << '%');
pool->submit_sidechain_block(share->m_templateId, share->m_nonce, share->m_extraNonce);
}
@ -882,13 +883,16 @@ void StratumServer::on_share_found(uv_work_t* req)
void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
{
SubmittedShare* share = reinterpret_cast<SubmittedShare*>(req->data);
StratumClient* client = share->m_client;
if (share->m_highEnoughDifficulty) {
const char* s = client->m_customUser;
LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << share->m_mainchainHeight << ", diff " << share->m_sidechainDifficulty << ", client " << static_cast<char*>(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << share->m_effort << '%');
bkg_jobs_tracker.stop("StratumServer::on_share_found");
}
ON_SCOPE_LEAVE([share]() { share->m_server->m_submittedSharesPool.push_back(share); });
StratumClient* client = share->m_client;
StratumServer* server = share->m_server;
const bool bad_share = (share->m_result == SubmittedShare::Result::LOW_DIFF) || (share->m_result == SubmittedShare::Result::INVALID_POW);

@ -139,6 +139,8 @@ private:
uint64_t m_target;
hash m_resultHash;
difficulty_type m_sidechainDifficulty;
uint64_t m_mainchainHeight;
double m_effort;
uint64_t m_timestamp;
uint64_t m_hashes;
bool m_highEnoughDifficulty;

@ -36,7 +36,7 @@ public:
bool connect_to_peer(bool is_v6, const char* ip, int port);
void drop_connections() { uv_async_send(&m_dropConnectionsAsync); }
void drop_connections_async() { uv_async_send(&m_dropConnectionsAsync); }
void shutdown_tcp();
virtual void print_status();
@ -45,7 +45,7 @@ public:
int listen_port() const { return m_listenPort; }
bool connect_to_peer(bool is_v6, const raw_ip& ip, int port);
virtual void on_connect_failed(bool is_v6, const raw_ip& ip, int port);
virtual void on_connect_failed(bool /*is_v6*/, const raw_ip& /*ip*/, int /*port*/) {}
void ban(const raw_ip& ip, uint64_t seconds);
virtual void print_bans();
@ -58,6 +58,7 @@ public:
virtual void reset();
virtual bool on_connect() = 0;
virtual bool on_read(char* data, uint32_t size) = 0;
bool on_proxy_handshake(char* data, uint32_t size);
virtual void on_read_failed(int /*err*/) {}
virtual void on_disconnected() {}
@ -68,7 +69,7 @@ public:
void close();
void ban(uint64_t seconds);
void init_addr_string(bool is_v6, const sockaddr_storage* peer_addr);
void init_addr_string();
alignas(8) char m_readBuf[READ_BUF_SIZE];
@ -88,7 +89,13 @@ public:
raw_ip m_addr;
int m_port;
char m_addrString[64];
char m_addrString[72];
enum class Socks5ProxyState {
Default,
MethodSelectionSent,
ConnectRequestSent,
} m_socks5ProxyState;
std::atomic<uint32_t> m_resetCounter;
};
@ -100,7 +107,6 @@ public:
std::vector<uint8_t> m_data;
};
uv_mutex_t m_writeBuffersLock;
std::vector<WriteBuf*> m_writeBuffers;
struct SendCallbackBase
@ -131,9 +137,9 @@ private:
static void on_connection_error(uv_handle_t* handle);
static void on_connect(uv_connect_t* req, int status);
void on_new_client(uv_stream_t* server);
void on_new_client_nolock(uv_stream_t* server, Client* client);
void on_new_client(uv_stream_t* server, Client* client);
bool connect_to_peer_nolock(Client* client, bool is_v6, const sockaddr* addr);
bool connect_to_peer(Client* client);
bool send_internal(Client* client, SendCallbackBase&& callback);
@ -148,6 +154,11 @@ private:
protected:
void start_listening(const std::string& listen_addresses);
std::string m_socks5Proxy;
bool m_socks5ProxyV6;
raw_ip m_socks5ProxyIP;
int m_socks5ProxyPort;
std::atomic<int> m_finished;
int m_listenPort;
@ -165,7 +176,6 @@ protected:
bool is_banned(const raw_ip& ip);
uv_mutex_t m_pendingConnectionsLock;
unordered_set<raw_ip> m_pendingConnections;
uv_async_t m_dropConnectionsAsync;

@ -25,6 +25,9 @@ template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback allocate_new_client)
: m_allocateNewClient(allocate_new_client)
, m_loopThread{}
, m_socks5ProxyV6(false)
, m_socks5ProxyIP{}
, m_socks5ProxyPort(-1)
, m_finished(0)
, m_listenPort(-1)
, m_loop{}
@ -57,18 +60,6 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
uv_mutex_init_checked(&m_clientsListLock);
uv_mutex_init_checked(&m_bansLock);
uv_mutex_init_checked(&m_pendingConnectionsLock);
uv_mutex_init_checked(&m_writeBuffersLock);
m_writeBuffers.resize(DEFAULT_BACKLOG);
for (size_t i = 0; i < m_writeBuffers.size(); ++i) {
m_writeBuffers[i] = new WriteBuf();
}
m_preallocatedClients.reserve(DEFAULT_BACKLOG);
for (int i = 0; i < DEFAULT_BACKLOG; ++i) {
m_preallocatedClients.emplace_back(m_allocateNewClient());
}
m_connectedClientsList = m_allocateNewClient();
m_connectedClientsList->m_next = m_connectedClientsList;
@ -228,8 +219,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
return false;
}
MutexLock lock(m_clientsListLock);
if (m_finished.load()) {
return false;
}
@ -247,48 +236,27 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
client->m_owner = this;
client->m_port = port;
client->m_isV6 = is_v6;
log::Stream s(client->m_addrString);
if (!str_to_ip(is_v6, ip, client->m_addr)) {
m_preallocatedClients.push_back(client);
return false;
}
sockaddr_storage addr;
log::Stream s(client->m_addrString);
if (is_v6) {
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
const int err = uv_ip6_addr(ip, port, addr6);
if (err) {
LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err));
m_preallocatedClients.push_back(client);
return false;
}
memcpy(client->m_addr.data, &addr6->sin6_addr, sizeof(in6_addr));
s << '[' << ip << "]:" << port << '\0';
}
else {
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
const int err = uv_ip4_addr(ip, port, addr4);
if (err) {
LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err));
m_preallocatedClients.push_back(client);
return false;
}
client->m_addr = {};
client->m_addr.data[10] = 0xFF;
client->m_addr.data[11] = 0xFF;
memcpy(client->m_addr.data + 12, &addr4->sin_addr, sizeof(in_addr));
s << ip << ':' << port << '\0';
}
return connect_to_peer_nolock(client, is_v6, reinterpret_cast<sockaddr*>(&addr));
return connect_to_peer(client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const raw_ip& ip, int port)
{
MutexLock lock(m_clientsListLock);
if (m_finished.load()) {
return false;
}
@ -307,29 +275,10 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
client->m_owner = this;
client->m_addr = ip;
client->m_port = port;
client->m_isV6 = is_v6;
client->init_addr_string();
sockaddr_storage addr{};
if (is_v6) {
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
addr6->sin6_family = AF_INET6;
memcpy(&addr6->sin6_addr, ip.data, sizeof(in6_addr));
addr6->sin6_port = htons(static_cast<uint16_t>(port));
}
else {
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, ip.data + 12, sizeof(in_addr));
addr4->sin_port = htons(static_cast<uint16_t>(port));
}
client->init_addr_string(is_v6, &addr);
return connect_to_peer_nolock(client, is_v6, reinterpret_cast<sockaddr*>(&addr));
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect_failed(bool, const raw_ip&, int)
{
return connect_to_peer(client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
@ -356,7 +305,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::is_banned(const raw_ip& ip)
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* client, bool is_v6, const sockaddr* addr)
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(Client* client)
{
if (is_banned(client->m_addr)) {
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " is banned, not connecting to it");
@ -364,8 +313,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* cl
return false;
}
client->m_isV6 = is_v6;
int err = uv_tcp_init(&m_loop, &client->m_socket);
if (err) {
LOGERR(1, "failed to create tcp client handle, error " << uv_err_name(err));
@ -381,8 +328,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* cl
return false;
}
MutexLock lock(m_pendingConnectionsLock);
if (!m_pendingConnections.insert(client->m_addr).second) {
LOGINFO(6, "there is already a pending connection to this IP, not connecting to " << log::Gray() << static_cast<char*>(client->m_addrString));
uv_close(reinterpret_cast<uv_handle_t*>(&client->m_socket), on_connection_error);
@ -391,9 +336,40 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* cl
uv_connect_t* connect_request = reinterpret_cast<uv_connect_t*>(client->m_readBuf);
memset(connect_request, 0, sizeof(uv_connect_t));
connect_request->data = client;
err = uv_tcp_connect(connect_request, &client->m_socket, addr, on_connect);
sockaddr_storage addr{};
if (m_socks5Proxy.empty()) {
if (client->m_isV6) {
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
addr6->sin6_family = AF_INET6;
memcpy(&addr6->sin6_addr, client->m_addr.data, sizeof(in6_addr));
addr6->sin6_port = htons(static_cast<uint16_t>(client->m_port));
}
else {
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, client->m_addr.data + 12, sizeof(in_addr));
addr4->sin_port = htons(static_cast<uint16_t>(client->m_port));
}
}
else {
if (m_socks5ProxyV6) {
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
addr6->sin6_family = AF_INET6;
memcpy(&addr6->sin6_addr, m_socks5ProxyIP.data, sizeof(in6_addr));
addr6->sin6_port = htons(static_cast<uint16_t>(m_socks5ProxyPort));
}
else {
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, m_socks5ProxyIP.data + 12, sizeof(in_addr));
addr4->sin_port = htons(static_cast<uint16_t>(m_socks5ProxyPort));
}
}
err = uv_tcp_connect(connect_request, &client->m_socket, reinterpret_cast<sockaddr*>(&addr), on_connect);
if (err) {
LOGERR(1, "failed to initiate tcp connection to " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
m_pendingConnections.erase(client->m_addr);
@ -429,15 +405,16 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets
}
}
MutexLock lock(m_clientsListLock);
size_t numClosed = 0;
{
MutexLock lock(m_clientsListLock);
for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) {
uv_handle_t* h = reinterpret_cast<uv_handle_t*>(&c->m_socket);
if (!uv_is_closing(h)) {
uv_close(h, on_connection_close);
++numClosed;
for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) {
uv_handle_t* h = reinterpret_cast<uv_handle_t*>(&c->m_socket);
if (!uv_is_closing(h)) {
uv_close(h, on_connection_close);
++numClosed;
}
}
}
@ -485,21 +462,8 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
uv_thread_join(&m_loopThread);
for (Client* c : m_preallocatedClients) {
delete c;
}
uv_mutex_destroy(&m_clientsListLock);
uv_mutex_destroy(&m_bansLock);
uv_mutex_destroy(&m_pendingConnectionsLock);
{
MutexLock lock(m_writeBuffersLock);
for (WriteBuf* buf : m_writeBuffers) {
delete buf;
}
}
uv_mutex_destroy(&m_writeBuffersLock);
LOGINFO(1, "stopped");
}
@ -561,17 +525,13 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
return true;
}
WriteBuf* buf = nullptr;
WriteBuf* buf;
{
MutexLock lock(m_writeBuffersLock);
if (!m_writeBuffers.empty()) {
buf = m_writeBuffers.back();
m_writeBuffers.pop_back();
}
if (!m_writeBuffers.empty()) {
buf = m_writeBuffers.back();
m_writeBuffers.pop_back();
}
if (!buf) {
else {
buf = new WriteBuf();
}
@ -586,10 +546,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
if (bytes_written == 0) {
LOGWARN(1, "send callback wrote 0 bytes, nothing to do");
{
MutexLock lock(m_writeBuffersLock);
m_writeBuffers.push_back(buf);
}
m_writeBuffers.push_back(buf);
return true;
}
@ -604,11 +561,8 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
const int err = uv_write(&buf->m_write, reinterpret_cast<uv_stream_t*>(&client->m_socket), bufs, 1, Client::on_write);
if (err) {
{
MutexLock lock(m_writeBuffersLock);
m_writeBuffers.push_back(buf);
}
LOGWARN(1, "failed to start writing data to client connection " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
m_writeBuffers.push_back(buf);
return false;
}
@ -621,8 +575,27 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
LOGINFO(1, "event loop started");
server_event_loop_thread = true;
TCPServer* server = static_cast<TCPServer*>(data);
server->m_writeBuffers.resize(DEFAULT_BACKLOG);
server->m_preallocatedClients.reserve(DEFAULT_BACKLOG);
for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) {
server->m_writeBuffers[i] = new WriteBuf();
server->m_preallocatedClients.emplace_back(server->m_allocateNewClient());
}
uv_run(&server->m_loop, UV_RUN_DEFAULT);
uv_loop_close(&server->m_loop);
for (WriteBuf* buf : server->m_writeBuffers) {
delete buf;
}
server->m_writeBuffers.clear();
for (Client* c : server->m_preallocatedClients) {
delete c;
}
server->m_preallocatedClients.clear();
LOGINFO(1, "event loop stopped");
server->m_loopStopped = true;
}
@ -647,10 +620,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_connection(uv_stream_t* se
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_close(uv_handle_t* handle)
{
if (!server_event_loop_thread) {
LOGERR(1, "on_connection_close called from another thread, this is not thread safe");
}
Client* client = static_cast<Client*>(handle->data);
TCPServer* owner = client->m_owner;
@ -685,10 +654,7 @@ template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_error(uv_handle_t* handle)
{
Client* client = reinterpret_cast<Client*>(handle->data);
TCPServer* server = client->m_owner;
MutexLock lock(server->m_clientsListLock);
server->m_preallocatedClients.push_back(client);
client->m_owner->m_preallocatedClients.push_back(client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
@ -701,12 +667,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect(uv_connect_t* req, int
return;
}
{
MutexLock lock(server->m_pendingConnectionsLock);
server->m_pendingConnections.erase(client->m_addr);
}
MutexLock lock(server->m_clientsListLock);
server->m_pendingConnections.erase(client->m_addr);
if (status) {
if (status == UV_ETIMEDOUT) {
@ -720,14 +681,12 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect(uv_connect_t* req, int
return;
}
server->on_new_client_nolock(nullptr, client);
server->on_new_client(nullptr, client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server)
{
MutexLock lock(m_clientsListLock);
if (m_finished.load()) {
return;
}
@ -766,62 +725,53 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
return;
}
on_new_client_nolock(server, client);
on_new_client(server, client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client_nolock(uv_stream_t* server, Client* client)
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server, Client* client)
{
MutexLock lock(m_clientsListLock);
client->m_prev = m_connectedClientsList;
client->m_next = m_connectedClientsList->m_next;
m_connectedClientsList->m_next->m_prev = client;
m_connectedClientsList->m_next = client;
++m_numConnections;
client->m_isIncoming = false;
sockaddr_storage peer_addr;
int peer_addr_len = static_cast<int>(sizeof(peer_addr));
int err = uv_tcp_getpeername(&client->m_socket, reinterpret_cast<sockaddr*>(&peer_addr), &peer_addr_len);
if (err) {
LOGERR(1, "failed to get IP address of the client connection, error " << uv_err_name(err));
client->close();
return;
}
client->m_isIncoming = (server != nullptr);
bool is_v6;
if (server) {
is_v6 = (std::find(m_listenSockets6.begin(), m_listenSockets6.end(), reinterpret_cast<uv_tcp_t*>(server)) != m_listenSockets6.end());
client->m_isV6 = is_v6;
}
else {
is_v6 = client->m_isV6;
}
if (client->m_isIncoming) {
client->m_isV6 = (std::find(m_listenSockets6.begin(), m_listenSockets6.end(), reinterpret_cast<uv_tcp_t*>(server)) != m_listenSockets6.end());
if (is_v6) {
memcpy(client->m_addr.data, &reinterpret_cast<sockaddr_in6*>(&peer_addr)->sin6_addr, sizeof(in6_addr));
client->m_port = ntohs(reinterpret_cast<sockaddr_in6*>(&peer_addr)->sin6_port);
}
else {
client->m_addr = {};
client->m_addr.data[10] = 0xFF;
client->m_addr.data[11] = 0xFF;
memcpy(client->m_addr.data + 12, &reinterpret_cast<sockaddr_in*>(&peer_addr)->sin_addr, sizeof(in_addr));
client->m_port = ntohs(reinterpret_cast<sockaddr_in*>(&peer_addr)->sin_port);
}
sockaddr_storage peer_addr;
int peer_addr_len = static_cast<int>(sizeof(peer_addr));
int err = uv_tcp_getpeername(&client->m_socket, reinterpret_cast<sockaddr*>(&peer_addr), &peer_addr_len);
if (err) {
LOGERR(1, "failed to get IP address of the client connection, error " << uv_err_name(err));
client->close();
return;
}
client->init_addr_string(is_v6, &peer_addr);
if (client->m_isV6) {
memcpy(client->m_addr.data, &reinterpret_cast<sockaddr_in6*>(&peer_addr)->sin6_addr, sizeof(in6_addr));
client->m_port = ntohs(reinterpret_cast<sockaddr_in6*>(&peer_addr)->sin6_port);
}
else {
client->m_addr = {};
client->m_addr.data[10] = 0xFF;
client->m_addr.data[11] = 0xFF;
memcpy(client->m_addr.data + 12, &reinterpret_cast<sockaddr_in*>(&peer_addr)->sin_addr, sizeof(in_addr));
client->m_port = ntohs(reinterpret_cast<sockaddr_in*>(&peer_addr)->sin_port);
}
if (server) {
LOGINFO(5, "new connection from " << log::Gray() << static_cast<char*>(client->m_addrString));
client->m_isIncoming = true;
client->init_addr_string();
++m_numIncomingConnections;
}
else {
LOGINFO(5, "new connection to " << log::Gray() << static_cast<char*>(client->m_addrString));
client->m_isIncoming = false;
}
LOGINFO(5, "new connection " << (client->m_isIncoming ? "from " : "to ") << log::Gray() << static_cast<char*>(client->m_addrString));
if (is_banned(client->m_addr)) {
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " is banned, disconnecting");
@ -829,12 +779,44 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client_nolock(uv_stream_t*
return;
}
if (client->m_owner->m_finished.load() || !client->on_connect()) {
TCPServer* owner = client->m_owner;
if (owner->m_finished.load()) {
client->close();
return;
}
err = uv_read_start(reinterpret_cast<uv_stream_t*>(&client->m_socket), Client::on_alloc, Client::on_read);
if (owner->m_socks5Proxy.empty()) {
if (!client->on_connect()) {
client->close();
return;
}
}
else {
const bool result = owner->send(client,
[](void* buf, size_t buf_size) -> size_t
{
if (buf_size < 3) {
return 0;
}
uint8_t* p = reinterpret_cast<uint8_t*>(buf);
p[0] = 5; // Protocol version (SOCKS5)
p[1] = 1; // NMETHODS
p[2] = 0; // Method 0 (no authentication)
return 3;
});
if (result) {
client->m_socks5ProxyState = Client::Socks5ProxyState::MethodSelectionSent;
}
else {
client->close();
}
}
const int err = uv_read_start(reinterpret_cast<uv_stream_t*>(&client->m_socket), Client::on_alloc, Client::on_read);
if (err) {
LOGERR(1, "failed to start reading from client connection, error " << uv_err_name(err));
client->close();
@ -855,6 +837,7 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
, m_addr{}
, m_port(0)
, m_addrString{}
, m_socks5ProxyState(Socks5ProxyState::Default)
, m_resetCounter{ 0 }
{
m_readBuf[0] = '\0';
@ -878,6 +861,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::reset()
m_addr = {};
m_port = -1;
m_addrString[0] = '\0';
m_socks5ProxyState = Socks5ProxyState::Default;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
@ -907,34 +891,158 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_alloc(uv_handle_t* han
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
Client* pThis = static_cast<Client*>(stream->data);
pThis->m_readBufInUse = false;
Client* client = static_cast<Client*>(stream->data);
client->m_readBufInUse = false;
if (pThis->m_isClosing) {
LOGWARN(5, "client " << static_cast<const char*>(pThis->m_addrString) << " is being disconnected but data received from it, nread = " << nread << ". Ignoring it.");
if (client->m_isClosing) {
LOGWARN(5, "client " << static_cast<const char*>(client->m_addrString) << " is being disconnected but data received from it, nread = " << nread << ". Ignoring it.");
return;
}
if (nread > 0) {
if (pThis->m_owner && !pThis->m_owner->m_finished.load()) {
if (!pThis->on_read(buf->base, static_cast<uint32_t>(nread))) {
pThis->close();
if (client->m_owner && !client->m_owner->m_finished.load()) {
if (client->m_socks5ProxyState == Socks5ProxyState::Default) {
if (!client->on_read(buf->base, static_cast<uint32_t>(nread))) {
client->close();
}
}
else if (!client->on_proxy_handshake(buf->base, static_cast<uint32_t>(nread))) {
client->close();
}
}
}
else if (nread < 0) {
if (nread != UV_EOF) {
const int err = static_cast<int>(nread);
LOGWARN(5, "client " << static_cast<const char*>(pThis->m_addrString) << " failed to read response, err = " << uv_err_name(err));
pThis->on_read_failed(err);
LOGWARN(5, "client " << static_cast<const char*>(client->m_addrString) << " failed to read response, err = " << uv_err_name(err));
client->on_read_failed(err);
}
else {
pThis->on_disconnected();
client->on_disconnected();
}
pThis->close();
client->close();
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_proxy_handshake(char* data, uint32_t size)
{
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) {
LOGERR(1, "peer " << static_cast<char*>(m_addrString) << " invalid data pointer or size in on_read()");
return false;
}
m_numRead += size;
uint32_t n = 0;
switch (m_socks5ProxyState) {
case Socks5ProxyState::MethodSelectionSent:
if (m_numRead >= 2) {
if ((m_readBuf[0] != 5) && (m_readBuf[1] != 0)) {
LOGWARN(5, "SOCKS5 proxy returned an invalid METHOD selection message");
return false;
}
n = 2;
const bool result = m_owner->send(this,
[this](void* buf, size_t buf_size) -> size_t
{
if (buf_size < 20) {
return 0;
}
uint8_t* p = reinterpret_cast<uint8_t*>(buf);
p[0] = 5; // Protocol version (SOCKS5)
p[1] = 1; // CONNECT
p[2] = 0; // RESERVED
if (m_isV6) {
p[3] = 4; // ATYP
memcpy(p + 4, m_addr.data, 16);
p[20] = static_cast<uint8_t>(m_port >> 8);
p[21] = static_cast<uint8_t>(m_port & 0xFF);
}
else {
p[3] = 1; // ATYP
memcpy(p + 4, m_addr.data + 12, 4);
p[8] = static_cast<uint8_t>(m_port >> 8);
p[9] = static_cast<uint8_t>(m_port & 0xFF);
}
return m_isV6 ? 22 : 10;
});
if (result) {
m_socks5ProxyState = Socks5ProxyState::ConnectRequestSent;
}
else {
close();
}
}
break;
case Socks5ProxyState::ConnectRequestSent:
if (m_numRead >= 4) {
uint8_t* p = reinterpret_cast<uint8_t*>(m_readBuf);
if ((p[0] != 5) && (p[1] != 0) && p[2] != 0) {
LOGWARN(5, "SOCKS5 proxy returned an invalid reply to CONNECT");
return false;
}
switch (p[3]) {
case 1:
if (m_numRead >= 10) {
m_socks5ProxyState = Socks5ProxyState::Default;
n = 10;
}
break;
case 3:
if (m_numRead >= 5) {
const uint32_t len = p[4];
if (m_numRead >= 7 + len) {
m_socks5ProxyState = Socks5ProxyState::Default;
n = 7 + len;
}
}
break;
case 4:
if (m_numRead >= 22) {
m_socks5ProxyState = Socks5ProxyState::Default;
n = 22;
}
break;
}
}
break;
default:
return false;
}
// Move the possible unfinished message to the beginning of m_readBuf to free up more space for reading
if (n > 0) {
m_numRead -= n;
if (m_numRead > 0) {
memmove(m_readBuf, m_readBuf + n, m_numRead);
}
}
if (m_socks5ProxyState == Socks5ProxyState::Default) {
if (!on_connect()) {
return false;
}
if (m_numRead > 0) {
const uint32_t nread = m_numRead;
m_numRead = 0;
if (!on_read(m_readBuf, nread)) {
return false;
}
}
}
return true;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req, int status)
{
@ -943,7 +1051,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req,
TCPServer* server = client->m_owner;
if (server) {
MutexLock lock(server->m_writeBuffersLock);
server->m_writeBuffers.push_back(buf);
}
@ -982,16 +1089,16 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::ban(uint64_t seconds)
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::init_addr_string(bool is_v6, const sockaddr_storage* peer_addr)
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::init_addr_string()
{
const char* addr_str;
char addr_str_buf[64];
if (is_v6) {
addr_str = inet_ntop(AF_INET6, &reinterpret_cast<const sockaddr_in6*>(peer_addr)->sin6_addr, addr_str_buf, sizeof(addr_str_buf));
if (m_isV6) {
addr_str = inet_ntop(AF_INET6, m_addr.data, addr_str_buf, sizeof(addr_str_buf));
}
else {
addr_str = inet_ntop(AF_INET, &reinterpret_cast<const sockaddr_in*>(peer_addr)->sin_addr, addr_str_buf, sizeof(addr_str_buf));
addr_str = inet_ntop(AF_INET, m_addr.data + 12, addr_str_buf, sizeof(addr_str_buf));
}
if (addr_str) {
@ -1001,11 +1108,11 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::init_addr_string(bool is_
}
log::Stream s(m_addrString);
if (is_v6) {
s << '[' << log::const_buf(addr_str, n) << "]:" << ntohs(reinterpret_cast<const sockaddr_in6*>(peer_addr)->sin6_port) << '\0';
if (m_isV6) {
s << '[' << log::const_buf(addr_str, n) << "]:" << m_port << '\0';
}
else {
s << log::const_buf(addr_str, n) << ':' << ntohs(reinterpret_cast<const sockaddr_in*>(peer_addr)->sin_port) << '\0';
s << log::const_buf(addr_str, n) << ':' << m_port << '\0';
}
}
}

@ -432,6 +432,57 @@ NOINLINE uint64_t bsr_reference(uint64_t x)
return bsr8_table.data[y >> 24] - n0 - n1 - n2;
}
bool str_to_ip(bool is_v6, const char* ip, raw_ip& result)
{
sockaddr_storage addr;
if (is_v6) {
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
const int err = uv_ip6_addr(ip, 0, addr6);
if (err) {
LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err));
return false;
}
memcpy(result.data, &addr6->sin6_addr, sizeof(in6_addr));
}
else {
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
const int err = uv_ip4_addr(ip, 0, addr4);
if (err) {
LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err));
return false;
}
result = {};
result.data[10] = 0xFF;
result.data[11] = 0xFF;
memcpy(result.data + 12, &addr4->sin_addr, sizeof(in_addr));
}
return true;
}
bool is_localhost(const std::string& host)
{
if (host.empty()) {
return false;
}
if (host.compare("localhost") == 0) {
return true;
}
if (host.find_first_not_of("0123456789.:") != std::string::npos) {
return false;
}
raw_ip addr;
if (!str_to_ip(host.find(':') != std::string::npos, host.c_str(), addr)) {
return false;
}
return addr.is_localhost();
}
UV_LoopUserData* GetLoopUserData(uv_loop_t* loop, bool create)
{
UV_LoopUserData* data = reinterpret_cast<UV_LoopUserData*>(loop->data);

@ -219,6 +219,9 @@ FORCEINLINE uint64_t bsr(uint64_t x)
#define bsr bsr_reference
#endif
bool str_to_ip(bool is_v6, const char* ip, raw_ip& result);
bool is_localhost(const std::string& host);
} // namespace p2pool
void memory_tracking_start();

@ -24,14 +24,20 @@ static constexpr char log_category_prefix[] = "ZMQReader ";
namespace p2pool {
ZMQReader::ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandler* handler)
ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler)
: m_address(address)
, m_zmqPort(zmq_port)
, m_proxy(proxy)
, m_handler(handler)
, m_tx()
, m_minerData()
, m_chainmainData()
{
if (!m_proxy.empty() && is_localhost(address)) {
LOGINFO(5, "not using proxy to connect to localhost address " << log::Gray() << address);
m_proxy.clear();
}
for (uint32_t i = m_publisherPort; i < std::numeric_limits<uint16_t>::max(); ++i) {
try {
m_publisherPort = 0;
@ -84,14 +90,18 @@ void ZMQReader::run_wrapper(void* arg)
void ZMQReader::run()
{
try {
char addr[32];
if (!m_proxy.empty()) {
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length()));
}
snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort);
std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort);
if (!connect(addr)) {
return;
}
snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort);
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer());
addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort);
if (!connect(addr)) {
return;
}
@ -128,7 +138,7 @@ void ZMQReader::run()
}
}
bool ZMQReader::connect(const char* address)
bool ZMQReader::connect(const std::string& address)
{
struct ConnectMonitor : public zmq::monitor_t
{

@ -24,18 +24,19 @@ namespace p2pool {
class ZMQReader {
public:
ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandler* handler);
ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler);
~ZMQReader();
private:
static void run_wrapper(void* arg);
void run();
bool connect(const char* address);
bool connect(const std::string& address);
void parse(char* data, size_t size);
const char* m_address;
std::string m_address;
uint32_t m_zmqPort;
std::string m_proxy;
MinerCallbackHandler* m_handler;
uv_thread_t m_worker{};

Loading…
Cancel
Save