Fixed data races

pull/144/head
SChernykh 2 years ago
parent 7b5b910f3f
commit 7522111bb7

@ -678,7 +678,7 @@ void P2PServer::remove_peer_from_list(const raw_ip& ip)
void P2PServer::broadcast(const PoolBlock& block)
{
const MinerData& miner_data = m_pool->miner_data();
MinerData miner_data = m_pool->miner_data();
if (block.m_txinGenHeight + 2 < miner_data.height) {
LOGWARN(3, "Trying to broadcast a stale block " << block.m_sidechainId << " (mainchain height " << block.m_txinGenHeight << ", current height is " << miner_data.height << ')');
@ -1736,7 +1736,7 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size)
m_broadcastedHashes[m_broadcastedHashesIndex.fetch_add(1) % array_size(&P2PClient::m_broadcastedHashes)] = server->m_block->m_sidechainId;
const MinerData& miner_data = server->m_pool->miner_data();
MinerData miner_data = server->m_pool->miner_data();
if (server->m_block->m_prevId != miner_data.prev_id) {
// This peer is mining on top of a different Monero block, investigate it

@ -108,6 +108,7 @@ p2pool::p2pool(int argc, char* argv[])
m_stopAsync.data = this;
uv_rwlock_init_checked(&m_mainchainLock);
uv_rwlock_init_checked(&m_minerDataLock);
uv_mutex_init_checked(&m_foundBlocksLock);
uv_mutex_init_checked(&m_submitBlockDataLock);
@ -149,6 +150,7 @@ p2pool::p2pool(int argc, char* argv[])
p2pool::~p2pool()
{
uv_rwlock_destroy(&m_mainchainLock);
uv_rwlock_destroy(&m_minerDataLock);
uv_mutex_destroy(&m_foundBlocksLock);
uv_mutex_destroy(&m_submitBlockDataLock);
@ -203,7 +205,7 @@ void p2pool::handle_tx(TxMempoolData& tx)
", fee = " << log::Gray() << static_cast<double>(tx.fee) / 1e6 << " um");
#if TEST_MEMPOOL_PICKING_ALGORITHM
m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet);
m_blockTemplate->update(miner_data(), *m_mempool, &m_params->m_wallet);
#endif
m_zmqLastActive = seconds_since_epoch();
@ -239,7 +241,10 @@ void p2pool::handle_miner_data(MinerData& data)
data.tx_backlog.clear();
data.time_received = std::chrono::high_resolution_clock::now();
m_minerData = data;
{
WriteLock lock(m_minerDataLock);
m_minerData = data;
}
m_updateSeed = true;
update_median_timestamp();
@ -549,11 +554,13 @@ void p2pool::update_block_template_async()
void p2pool::update_block_template()
{
MinerData data = miner_data();
if (m_updateSeed) {
m_hasher->set_seed_async(m_minerData.seed_hash);
m_hasher->set_seed_async(data.seed_hash);
m_updateSeed = false;
}
m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet);
m_blockTemplate->update(data, *m_mempool, &m_params->m_wallet);
stratum_on_block();
api_update_pool_stats();
}
@ -666,6 +673,7 @@ void p2pool::update_median_timestamp()
uint64_t timestamps[TIMESTAMP_WINDOW];
if (!get_timestamps(timestamps))
{
WriteLock lock(m_minerDataLock);
m_minerData.median_timestamp = 0;
return;
}
@ -673,8 +681,11 @@ void p2pool::update_median_timestamp()
std::sort(timestamps, timestamps + TIMESTAMP_WINDOW);
// Shift it +1 block compared to Monero's code because we don't have the latest block yet when we receive new miner data
m_minerData.median_timestamp = (timestamps[TIMESTAMP_WINDOW / 2] + timestamps[TIMESTAMP_WINDOW / 2 + 1]) / 2;
LOGINFO(4, "median timestamp updated to " << log::Gray() << m_minerData.median_timestamp);
const uint64_t ts = (timestamps[TIMESTAMP_WINDOW / 2] + timestamps[TIMESTAMP_WINDOW / 2 + 1]) / 2;
LOGINFO(4, "median timestamp updated to " << log::Gray() << ts);
WriteLock lock(m_minerDataLock);
m_minerData.median_timestamp = ts;
}
void p2pool::stratum_on_block()
@ -1021,10 +1032,16 @@ void p2pool::api_update_network_stats()
return;
}
hash prev_id;
{
ReadLock lock(m_minerDataLock);
prev_id = m_minerData.prev_id;
}
ChainMain mainnet_tip;
{
ReadLock lock(m_mainchainLock);
mainnet_tip = m_mainchainByHash[m_minerData.prev_id];
mainnet_tip = m_mainchainByHash[prev_id];
}
m_api->set(p2pool_api::Category::NETWORK, "stats",
@ -1086,10 +1103,16 @@ void p2pool::api_update_stats_mod()
return;
}
hash prev_id;
{
ReadLock lock(m_minerDataLock);
prev_id = m_minerData.prev_id;
}
ChainMain mainnet_tip;
{
ReadLock lock(m_mainchainLock);
mainnet_tip = m_mainchainByHash[m_minerData.prev_id];
mainnet_tip = m_mainchainByHash[prev_id];
}
time_t last_block_found_time = 0;

