TCPServer: removed unnecessary mutex

pull/238/head
SChernykh 1 year ago
parent 7a1afc7a95
commit 12a011a9ff

@ -261,6 +261,8 @@ void P2PServer::on_connect_failed(bool is_v6, const raw_ip& ip, int port)
void P2PServer::update_peer_connections()
{
check_event_loop_thread(__func__);
const uint64_t cur_time = seconds_since_epoch();
const uint64_t last_updated = m_pool->side_chain().last_updated();
@ -268,40 +270,37 @@ void P2PServer::update_peer_connections()
m_fastestPeer = nullptr;
unordered_set<raw_ip> connected_clients;
{
MutexLock lock(m_clientsListLock);
connected_clients.reserve(m_numConnections);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
const int timeout = client->m_handshakeComplete ? 300 : 10;
if ((cur_time >= client->m_lastAlive + timeout) && (client->m_socks5ProxyState == Client::Socks5ProxyState::Default)) {
const uint64_t idle_time = static_cast<uint64_t>(cur_time - client->m_lastAlive);
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting");
connected_clients.reserve(m_numConnections);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
const int timeout = client->m_handshakeComplete ? 300 : 10;
if ((cur_time >= client->m_lastAlive + timeout) && (client->m_socks5ProxyState == Client::Socks5ProxyState::Default)) {
const uint64_t idle_time = static_cast<uint64_t>(cur_time - client->m_lastAlive);
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting");
client->close();
continue;
}
if (client->m_handshakeComplete && client->m_lastBroadcastTimestamp) {
// - Side chain is at least 15 minutes newer (last_updated >= client->m_lastBroadcastTimestamp + 900)
// - It's been at least 10 seconds since side chain updated (cur_time >= last_updated + 10)
// - It's been at least 10 seconds since the last block request (peer is not syncing)
// - Peer should have sent a broadcast by now
if (last_updated && (cur_time >= std::max(last_updated, client->m_lastBlockrequestTimestamp) + 10) && (last_updated >= client->m_lastBroadcastTimestamp + 900)) {
const uint64_t dt = last_updated - client->m_lastBroadcastTimestamp;
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " is not broadcasting blocks (last update " << dt << " seconds ago)");
client->ban(DEFAULT_BAN_TIME);
remove_peer_from_list(client);
client->close();
continue;
}
}
if (client->m_handshakeComplete && client->m_lastBroadcastTimestamp) {
// - Side chain is at least 15 minutes newer (last_updated >= client->m_lastBroadcastTimestamp + 900)
// - It's been at least 10 seconds since side chain updated (cur_time >= last_updated + 10)
// - It's been at least 10 seconds since the last block request (peer is not syncing)
// - Peer should have sent a broadcast by now
if (last_updated && (cur_time >= std::max(last_updated, client->m_lastBlockrequestTimestamp) + 10) && (last_updated >= client->m_lastBroadcastTimestamp + 900)) {
const uint64_t dt = last_updated - client->m_lastBroadcastTimestamp;
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " is not broadcasting blocks (last update " << dt << " seconds ago)");
client->ban(DEFAULT_BAN_TIME);
remove_peer_from_list(client);
client->close();
continue;
}
}
connected_clients.insert(client->m_addr);
if (client->is_good()) {
has_good_peers = true;
if ((client->m_pingTime >= 0) && (!m_fastestPeer || (m_fastestPeer->m_pingTime > client->m_pingTime))) {
m_fastestPeer = client;
}
connected_clients.insert(client->m_addr);
if (client->is_good()) {
has_good_peers = true;
if ((client->m_pingTime >= 0) && (!m_fastestPeer || (m_fastestPeer->m_pingTime > client->m_pingTime))) {
m_fastestPeer = client;
}
}
}
@ -363,7 +362,7 @@ void P2PServer::update_peer_connections()
void P2PServer::update_peer_list()
{
MutexLock lock(m_clientsListLock);
check_event_loop_thread(__func__);
const uint64_t cur_time = seconds_since_epoch();
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
@ -843,6 +842,8 @@ void P2PServer::broadcast(const PoolBlock& block, const PoolBlock* parent)
void P2PServer::on_broadcast()
{
check_event_loop_thread(__func__);
std::vector<Broadcast*> broadcast_queue;
broadcast_queue.reserve(2);
@ -863,8 +864,6 @@ void P2PServer::on_broadcast()
}
});
MutexLock lock(m_clientsListLock);
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (!client->is_good()) {
continue;
@ -941,9 +940,7 @@ void P2PServer::on_broadcast()
uint64_t P2PServer::get_random64()
{
if (!server_event_loop_thread) {
LOGERR(1, "get_random64() was called from another thread, this is not thread safe");
}
check_event_loop_thread(__func__);
return m_rng();
}
@ -965,9 +962,9 @@ void P2PServer::show_peers_async()
}
}
void P2PServer::show_peers()
void P2PServer::show_peers() const
{
MutexLock lock(m_clientsListLock);
check_event_loop_thread(__func__);
const uint64_t cur_time = seconds_since_epoch();
size_t n = 0;
@ -1070,6 +1067,8 @@ void P2PServer::flush_cache()
void P2PServer::download_missing_blocks()
{
check_event_loop_thread(__func__);
if (!m_lookForMissingBlocks) {
return;
}
@ -1083,8 +1082,6 @@ void P2PServer::download_missing_blocks()
return;
}
MutexLock lock(m_clientsListLock);
if (m_numConnections == 0) {
return;
}
@ -1271,7 +1268,6 @@ bool P2PServer::P2PClient::on_connect()
}
// Don't allow multiple connections to/from the same IP (except localhost)
// server->m_clientsListLock is already locked here
if (!m_addr.is_localhost()) {
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if ((client != this) && (client->m_addr == m_addr)) {
@ -1757,6 +1753,8 @@ bool P2PServer::P2PClient::check_handshake_solution(const hash& solution, const
bool P2PServer::P2PClient::on_handshake_challenge(const uint8_t* buf)
{
check_event_loop_thread(__func__);
P2PServer* server = static_cast<P2PServer*>(m_owner);
uint8_t challenge[CHALLENGE_SIZE];
@ -1772,23 +1770,14 @@ bool P2PServer::P2PClient::on_handshake_challenge(const uint8_t* buf)
m_peerId = peer_id;
bool same_peer = false;
{
MutexLock lock(server->m_clientsListLock);
for (const P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if ((client != this) && (client->m_peerId == peer_id)) {
LOGWARN(5, "tried to connect to the same peer twice: current connection " << static_cast<const char*>(client->m_addrString) << ", new connection " << static_cast<const char*>(m_addrString));
same_peer = true;
break;
}
for (const P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if ((client != this) && (client->m_peerId == peer_id)) {
LOGWARN(5, "tried to connect to the same peer twice: current connection " << static_cast<const char*>(client->m_addrString) << ", new connection " << static_cast<const char*>(m_addrString));
close();
return true;
}
}
if (same_peer) {
close();
return true;
}
send_handshake_solution(challenge);
return true;
}
@ -2033,6 +2022,8 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size,
bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
{
check_event_loop_thread(__func__);
P2PServer* server = static_cast<P2PServer*>(m_owner);
const uint64_t cur_time = seconds_since_epoch();
const bool first = (m_prevIncomingPeerListRequest == 0);
@ -2050,33 +2041,30 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
Peer peers[PEER_LIST_RESPONSE_MAX_PEERS];
uint32_t num_selected_peers = 0;
{
MutexLock lock(server->m_clientsListLock);
// Send every 4th peer on average, selected at random
const uint32_t peers_to_send_target = std::min<uint32_t>(PEER_LIST_RESPONSE_MAX_PEERS, std::max<uint32_t>(1, server->m_numConnections / 4));
uint32_t n = 0;
// Send every 4th peer on average, selected at random
const uint32_t peers_to_send_target = std::min<uint32_t>(PEER_LIST_RESPONSE_MAX_PEERS, std::max<uint32_t>(1, server->m_numConnections / 4));
uint32_t n = 0;
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (!client->is_good() || (client->m_addr == m_addr)) {
continue;
}
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
if (!client->is_good() || (client->m_addr == m_addr)) {
continue;
}
const Peer p{ client->m_isV6, client->m_addr, client->m_listenPort, 0, 0 };
++n;
const Peer p{ client->m_isV6, client->m_addr, client->m_listenPort, 0, 0 };
++n;
// Use https://en.wikipedia.org/wiki/Reservoir_sampling algorithm
if (num_selected_peers < peers_to_send_target) {
peers[num_selected_peers++] = p;
continue;
}
// Use https://en.wikipedia.org/wiki/Reservoir_sampling algorithm
if (num_selected_peers < peers_to_send_target) {
peers[num_selected_peers++] = p;
continue;
}
uint64_t k;
umul128(server->get_random64(), n, &k);
uint64_t k;
umul128(server->get_random64(), n, &k);
if (k < peers_to_send_target) {
peers[k] = p;
}
if (k < peers_to_send_target) {
peers[k] = p;
}
}

@ -253,7 +253,7 @@ private:
uv_async_t m_showPeersAsync;
static void on_show_peers(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->show_peers(); }
void show_peers();
void show_peers() const;
void on_shutdown() override;
};

@ -484,11 +484,11 @@ void StratumServer::show_workers_async()
void StratumServer::show_workers()
{
check_event_loop_thread(__func__);
const uint64_t cur_time = seconds_since_epoch();
const difficulty_type pool_diff = m_pool->side_chain().difficulty();
MutexLock lock(m_clientsListLock);
int addr_len = 0;
for (const StratumClient* c = static_cast<StratumClient*>(m_connectedClientsList->m_next); c != m_connectedClientsList; c = static_cast<StratumClient*>(c->m_next)) {
addr_len = std::max(addr_len, static_cast<int>(strlen(c->m_addrString)));
@ -671,6 +671,8 @@ void StratumServer::update_auto_diff(StratumClient* client, const uint64_t times
void StratumServer::on_blobs_ready()
{
check_event_loop_thread(__func__);
std::vector<BlobsData*> blobs_queue;
blobs_queue.reserve(2);
@ -699,103 +701,100 @@ void StratumServer::on_blobs_ready()
uint32_t num_sent = 0;
const uint64_t cur_time = seconds_since_epoch();
{
MutexLock lock2(m_clientsListLock);
for (StratumClient* client = static_cast<StratumClient*>(m_connectedClientsList->m_prev); client != m_connectedClientsList; client = static_cast<StratumClient*>(client->m_prev)) {
++numClientsProcessed;
for (StratumClient* client = static_cast<StratumClient*>(m_connectedClientsList->m_prev); client != m_connectedClientsList; client = static_cast<StratumClient*>(client->m_prev)) {
++numClientsProcessed;
if (!client->m_rpcId) {
// Not logged in yet, on_login() will send the job to this client. Also close inactive connections.
if (cur_time >= client->m_connectedTime + 10) {
LOGWARN(4, "client " << static_cast<char*>(client->m_addrString) << " didn't send login data");
client->ban(DEFAULT_BAN_TIME);
client->close();
}
continue;
if (!client->m_rpcId) {
// Not logged in yet, on_login() will send the job to this client. Also close inactive connections.
if (cur_time >= client->m_connectedTime + 10) {
LOGWARN(4, "client " << static_cast<char*>(client->m_addrString) << " didn't send login data");
client->ban(DEFAULT_BAN_TIME);
client->close();
}
continue;
}
if (num_sent >= data->m_numClientsExpected) {
// We don't have any more extra_nonce values available
continue;
}
if (num_sent >= data->m_numClientsExpected) {
// We don't have any more extra_nonce values available
continue;
}
uint8_t* hashing_blob = data->m_blobs.data() + num_sent * data->m_blobSize;
uint8_t* hashing_blob = data->m_blobs.data() + num_sent * data->m_blobSize;
uint64_t target = data->m_target;
if (client->m_customDiff.lo) {
target = std::max(target, client->m_customDiff.target());
}
else if (m_autoDiff) {
// Limit autodiff to 4000000 for maximum compatibility
target = std::max(target, TARGET_4_BYTES_LIMIT);
if (client->m_autoDiff.lo) {
const uint32_t k = client->m_autoDiffIndex;
const uint16_t elapsed_time = static_cast<uint16_t>(cur_time) - client->m_autoDiffData[(k - 1) % StratumClient::AUTO_DIFF_SIZE].m_timestamp;
if (elapsed_time > AUTO_DIFF_TARGET_TIME * 5) {
// More than 500% effort, reduce the auto diff by 1/8 every time until the share is found
client->m_autoDiff.lo = std::max<uint64_t>(client->m_autoDiff.lo - client->m_autoDiff.lo / 8, MIN_DIFF);
}
target = std::max(target, client->m_autoDiff.target());
uint64_t target = data->m_target;
if (client->m_customDiff.lo) {
target = std::max(target, client->m_customDiff.target());
}
else if (m_autoDiff) {
// Limit autodiff to 4000000 for maximum compatibility
target = std::max(target, TARGET_4_BYTES_LIMIT);
if (client->m_autoDiff.lo) {
const uint32_t k = client->m_autoDiffIndex;
const uint16_t elapsed_time = static_cast<uint16_t>(cur_time) - client->m_autoDiffData[(k - 1) % StratumClient::AUTO_DIFF_SIZE].m_timestamp;
if (elapsed_time > AUTO_DIFF_TARGET_TIME * 5) {
// More than 500% effort, reduce the auto diff by 1/8 every time until the share is found
client->m_autoDiff.lo = std::max<uint64_t>(client->m_autoDiff.lo - client->m_autoDiff.lo / 8, MIN_DIFF);
}
else {
// Not enough shares from the client yet, cut diff in half every 16 seconds
const uint64_t num_halvings = (cur_time - client->m_connectedTime) / 16;
constexpr uint64_t max_target = (std::numeric_limits<uint64_t>::max() / MIN_DIFF) + 1;
for (uint64_t i = 0; (i < num_halvings) && (target < max_target); ++i) {
target *= 2;
}
target = std::min<uint64_t>(target, max_target);
target = std::max(target, client->m_autoDiff.target());
}
else {
// Not enough shares from the client yet, cut diff in half every 16 seconds
const uint64_t num_halvings = (cur_time - client->m_connectedTime) / 16;
constexpr uint64_t max_target = (std::numeric_limits<uint64_t>::max() / MIN_DIFF) + 1;
for (uint64_t i = 0; (i < num_halvings) && (target < max_target); ++i) {
target *= 2;
}
target = std::min<uint64_t>(target, max_target);
}
}
uint32_t job_id;
{
job_id = ++client->m_perConnectionJobId;
uint32_t job_id;
{
job_id = ++client->m_perConnectionJobId;
StratumClient::SavedJob& saved_job = client->m_jobs[job_id % StratumClient::JOBS_SIZE];
saved_job.job_id = job_id;
saved_job.extra_nonce = extra_nonce_start + num_sent;
saved_job.template_id = data->m_templateId;
saved_job.target = target;
}
client->m_lastJobTarget = target;
StratumClient::SavedJob& saved_job = client->m_jobs[job_id % StratumClient::JOBS_SIZE];
saved_job.job_id = job_id;
saved_job.extra_nonce = extra_nonce_start + num_sent;
saved_job.template_id = data->m_templateId;
saved_job.target = target;
}
client->m_lastJobTarget = target;
const bool result = send(client,
[data, target, hashing_blob, job_id](void* buf, size_t buf_size)
{
log::hex_buf target_hex(reinterpret_cast<const uint8_t*>(&target), sizeof(uint64_t));
const bool result = send(client,
[data, target, hashing_blob, job_id](void* buf, size_t buf_size)
{
log::hex_buf target_hex(reinterpret_cast<const uint8_t*>(&target), sizeof(uint64_t));
if (target >= TARGET_4_BYTES_LIMIT) {
target_hex.m_data += sizeof(uint32_t);
target_hex.m_size -= sizeof(uint32_t);
}
if (target >= TARGET_4_BYTES_LIMIT) {
target_hex.m_data += sizeof(uint32_t);
target_hex.m_size -= sizeof(uint32_t);
}
log::Stream s(buf, buf_size);
s << "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\":{\"blob\":\"";
s << log::hex_buf(hashing_blob, data->m_blobSize) << "\",\"job_id\":\"";
s << log::Hex(job_id) << "\",\"target\":\"";
s << target_hex << "\",\"algo\":\"rx/0\",\"height\":";
s << data->m_height << ",\"seed_hash\":\"";
s << data->m_seedHash << "\"}}\n";
return s.m_pos;
});
log::Stream s(buf, buf_size);
s << "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\":{\"blob\":\"";
s << log::hex_buf(hashing_blob, data->m_blobSize) << "\",\"job_id\":\"";
s << log::Hex(job_id) << "\",\"target\":\"";
s << target_hex << "\",\"algo\":\"rx/0\",\"height\":";
s << data->m_height << ",\"seed_hash\":\"";
s << data->m_seedHash << "\"}}\n";
return s.m_pos;
});
if (result) {
++num_sent;
}
else {
client->close();
}
if (result) {
++num_sent;
}
const uint32_t num_connections = m_numConnections;
if (numClientsProcessed != num_connections) {
LOGWARN(1, "client list is broken, expected " << num_connections << ", got " << numClientsProcessed << " clients");
else {
client->close();
}
}
const uint32_t num_connections = m_numConnections;
if (numClientsProcessed != num_connections) {
LOGWARN(1, "client list is broken, expected " << num_connections << ", got " << numClientsProcessed << " clients");
}
LOGINFO(3, "sent new job to " << num_sent << '/' << numClientsProcessed << " clients");
}

@ -170,7 +170,8 @@ protected:
uv_loop_t m_loop;
uv_mutex_t m_clientsListLock;
static void check_event_loop_thread(const char *func);
std::vector<Client*> m_preallocatedClients;
Client* get_client();

@ -61,7 +61,6 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
}
m_shutdownAsync.data = this;
uv_mutex_init_checked(&m_clientsListLock);
uv_mutex_init_checked(&m_bansLock);
m_connectedClientsList = m_allocateNewClient();
@ -369,11 +368,17 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(Client* client)
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets)
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::check_event_loop_thread(const char* func)
{
if (!server_event_loop_thread) {
LOGERR(1, "closing sockets from another thread, this is not thread safe");
LOGERR(1, func << " called from another thread, this is not thread safe");
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets)
{
check_event_loop_thread(__func__);
if (listen_sockets) {
for (uv_tcp_t* s : m_listenSockets6) {
@ -391,15 +396,12 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets
}
size_t numClosed = 0;
{
MutexLock lock(m_clientsListLock);
for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) {
uv_handle_t* h = reinterpret_cast<uv_handle_t*>(&c->m_socket);
if (!uv_is_closing(h)) {
uv_close(h, on_connection_close);
++numClosed;
}
for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) {
uv_handle_t* h = reinterpret_cast<uv_handle_t*>(&c->m_socket);
if (!uv_is_closing(h)) {
uv_close(h, on_connection_close);
++numClosed;
}
}
@ -418,7 +420,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
uv_async_send(&m_shutdownAsync);
uv_thread_join(&m_loopThread);
uv_mutex_destroy(&m_clientsListLock);
uv_mutex_destroy(&m_bansLock);
LOGINFO(1, "stopped");
@ -464,9 +465,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::print_bans()
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, SendCallbackBase&& callback)
{
if (!server_event_loop_thread) {
LOGERR(1, "sending data from another thread, this is not thread safe");
}
check_event_loop_thread(__func__);
if (client->m_isClosing) {
LOGWARN(5, "client " << static_cast<const char*>(client->m_addrString) << " is being disconnected, can't send any more data");
@ -586,14 +585,14 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_connection(uv_stream_t* se
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_close(uv_handle_t* handle)
{
check_event_loop_thread(__func__);
Client* client = static_cast<Client*>(handle->data);
TCPServer* owner = client->m_owner;
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " disconnected");
if (owner) {
MutexLock lock(owner->m_clientsListLock);
Client* prev_in_list = client->m_prev;
Client* next_in_list = client->m_next;
@ -688,7 +687,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server, Client* client)
{
MutexLock lock(m_clientsListLock);
check_event_loop_thread(__func__);
client->m_prev = m_connectedClientsList;
client->m_next = m_connectedClientsList->m_next;

Loading…
Cancel
Save