Refactored TCPServer to reduce code duplication

tcp_refactor
SChernykh 1 year ago
parent 72adfd3126
commit 6e258bb210

@ -56,7 +56,6 @@ set(HEADERS
src/side_chain.h
src/stratum_server.h
src/tcp_server.h
src/tcp_server.inl
src/util.h
src/uv_util.h
src/wallet.h
@ -85,6 +84,7 @@ set(SOURCES
src/pow_hash.cpp
src/side_chain.cpp
src/stratum_server.cpp
src/tcp_server.cpp
src/util.cpp
src/wallet.cpp
src/zmq_reader.cpp

@ -32,12 +32,10 @@ static constexpr char log_category_prefix[] = "ConsoleCommands ";
static constexpr int DEFAULT_BACKLOG = 1;
#include "tcp_server.inl"
namespace p2pool {
ConsoleCommands::ConsoleCommands(p2pool* pool)
: TCPServer(ConsoleClient::allocate)
: TCPServer(DEFAULT_BACKLOG, ConsoleClient::allocate)
, m_pool(pool)
, m_tty{}
, m_stdin_pipe{}

@ -24,7 +24,7 @@ namespace p2pool {
class p2pool;
class ConsoleCommands : public TCPServer<256, 256>
class ConsoleCommands : public TCPServer
{
public:
explicit ConsoleCommands(p2pool* pool);
@ -32,6 +32,7 @@ public:
struct ConsoleClient : public Client
{
ConsoleClient() : Client(m_consoleReadBuf, sizeof(m_consoleReadBuf)) {}
~ConsoleClient() {}
static Client* allocate() { return new ConsoleClient(); }
@ -41,19 +42,23 @@ public:
bool on_connect() override { return true; };
bool on_read(char* data, uint32_t size) override { static_cast<ConsoleCommands*>(m_owner)->process_input(m_command, data, size); return true; };
char m_consoleReadBuf[1024] = {};
std::string m_command;
};
void on_shutdown() override;
private:
const char* get_category() const override { return "ConsoleCommands "; }
p2pool* m_pool;
uv_tty_t m_tty;
uv_pipe_t m_stdin_pipe;
uv_stream_t* m_stdin_handle;
char m_readBuf[64];
char m_readBuf[1024];
bool m_readBufInUse;
std::string m_command;

@ -356,6 +356,7 @@ private:
hash m_derivation;
uint32_t m_viewTags1[2] = { 0xFFFFFFFFUL, 0xFFFFFFFFUL };
std::vector<uint32_t> m_viewTags2;
// cppcheck-suppress unusedStructMember
uint64_t m_timestamp;
FORCEINLINE bool find_view_tag(size_t output_index, uint8_t& view_tag) const

@ -20,29 +20,17 @@
namespace p2pool {
namespace JSONRPCRequest {
struct CallbackBase
{
virtual ~CallbackBase() {}
virtual void operator()(const char* data, size_t size) = 0;
};
template<typename T>
struct Callback : public CallbackBase
{
explicit FORCEINLINE Callback(T&& cb) : m_cb(std::move(cb)) {}
void operator()(const char* data, size_t size) override { m_cb(data, size); }
private:
Callback& operator=(Callback&&) = delete;
T m_cb;
};
typedef Callback<void, const char*, size_t>::Base CallbackBase;
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
template<typename T, typename U>
FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr)
{
Call(address, port, req, auth, proxy, new Callback<T>(std::move(cb)), new Callback<U>(std::move(close_cb)), loop);
typedef Callback<void, const char*, size_t>::Derived<T> CallbackT;
typedef Callback<void, const char*, size_t>::Derived<U> CallbackU;
Call(address, port, req, auth, proxy, new CallbackT(std::move(cb)), new CallbackU(std::move(close_cb)), loop);
}
} // namespace JSONRPCRequest

@ -41,12 +41,10 @@ static constexpr uint64_t DEFAULT_BAN_TIME = 600;
static constexpr size_t SEND_BUF_MIN_SIZE = 256;
#include "tcp_server.inl"
namespace p2pool {
P2PServer::P2PServer(p2pool* pool)
: TCPServer(P2PClient::allocate)
: TCPServer(DEFAULT_BACKLOG, P2PClient::allocate)
, m_pool(pool)
, m_cache(pool->params().m_blockCache ? new BlockCache() : nullptr)
, m_cacheLoaded(false)
@ -62,6 +60,8 @@ P2PServer::P2PServer(p2pool* pool)
, m_lookForMissingBlocks(true)
, m_fastestPeer(nullptr)
{
m_callbackBuf.resize(P2P_BUF_SIZE);
m_blockDeserializeBuf.reserve(131072);
// Diffuse the initial state in case it has low quality
@ -1188,7 +1188,8 @@ void P2PServer::check_block_template()
}
P2PServer::P2PClient::P2PClient()
: m_peerId(0)
: Client(m_p2pReadBuf, sizeof(m_p2pReadBuf))
, m_peerId(0)
, m_connectedTime(0)
, m_broadcastMaxHeight(0)
, m_expectedMessage(MessageId::HANDSHAKE_CHALLENGE)
@ -1211,6 +1212,7 @@ P2PServer::P2PClient::P2PClient()
, m_lastBlockrequestTimestamp(0)
, m_broadcastedHashes{}
{
m_p2pReadBuf[0] = '\0';
}
void P2PServer::on_shutdown()
@ -1352,7 +1354,7 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size)
return false;
}
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) {
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + m_readBufSize)) {
LOGERR(1, "peer " << static_cast<char*>(m_addrString) << " invalid data pointer or size in on_read()");
ban(DEFAULT_BAN_TIME);
server->remove_peer_from_list(this);
@ -1811,9 +1813,8 @@ bool P2PServer::P2PClient::check_handshake_solution(const hash& solution, const
bool P2PServer::P2PClient::on_handshake_challenge(const uint8_t* buf)
{
check_event_loop_thread(__func__);
P2PServer* server = static_cast<P2PServer*>(m_owner);
server->check_event_loop_thread(__func__);
uint8_t challenge[CHALLENGE_SIZE];
memcpy(challenge, buf, CHALLENGE_SIZE);
@ -2084,9 +2085,9 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size,
bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
{
check_event_loop_thread(__func__);
P2PServer* server = static_cast<P2PServer*>(m_owner);
server->check_event_loop_thread(__func__);
const uint64_t cur_time = seconds_since_epoch();
const bool first = (m_prevIncomingPeerListRequest == 0);

@ -36,7 +36,7 @@ static constexpr uint32_t PROTOCOL_VERSION_1_1 = 0x00010001UL;
static constexpr uint32_t SUPPORTED_PROTOCOL_VERSION = PROTOCOL_VERSION_1_1;
class P2PServer : public TCPServer<P2P_BUF_SIZE, P2P_BUF_SIZE>
class P2PServer : public TCPServer
{
public:
enum class MessageId {
@ -114,6 +114,8 @@ public:
const char* software_name() const;
alignas(8) char m_p2pReadBuf[P2P_BUF_SIZE];
uint64_t m_peerId;
uint64_t m_connectedTime;
uint64_t m_broadcastMaxHeight;
@ -167,6 +169,8 @@ public:
const PoolBlock* get_block() const { return m_block; }
private:
const char* get_category() const override { return "P2PServer "; }
p2pool* m_pool;
BlockCache* m_cache;
bool m_cacheLoaded;

@ -103,7 +103,7 @@ void p2pool_api::on_stop()
uv_close(reinterpret_cast<uv_handle_t*>(&m_dumpToFileAsync), nullptr);
}
void p2pool_api::dump_to_file_async_internal(Category category, const char* filename, DumpFileCallbackBase&& callback)
void p2pool_api::dump_to_file_async_internal(Category category, const char* filename, Callback<void, log::Stream&>::Base&& callback)
{
std::vector<char> buf(16384);
log::Stream s(buf.data(), buf.size());

@ -38,7 +38,7 @@ public:
void on_stop();
template<typename T>
void set(Category category, const char* filename, T&& callback) { dump_to_file_async_internal(category, filename, DumpFileCallback<T>(std::move(callback))); }
void set(Category category, const char* filename, T&& callback) { dump_to_file_async_internal(category, filename, Callback<void, log::Stream&>::Derived<T>(std::move(callback))); }
private:
void create_dir(const std::string& path);
@ -54,25 +54,7 @@ private:
std::vector<char> buf;
};
struct DumpFileCallbackBase
{
virtual ~DumpFileCallbackBase() {}
virtual void operator()(log::Stream&) = 0;
};
template<typename T>
struct DumpFileCallback : public DumpFileCallbackBase
{
explicit FORCEINLINE DumpFileCallback(T&& callback) : m_callback(std::move(callback)) {}
void operator()(log::Stream& s) override { m_callback(s); }
private:
DumpFileCallback& operator=(DumpFileCallback&&) = delete;
T m_callback;
};
void dump_to_file_async_internal(Category category, const char* filename, DumpFileCallbackBase&& callback);
void dump_to_file_async_internal(Category category, const char* filename, Callback<void, log::Stream&>::Base&& callback);
void dump_to_file();
static void on_fs_open(uv_fs_t* req);
static void on_fs_write(uv_fs_t* req);

@ -37,12 +37,10 @@ static constexpr int32_t BAD_SHARE_POINTS = -5;
static constexpr int32_t GOOD_SHARE_POINTS = 1;
static constexpr int32_t BAN_THRESHOLD_POINTS = -15;
#include "tcp_server.inl"
namespace p2pool {
StratumServer::StratumServer(p2pool* pool)
: TCPServer(StratumClient::allocate)
: TCPServer(DEFAULT_BACKLOG, StratumClient::allocate)
, m_pool(pool)
, m_autoDiff(pool->params().m_autoDiff)
, m_rng(RandomDeviceSeed::instance)
@ -57,6 +55,8 @@ StratumServer::StratumServer(p2pool* pool)
, m_totalFailedShares(0)
, m_apiLastUpdateTime(0)
{
m_callbackBuf.resize(STRATUM_BUF_SIZE);
// Diffuse the initial state in case it has low quality
m_rng.discard(10000);
@ -1027,7 +1027,8 @@ void StratumServer::on_shutdown()
}
StratumServer::StratumClient::StratumClient()
: m_rpcId(0)
: Client(m_stratumReadBuf, sizeof(m_stratumReadBuf))
, m_rpcId(0)
, m_perConnectionJobId(0)
, m_connectedTime(0)
, m_jobs{}
@ -1040,6 +1041,7 @@ StratumServer::StratumClient::StratumClient()
, m_lastJobTarget(0)
, m_score(0)
{
m_stratumReadBuf[0] = '\0';
}
void StratumServer::StratumClient::reset()
@ -1072,7 +1074,7 @@ bool StratumServer::StratumClient::on_connect()
bool StratumServer::StratumClient::on_read(char* data, uint32_t size)
{
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) {
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + m_readBufSize)) {
LOGERR(1, "client: invalid data pointer or size in on_read()");
ban(DEFAULT_BAN_TIME);
return false;

@ -28,7 +28,7 @@ class BlockTemplate;
static constexpr size_t STRATUM_BUF_SIZE = log::Stream::BUF_SIZE + 1;
static constexpr int DEFAULT_STRATUM_PORT = 3333;
class StratumServer : public TCPServer<STRATUM_BUF_SIZE, STRATUM_BUF_SIZE>
class StratumServer : public TCPServer
{
public:
explicit StratumServer(p2pool *pool);
@ -52,6 +52,8 @@ public:
bool process_login(rapidjson::Document& doc, uint32_t id);
bool process_submit(rapidjson::Document& doc, uint32_t id);
alignas(8) char m_stratumReadBuf[STRATUM_BUF_SIZE];
uint32_t m_rpcId;
uint32_t m_perConnectionJobId;
uint64_t m_connectedTime;
@ -95,6 +97,8 @@ public:
void reset_share_counters();
private:
const char* get_category() const override { return "StratumServer "; }
void print_stratum_status() const;
void update_auto_diff(StratumClient* client, const uint64_t timestamp, const uint64_t hashes);

@ -15,15 +15,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <thread>
#include "common.h"
#include "tcp_server.h"
static thread_local bool server_event_loop_thread = false;
static thread_local void* server_event_loop_thread = nullptr;
static thread_local const char* log_category_prefix = "TCPServer ";
namespace p2pool {
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback allocate_new_client)
TCPServer::TCPServer(int default_backlog, allocate_client_callback allocate_new_client)
: m_allocateNewClient(allocate_new_client)
, m_defaultBacklog(default_backlog)
, m_loopThread{}
#ifdef WITH_UPNP
, m_portMapping(0)
@ -71,9 +73,7 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
m_connectedClientsList->m_prev = m_connectedClientsList;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
// cppcheck-suppress functionStatic
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::~TCPServer()
TCPServer::~TCPServer()
{
if (m_finished.load() == 0) {
LOGERR(1, "TCP wasn't shutdown properly");
@ -83,10 +83,7 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::~TCPServer()
delete m_connectedClientsList;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
template<typename T>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::parse_address_list(const std::string& address_list, T callback)
void TCPServer::parse_address_list_internal(const std::string& address_list, Callback<void, bool, const std::string&, const std::string&, int>::Base&& callback)
{
if (address_list.empty()) {
return;
@ -120,7 +117,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::parse_address_list(const std::str
callback(is_v6, address, ip, port);
}
else {
LOGWARN(1, "invalid IP:port " << address);
error_invalid_ip(address);
}
}
@ -130,8 +127,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::parse_address_list(const std::str
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::start_listening(bool is_v6, const std::string& ip, int port, std::string address)
bool TCPServer::start_listening(bool is_v6, const std::string& ip, int port, std::string address)
{
if ((m_listenPort >= 0) && (m_listenPort != port)) {
LOGERR(1, "all sockets must be listening on the same port number, fix the command line");
@ -205,7 +201,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::start_listening(bool is_v6, const
}
}
err = uv_listen(reinterpret_cast<uv_stream_t*>(socket), DEFAULT_BACKLOG, on_new_connection);
err = uv_listen(reinterpret_cast<uv_stream_t*>(socket), m_defaultBacklog, on_new_connection);
if (err) {
LOGERR(1, "failed to listen on tcp server socket " << address << ", error " << uv_err_name(err));
return false;
@ -226,8 +222,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::start_listening(bool is_v6, const
return true;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::start_listening(const std::string& listen_addresses, bool upnp)
void TCPServer::start_listening(const std::string& listen_addresses, bool upnp)
{
if (listen_addresses.empty()) {
LOGERR(1, "listen address not set");
@ -257,8 +252,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::start_listening(const std::string
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const char* ip, int port)
bool TCPServer::connect_to_peer(bool is_v6, const char* ip, int port)
{
if (!ip || (strlen(ip) > sizeof(Client::m_addrString) - 16)) {
LOGERR(1, "failed to parse IP address, too long");
@ -290,8 +284,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
return connect_to_peer(client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const raw_ip& ip, int port)
bool TCPServer::connect_to_peer(bool is_v6, const raw_ip& ip, int port)
{
if (m_finished.load()) {
return false;
@ -307,8 +300,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
return connect_to_peer(client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::is_banned(const raw_ip& ip)
bool TCPServer::is_banned(const raw_ip& ip)
{
if (ip.is_localhost()) {
return false;
@ -330,8 +322,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::is_banned(const raw_ip& ip)
return false;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(Client* client)
bool TCPServer::connect_to_peer(Client* client)
{
if (is_banned(client->m_addr)) {
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " is banned, not connecting to it");
@ -360,7 +351,11 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(Client* client)
return false;
}
static_assert(sizeof(client->m_readBuf) >= sizeof(uv_connect_t), "READ_BUF_SIZE must be large enough");
if (client->m_readBufSize < sizeof(uv_connect_t)) {
LOGERR(1, "client read buf size is too small (" << client->m_readBufSize << " bytes), expected at least " << sizeof(uv_connect_t) << " bytes");
uv_close(reinterpret_cast<uv_handle_t*>(&client->m_socket), on_connection_error);
return false;
}
uv_connect_t* connect_request = reinterpret_cast<uv_connect_t*>(client->m_readBuf);
memset(connect_request, 0, sizeof(uv_connect_t));
@ -412,17 +407,15 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(Client* client)
}
#ifdef P2POOL_DEBUGGING
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::check_event_loop_thread(const char* func)
void TCPServer::check_event_loop_thread(const char* func) const
{
if (!server_event_loop_thread) {
if (server_event_loop_thread != this) {
LOGERR(1, func << " called from another thread, this is not thread safe");
}
}
#endif
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets)
void TCPServer::close_sockets(bool listen_sockets)
{
check_event_loop_thread(__func__);
@ -456,8 +449,12 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
void TCPServer::error_invalid_ip(const std::string& address)
{
LOGERR(1, "invalid IP:port " << address);
}
void TCPServer::shutdown_tcp()
{
if (m_finished.exchange(1)) {
return;
@ -478,16 +475,14 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
LOGINFO(1, "stopped");
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::print_status()
void TCPServer::print_status()
{
LOGINFO(0, "status" <<
"\nConnections = " << m_numConnections.load() << " (" << m_numIncomingConnections.load() << " incoming)"
);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::ban(const raw_ip& ip, uint64_t seconds)
void TCPServer::ban(const raw_ip& ip, uint64_t seconds)
{
if (ip.is_localhost()) {
return;
@ -499,8 +494,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::ban(const raw_ip& ip, uint64_t se
m_bans[ip] = ban_time;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::print_bans()
void TCPServer::print_bans()
{
using namespace std::chrono;
const auto cur_time = steady_clock::now();
@ -515,8 +509,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::print_bans()
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, SendCallbackBase&& callback)
bool TCPServer::send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback)
{
check_event_loop_thread(__func__);
@ -525,12 +518,10 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
return true;
}
// callback_buf is used in only 1 thread, so it's safe
static uint8_t callback_buf[WRITE_BUF_SIZE];
const size_t bytes_written = callback(callback_buf, sizeof(callback_buf));
const size_t bytes_written = callback(m_callbackBuf.data(), m_callbackBuf.size());
if (bytes_written > WRITE_BUF_SIZE) {
LOGERR(0, "send callback wrote " << bytes_written << " bytes, expected no more than " << WRITE_BUF_SIZE << " bytes");
if (bytes_written > m_callbackBuf.size()) {
LOGERR(0, "send callback wrote " << bytes_written << " bytes, expected no more than " << m_callbackBuf.size() << " bytes");
PANIC_STOP();
}
@ -553,7 +544,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
}
}
memcpy(buf->m_data, callback_buf, bytes_written);
memcpy(buf->m_data, m_callbackBuf.data(), bytes_written);
uv_buf_t bufs[1];
bufs[0].base = reinterpret_cast<char*>(buf->m_data);
@ -569,15 +560,16 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
return true;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
void TCPServer::loop(void* data)
{
LOGINFO(1, "event loop started");
server_event_loop_thread = true;
TCPServer* server = static_cast<TCPServer*>(data);
log_category_prefix = server->get_category();
LOGINFO(1, "event loop started");
server_event_loop_thread = data;
server->m_preallocatedClients.reserve(DEFAULT_BACKLOG);
for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) {
server->m_preallocatedClients.reserve(server->m_defaultBacklog);
for (int i = 0; i < server->m_defaultBacklog; ++i) {
WriteBuf* wb = new WriteBuf();
const size_t capacity = wb->m_dataCapacity;
@ -622,8 +614,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
LOGINFO(1, "event loop stopped");
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_connection(uv_stream_t* server, int status)
void TCPServer::on_new_connection(uv_stream_t* server, int status)
{
TCPServer* pThis = static_cast<TCPServer*>(server->data);
@ -639,17 +630,16 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_connection(uv_stream_t* se
pThis->on_new_client(server);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_close(uv_handle_t* handle)
void TCPServer::on_connection_close(uv_handle_t* handle)
{
check_event_loop_thread(__func__);
Client* client = static_cast<Client*>(handle->data);
TCPServer* owner = client->m_owner;
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " disconnected");
if (owner) {
owner->check_event_loop_thread(__func__);
Client* prev_in_list = client->m_prev;
Client* next_in_list = client->m_next;
@ -672,15 +662,13 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_close(uv_handle_t*
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_error(uv_handle_t* handle)
void TCPServer::on_connection_error(uv_handle_t* handle)
{
Client* client = reinterpret_cast<Client*>(handle->data);
client->m_owner->return_client(client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect(uv_connect_t* req, int status)
void TCPServer::on_connect(uv_connect_t* req, int status)
{
Client* client = reinterpret_cast<Client*>(req->data);
@ -706,8 +694,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect(uv_connect_t* req, int
server->on_new_client(nullptr, client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server)
void TCPServer::on_new_client(uv_stream_t* server)
{
if (m_finished.load()) {
return;
@ -741,8 +728,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
on_new_client(server, client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server, Client* client)
void TCPServer::on_new_client(uv_stream_t* server, Client* client)
{
check_event_loop_thread(__func__);
@ -836,8 +822,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_shutdown(uv_async_t* async)
void TCPServer::on_shutdown(uv_async_t* async)
{
TCPServer* s = reinterpret_cast<TCPServer*>(async->data);
s->on_shutdown();
@ -897,8 +882,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_shutdown(uv_async_t* async)
});
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::WriteBuf* TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::get_write_buffer(size_t size_hint)
TCPServer::WriteBuf* TCPServer::get_write_buffer(size_t size_hint)
{
WriteBuf* buf;
@ -925,8 +909,7 @@ typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::WriteBuf* TCPServer<READ_BUF_
return buf;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::return_write_buffer(WriteBuf* buf)
void TCPServer::return_write_buffer(WriteBuf* buf)
{
const size_t capacity = buf->m_dataCapacity;
@ -938,8 +921,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::return_write_buffer(WriteBuf* buf
m_writeBuffers.emplace(capacity, buf);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client* TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::get_client()
TCPServer::Client* TCPServer::get_client()
{
Client* c;
@ -957,16 +939,16 @@ typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client* TCPServer<READ_BUF_SI
return c;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::return_client(Client* c)
void TCPServer::return_client(Client* c)
{
ASAN_POISON_MEMORY_REGION(c, c->size());
m_preallocatedClients.push_back(c);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
: m_owner(nullptr)
TCPServer::Client::Client(char* read_buf, size_t size)
: m_readBuf(read_buf)
, m_readBufSize(static_cast<uint32_t>(size))
, m_owner(nullptr)
, m_prev(nullptr)
, m_next(nullptr)
, m_socket{}
@ -982,11 +964,10 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
, m_resetCounter{ 0 }
{
m_readBuf[0] = '\0';
m_readBuf[READ_BUF_SIZE - 1] = '\0';
m_readBuf[m_readBufSize - 1] = '\0';
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::reset()
void TCPServer::Client::reset()
{
m_resetCounter.fetch_add(1);
@ -1004,11 +985,10 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::reset()
m_addrString[0] = '\0';
m_socks5ProxyState = Socks5ProxyState::Default;
m_readBuf[0] = '\0';
m_readBuf[READ_BUF_SIZE - 1] = '\0';
m_readBuf[m_readBufSize - 1] = '\0';
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_alloc(uv_handle_t* handle, size_t /*suggested_size*/, uv_buf_t* buf)
void TCPServer::Client::on_alloc(uv_handle_t* handle, size_t /*suggested_size*/, uv_buf_t* buf)
{
Client* pThis = static_cast<Client*>(handle->data);
@ -1019,20 +999,19 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_alloc(uv_handle_t* han
return;
}
if (pThis->m_numRead >= sizeof(pThis->m_readBuf)) {
if (pThis->m_numRead >= pThis->m_readBufSize) {
LOGWARN(4, "client " << static_cast<const char*>(pThis->m_addrString) << " read buffer is full");
buf->len = 0;
buf->base = nullptr;
return;
}
buf->len = sizeof(pThis->m_readBuf) - pThis->m_numRead;
buf->len = pThis->m_readBufSize - pThis->m_numRead;
buf->base = pThis->m_readBuf + pThis->m_numRead;
pThis->m_readBufInUse = true;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
void TCPServer::Client::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
Client* client = static_cast<Client*>(stream->data);
client->m_readBufInUse = false;
@ -1067,10 +1046,9 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_read(uv_stream_t* stre
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_proxy_handshake(char* data, uint32_t size)
bool TCPServer::Client::on_proxy_handshake(char* data, uint32_t size)
{
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) {
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + m_readBufSize)) {
LOGERR(1, "peer " << static_cast<char*>(m_addrString) << " invalid data pointer or size in on_read()");
return false;
}
@ -1189,8 +1167,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_proxy_handshake(char*
return true;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req, int status)
void TCPServer::Client::on_write(uv_write_t* req, int status)
{
WriteBuf* buf = static_cast<WriteBuf*>(req->data);
Client* client = buf->m_client;
@ -1206,8 +1183,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req,
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::close()
void TCPServer::Client::close()
{
if (m_isClosing || !m_owner) {
// Already closed
@ -1225,8 +1201,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::close()
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::ban(uint64_t seconds)
void TCPServer::Client::ban(uint64_t seconds)
{
if (m_addr.is_localhost()) {
return;
@ -1238,8 +1213,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::ban(uint64_t seconds)
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::init_addr_string()
void TCPServer::Client::init_addr_string()
{
const char* addr_str;
char addr_str_buf[64];

@ -22,19 +22,15 @@
namespace p2pool {
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
class TCPServer : public nocopy_nomove
{
public:
struct Client;
typedef Client* (*allocate_client_callback)();
explicit TCPServer(allocate_client_callback allocate_new_client);
TCPServer(int default_backlog, allocate_client_callback allocate_new_client);
virtual ~TCPServer();
template<typename T>
void parse_address_list(const std::string& address_list, T callback);
bool connect_to_peer(bool is_v6, const char* ip, int port);
void drop_connections_async() { if (m_finished.load() == 0) { uv_async_send(&m_dropConnectionsAsync); } }
@ -53,7 +49,7 @@ public:
struct Client
{
Client();
Client(char* read_buf, size_t size);
virtual ~Client() {}
virtual size_t size() const = 0;
@ -74,7 +70,8 @@ public:
void init_addr_string();
alignas(8) char m_readBuf[READ_BUF_SIZE];
char* m_readBuf;
uint32_t m_readBufSize;
TCPServer* m_owner;
@ -116,26 +113,14 @@ public:
WriteBuf* get_write_buffer(size_t size_hint);
void return_write_buffer(WriteBuf* buf);
struct SendCallbackBase
{
virtual ~SendCallbackBase() {}
virtual size_t operator()(void*, size_t) = 0;
};
template<typename T>
struct SendCallback : public SendCallbackBase
FORCEINLINE static void parse_address_list(const std::string& address_list, T&& callback)
{
explicit FORCEINLINE SendCallback(T&& callback) : m_callback(std::move(callback)) {}
size_t operator()(void* buf, size_t buf_size) override { return m_callback(buf, buf_size); }
private:
SendCallback& operator=(SendCallback&&) = delete;
T m_callback;
};
return parse_address_list_internal(address_list, Callback<void, bool, const std::string&, const std::string&, int>::Derived<T>(std::move(callback)));
}
template<typename T>
FORCEINLINE bool send(Client* client, T&& callback) { return send_internal(client, SendCallback<T>(std::move(callback))); }
FORCEINLINE bool send(Client* client, T&& callback) { return send_internal(client, Callback<size_t, uint8_t*, size_t>::Derived<T>(std::move(callback))); }
private:
static void on_new_connection(uv_stream_t* server, int status);
@ -147,20 +132,27 @@ private:
bool connect_to_peer(Client* client);
bool send_internal(Client* client, SendCallbackBase&& callback);
bool send_internal(Client* client, Callback<size_t, uint8_t*, size_t>::Base&& callback);
allocate_client_callback m_allocateNewClient;
void close_sockets(bool listen_sockets);
static void error_invalid_ip(const std::string& address);
std::vector<uv_tcp_t*> m_listenSockets6;
std::vector<uv_tcp_t*> m_listenSockets;
protected:
virtual const char* get_category() const { return "TCPServer "; }
std::vector<uint8_t> m_callbackBuf;
int m_defaultBacklog;
uv_thread_t m_loopThread;
static void loop(void* data);
static void parse_address_list_internal(const std::string& address_list, Callback<void, bool, const std::string&, const std::string&, int>::Base&& callback);
void start_listening(const std::string& listen_addresses, bool upnp);
bool start_listening(bool is_v6, const std::string& ip, int port, std::string address = std::string());
@ -179,7 +171,7 @@ protected:
uv_loop_t m_loop;
#ifdef P2POOL_DEBUGGING
static void check_event_loop_thread(const char *func);
void check_event_loop_thread(const char *func) const;
#else
static FORCEINLINE void check_event_loop_thread(const char*) {}
#endif

@ -275,6 +275,27 @@ struct PerfTimer
#define PERFLOG(level, name) PerfTimer CONCAT(perf_timer_, __LINE__)(level, name)
#endif
template<typename R, typename ...Args>
struct Callback
{
struct Base
{
virtual ~Base() {}
virtual R operator()(Args...) = 0;
};
template<typename T>
struct Derived : public Base
{
explicit FORCEINLINE Derived(T&& cb) : m_cb(std::move(cb)) {}
R operator()(Args... args) override { return m_cb(args...); }
private:
Derived& operator=(Derived&&) = delete;
T m_cb;
};
};
} // namespace p2pool
void memory_tracking_start();

@ -64,22 +64,7 @@ void uv_rwlock_init_checked(uv_rwlock_t* lock);
void uv_async_init_checked(uv_loop_t* loop, uv_async_t* async, uv_async_cb async_cb);
uv_loop_t* uv_default_loop_checked();
struct UV_LoopCallbackBase
{
virtual ~UV_LoopCallbackBase() {}
virtual void operator()() = 0;
};
template<typename T>
struct UV_LoopCallback : public UV_LoopCallbackBase
{
explicit FORCEINLINE UV_LoopCallback(T&& cb) : m_cb(std::move(cb)) {}
void operator()() override { m_cb(); }
private:
UV_LoopCallback& operator=(UV_LoopCallback&&) = delete;
T m_cb;
};
typedef Callback<void>::Base UV_LoopCallbackBase;
struct UV_LoopUserData
{
@ -147,7 +132,7 @@ bool CallOnLoop(uv_loop_t* loop, T&& callback)
return false;
}
UV_LoopCallbackBase* cb = new UV_LoopCallback<T>(std::move(callback));
UV_LoopCallbackBase* cb = new Callback<void>::Derived<T>(std::move(callback));
{
MutexLock lock(data->m_callbacksLock);
data->m_callbacks.push_back(cb);

@ -55,6 +55,7 @@ set(SOURCES
../src/pow_hash.cpp
../src/side_chain.cpp
../src/stratum_server.cpp
../src/tcp_server.cpp
../src/util.cpp
../src/wallet.cpp
../src/zmq_reader.cpp

@ -87,7 +87,7 @@ TEST(pool_block, deserialize)
class RandomX_Hasher_Test : public RandomX_Hasher_Base
{
public:
bool calculate(const void* data, size_t size, uint64_t, const hash&, hash& result, bool force_light_mode) override
bool calculate(const void* data, size_t size, uint64_t, const hash&, hash& result, bool /*force_light_mode*/) override
{
if (size == 76) {
char buf[76 * 2 + 1];

Loading…
Cancel
Save