@ -48,7 +48,12 @@ public:
const Params& params() const { return *m_params; }
BlockTemplate& block_template() { return *m_blockTemplate; }
SideChain& side_chain() { return *m_sideChain; }
const MinerData& miner_data() const { return m_minerData; }
FORCEINLINE MinerData miner_data() const
{
ReadLock lock(m_minerDataLock);
return m_minerData;
}
p2pool_api* api() const { return m_api; }
@ -108,7 +113,6 @@ private:
SideChain* m_sideChain;
RandomX_Hasher_Base* m_hasher;
BlockTemplate* m_blockTemplate;
MinerData m_minerData;
bool m_updateSeed;
Mempool* m_mempool;
@ -116,6 +120,9 @@ private:
std::map<uint64_t, ChainMain> m_mainchainByHeight;
unordered_map<hash, ChainMain> m_mainchainByHash;
mutable uv_rwlock_t m_minerDataLock;
MinerData m_minerData;
enum { TIMESTAMP_WINDOW = 60 };
bool get_timestamps(uint64_t (&timestamps)[TIMESTAMP_WINDOW]) const;
void update_median_timestamp();
@ -185,7 +192,7 @@ private:
uv_async_t m_blockTemplateAsync;
uv_async_t m_stopAsync;
uint64_t m_zmqLastActive;
std::atomic<uint64_t> m_zmqLastActive;
uint64_t m_startTime;
ZMQReader* m_ZMQReader = nullptr;

@ -60,7 +60,7 @@ static constexpr uint8_t mini_consensus_id[HASH_SIZE] = { 57,130,201,26,149,174,
SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name)
: m_pool(pool)
, m_networkType(type)
, m_chainTip(nullptr)
, m_chainTip{ nullptr }
, m_poolName(pool_name ? pool_name : "default")
, m_targetBlockTime(10)
, m_minDifficulty(MIN_DIFFICULTY, 0)
@ -177,7 +177,9 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk
block.m_txkeySec = txkeySec;
block.m_uncles.clear();
if (!m_chainTip) {
const PoolBlock* tip = m_chainTip;
if (!tip) {
block.m_parent = {};
block.m_sidechainHeight = 0;
block.m_difficulty = m_minDifficulty;
@ -187,8 +189,8 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk
return;
}
block.m_parent = m_chainTip->m_sidechainId;
block.m_sidechainHeight = m_chainTip->m_sidechainHeight + 1;
block.m_parent = tip->m_sidechainId;
block.m_sidechainHeight = tip->m_sidechainHeight + 1;
// Collect uncles from 3 previous block heights
@ -196,15 +198,15 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk
std::vector<hash> mined_blocks;
mined_blocks.reserve(UNCLE_BLOCK_DEPTH * 2 + 1);
PoolBlock* tmp = m_chainTip;
for (uint64_t i = 0, n = std::min<uint64_t>(UNCLE_BLOCK_DEPTH, m_chainTip->m_sidechainHeight + 1); tmp && (i < n); ++i) {
const PoolBlock* tmp = tip;
for (uint64_t i = 0, n = std::min<uint64_t>(UNCLE_BLOCK_DEPTH, tip->m_sidechainHeight + 1); tmp && (i < n); ++i) {
mined_blocks.push_back(tmp->m_sidechainId);
mined_blocks.insert(mined_blocks.end(), tmp->m_uncles.begin(), tmp->m_uncles.end());
tmp = get_parent(tmp);
}
for (uint64_t i = 0, n = std::min<uint64_t>(UNCLE_BLOCK_DEPTH, m_chainTip->m_sidechainHeight + 1); i < n; ++i) {
for (PoolBlock* uncle : m_blocksByHeight[m_chainTip->m_sidechainHeight - i]) {
for (uint64_t i = 0, n = std::min<uint64_t>(UNCLE_BLOCK_DEPTH, tip->m_sidechainHeight + 1); i < n; ++i) {
for (PoolBlock* uncle : m_blocksByHeight[tip->m_sidechainHeight - i]) {
// Only add verified and valid blocks
if (!uncle || !uncle->m_verified || uncle->m_invalid) {
continue;
@ -218,7 +220,7 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk
// Only add it if it's on the same chain
bool same_chain = false;
do {
tmp = m_chainTip;
tmp = tip;
while (tmp->m_sidechainHeight > uncle->m_sidechainHeight) {
tmp = get_parent(tmp);
if (!tmp) {
@ -262,7 +264,7 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk
}
block.m_difficulty = m_curDifficulty;
block.m_cumulativeDifficulty = m_chainTip->m_cumulativeDifficulty + block.m_difficulty;
block.m_cumulativeDifficulty = tip->m_cumulativeDifficulty + block.m_difficulty;
for (const hash& uncle_id : block.m_uncles) {
auto it = m_blocksById.find(uncle_id);
@ -367,6 +369,7 @@ bool SideChain::block_seen(const PoolBlock& block)
{
// Check if it's some old block
const PoolBlock* tip = m_chainTip;
if (tip && tip->m_sidechainHeight > block.m_sidechainHeight + m_chainWindowSize * 2 &&
block.m_cumulativeDifficulty < tip->m_cumulativeDifficulty) {
return true;
@ -406,7 +409,9 @@ bool SideChain::add_external_block(PoolBlock& block, std::vector<hash>& missing_
difficulty_type diff2 = block.m_difficulty;
diff2 += block.m_difficulty;
for (PoolBlock* tmp = m_chainTip; tmp && (tmp->m_sidechainHeight + m_chainWindowSize > m_chainTip->m_sidechainHeight); tmp = get_parent(tmp)) {
const PoolBlock* tip = m_chainTip;
for (const PoolBlock* tmp = tip; tmp && (tmp->m_sidechainHeight + m_chainWindowSize > tip->m_sidechainHeight); tmp = get_parent(tmp)) {
if (diff2 >= tmp->m_difficulty) {
too_low_diff = false;
break;
@ -449,7 +454,7 @@ bool SideChain::add_external_block(PoolBlock& block, std::vector<hash>& missing_
}
// Check if it has the correct parent and difficulty to go right to monerod for checking
const MinerData& miner_data = m_pool->miner_data();
MinerData miner_data = m_pool->miner_data();
if ((block.m_prevId == miner_data.prev_id) && miner_data.difficulty.check_pow(pow_hash)) {
LOGINFO(0, log::LightGreen() << "add_external_block: block " << block.m_sidechainId << " has enough PoW for Monero network, submitting it");
m_pool->submit_block_async(block.m_mainChainData);
@ -573,7 +578,7 @@ bool SideChain::get_block_blob(const hash& id, std::vector<uint8_t>& blob)
{
MutexLock lock(m_sidechainLock);
PoolBlock* block = nullptr;
const PoolBlock* block = nullptr;
// Empty hash means we return current sidechain tip
if (id == hash()) {
@ -661,12 +666,14 @@ void SideChain::print_status()
uint64_t rem;
uint64_t pool_hashrate = udiv128(m_curDifficulty.hi, m_curDifficulty.lo, m_targetBlockTime, &rem);
const difficulty_type& network_diff = m_pool->miner_data().difficulty;
difficulty_type network_diff = m_pool->miner_data().difficulty;
uint64_t network_hashrate = udiv128(network_diff.hi, network_diff.lo, 120, &rem);
const PoolBlock* tip = m_chainTip;
uint64_t block_depth = 0;
PoolBlock* cur = m_chainTip;
const uint64_t tip_height = m_chainTip ? m_chainTip->m_sidechainHeight : 0;
const PoolBlock* cur = tip;
const uint64_t tip_height = tip ? tip->m_sidechainHeight : 0;
uint32_t total_blocks_in_window = 0;
uint32_t total_uncles_in_window = 0;
@ -715,7 +722,7 @@ void SideChain::print_status()
uint64_t your_reward = 0;
uint64_t total_reward = 0;
if (m_chainTip) {
if (tip) {
std::sort(blocks_in_window.begin(), blocks_in_window.end());
for (uint64_t i = 0; (i < m_chainWindowSize) && (i <= tip_height); ++i) {
for (PoolBlock* block : m_blocksByHeight[tip_height - i]) {
@ -730,11 +737,11 @@ void SideChain::print_status()
}
Wallet w = m_pool->params().m_wallet;
const std::vector<PoolBlock::TxOutput>& outs = m_chainTip->m_outputs;
const std::vector<PoolBlock::TxOutput>& outs = tip->m_outputs;
hash eph_public_key;
for (size_t i = 0, n = outs.size(); i < n; ++i) {
if (w.get_eph_public_key(m_chainTip->m_txkeySec, i, eph_public_key) && (outs[i].m_ephPublicKey == eph_public_key)) {
if (w.get_eph_public_key(tip->m_txkeySec, i, eph_public_key) && (outs[i].m_ephPublicKey == eph_public_key)) {
your_reward = outs[i].m_reward;
}
total_reward += outs[i].m_reward;
@ -785,7 +792,8 @@ void SideChain::print_status()
difficulty_type SideChain::total_hashes() const
{
return m_chainTip ? m_chainTip->m_cumulativeDifficulty : difficulty_type();
const PoolBlock* tip = m_chainTip;
return tip ? tip->m_cumulativeDifficulty : difficulty_type();
}
uint64_t SideChain::miner_count()
@ -809,7 +817,8 @@ uint64_t SideChain::miner_count()
uint64_t SideChain::last_updated() const
{
return m_chainTip ? m_chainTip->m_localTimestamp : 0;
const PoolBlock* tip = m_chainTip;
return tip ? tip->m_localTimestamp : 0;
}
bool SideChain::is_default() const
@ -1356,8 +1365,10 @@ void SideChain::update_chain_tip(PoolBlock* block)
return;
}
const PoolBlock* tip = m_chainTip;
bool is_alternative;
if (is_longer_chain(m_chainTip, block, is_alternative)) {
if (is_longer_chain(tip, block, is_alternative)) {
difficulty_type diff;
if (get_difficulty(block, m_difficultyData, diff)) {
m_chainTip = block;
@ -1365,7 +1376,7 @@ void SideChain::update_chain_tip(PoolBlock* block)
LOGINFO(2, "new chain tip: next height = " << log::Gray() << block->m_sidechainHeight + 1 << log::NoColor() <<
", next difficulty = " << log::Gray() << m_curDifficulty << log::NoColor() <<
", main chain height = " << log::Gray() << m_chainTip->m_txinGenHeight);
", main chain height = " << log::Gray() << block->m_txinGenHeight);
block->m_wantBroadcast = true;
if (m_pool) {
@ -1380,13 +1391,13 @@ void SideChain::update_chain_tip(PoolBlock* block)
prune_old_blocks();
}
}
else if (block->m_sidechainHeight > m_chainTip->m_sidechainHeight) {
else if (block->m_sidechainHeight > tip->m_sidechainHeight) {
LOGINFO(4, "block " << block->m_sidechainId <<
", height = " << block->m_sidechainHeight <<
" is not a longer chain than " << m_chainTip->m_sidechainId <<
", height " << m_chainTip->m_sidechainHeight);
" is not a longer chain than " << tip->m_sidechainId <<
", height " << tip->m_sidechainHeight);
}
else if (block->m_sidechainHeight + UNCLE_BLOCK_DEPTH > m_chainTip->m_sidechainHeight) {
else if (block->m_sidechainHeight + UNCLE_BLOCK_DEPTH > tip->m_sidechainHeight) {
LOGINFO(4, "possible uncle block: id = " << log::Gray() << block->m_sidechainId << log::NoColor() <<
", height = " << log::Gray() << block->m_sidechainHeight);
m_pool->update_block_template_async();
@ -1519,8 +1530,9 @@ bool SideChain::is_longer_chain(const PoolBlock* block, const PoolBlock* candida
}
// Final check: candidate chain must be built on top of recent mainchain blocks
if (candidate_mainchain_height + 10 < m_pool->miner_data().height) {
LOGWARN(3, "received a longer alternative chain but it's stale: height " << candidate_mainchain_height << ", current height " << m_pool->miner_data().height);
MinerData data = m_pool->miner_data();
if (candidate_mainchain_height + 10 < data.height) {
LOGWARN(3, "received a longer alternative chain but it's stale: height " << candidate_mainchain_height << ", current height " << data.height);
return false;
}
@ -1604,11 +1616,13 @@ void SideChain::prune_old_blocks()
const uint64_t cur_time = seconds_since_epoch();
const uint64_t prune_delay = m_chainWindowSize * 4 * m_targetBlockTime;
if (m_chainTip->m_sidechainHeight < prune_distance) {
const PoolBlock* tip = m_chainTip;
if (tip->m_sidechainHeight < prune_distance) {
return;
}
const uint64_t h = m_chainTip->m_sidechainHeight - prune_distance;
const uint64_t h = tip->m_sidechainHeight - prune_distance;
uint64_t num_blocks_pruned = 0;

@ -99,7 +99,7 @@ private:
bool check_config();
mutable uv_mutex_t m_sidechainLock;
PoolBlock* m_chainTip;
std::atomic<PoolBlock*> m_chainTip;
std::map<uint64_t, std::vector<PoolBlock*>> m_blocksByHeight;
unordered_map<hash, PoolBlock*> m_blocksById;
unordered_map<hash, uint64_t> m_seenWallets;

@ -53,7 +53,7 @@ public:
struct Client
{
Client();
virtual ~Client();
virtual ~Client() {}
virtual void reset();
virtual bool on_connect() = 0;
@ -90,8 +90,6 @@ public:
uint32_t m_numRead;
std::atomic<uint32_t> m_resetCounter{ 0 };
uv_mutex_t m_sendLock;
};
struct WriteBuf
@ -152,7 +150,7 @@ protected:
int m_listenPort;
uv_loop_t m_loop;
volatile bool m_loopStopped;
std::atomic<bool> m_loopStopped;
uv_mutex_t m_clientsListLock;
std::vector<Client*> m_preallocatedClients;

@ -27,7 +27,7 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
, m_loopThread{}
, m_finished(0)
, m_listenPort(-1)
, m_loopStopped(false)
, m_loopStopped{false}
, m_numConnections(0)
, m_numIncomingConnections(0)
{
@ -532,8 +532,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
LOGERR(1, "sending data from another thread, this is not thread safe");
}
MutexLock lock0(client->m_sendLock);
WriteBuf* buf = nullptr;
{
@ -620,9 +618,11 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_connection(uv_stream_t* se
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_close(uv_handle_t* handle)
{
Client* client = static_cast<Client*>(handle->data);
MutexLock lock0(client->m_sendLock);
if (!server_event_loop_thread) {
LOGERR(1, "on_connection_close called from another thread, this is not thread safe");
}
Client* client = static_cast<Client*>(handle->data);
TCPServer* owner = client->m_owner;
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " disconnected");
@ -811,17 +811,9 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
{
Client::reset();
uv_mutex_init_checked(&m_sendLock);
m_readBuf[0] = '\0';
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::~Client()
{
uv_mutex_destroy(&m_sendLock);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::reset()
{

Loading…
Cancel
Save