Speedup SideChain::get_outputs_blob()

pull/206/head
SChernykh 2 years ago
parent ea6a19a260
commit 30861bbf91

@ -198,7 +198,7 @@ void BlockCache::load_all(SideChain& side_chain, P2PServer& server)
continue;
}
if (block.deserialize(data + sizeof(uint32_t), n, side_chain) == 0) {
if (block.deserialize(data + sizeof(uint32_t), n, side_chain, uv_default_loop_checked()) == 0) {
server.add_cached_block(block);
++blocks_loaded;
}

@ -548,7 +548,7 @@ void BlockTemplate::update(const MinerData& data, const Mempool& mempool, Wallet
buf.insert(buf.end(), m_poolBlockTemplate->m_sideChainData.begin(), m_poolBlockTemplate->m_sideChainData.end());
PoolBlock check;
const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain());
const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain(), nullptr);
if (result != 0) {
LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result);
}
@ -1077,7 +1077,7 @@ void BlockTemplate::submit_sidechain_block(uint32_t template_id, uint32_t nonce,
buf.insert(buf.end(), m_poolBlockTemplate->m_sideChainData.begin(), m_poolBlockTemplate->m_sideChainData.end());
PoolBlock check;
const int result = check.deserialize(buf.data(), buf.size(), side_chain);
const int result = check.deserialize(buf.data(), buf.size(), side_chain, nullptr);
if (result != 0) {
LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result);
}

@ -900,7 +900,7 @@ int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size)
result = m_blockDeserializeResult;
}
else {
result = m_block->deserialize(buf, size, m_pool->side_chain());
result = m_block->deserialize(buf, size, m_pool->side_chain(), &m_loop);
m_blockDeserializeBuf.assign(buf, buf + size);
m_blockDeserializeResult = result;
m_lookForMissingBlocks = true;

@ -139,7 +139,7 @@ struct PoolBlock
void serialize_mainchain_data(uint32_t nonce, uint32_t extra_nonce, const hash& sidechain_hash);
void serialize_sidechain_data();
int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain);
int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop);
void reset_offchain_data();
bool get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash);

@ -23,7 +23,7 @@ namespace p2pool {
// Since data here can come from external and possibly malicious sources, check everything
// Only the syntax (i.e. the serialized block binary format) and the keccak hash are checked here
// Semantics must also be checked elsewhere before accepting the block (PoW, reward split between miners, difficulty calculation and so on)
int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain)
int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop)
{
try {
// Sanity check
@ -272,7 +272,7 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si
return __LINE__;
}
if ((num_outputs == 0) && !sidechain.get_outputs_blob(this, total_reward, outputs_blob)) {
if ((num_outputs == 0) && !sidechain.get_outputs_blob(this, total_reward, outputs_blob, loop)) {
return __LINE__;
}

@ -682,7 +682,7 @@ bool SideChain::get_block_blob(const hash& id, std::vector<uint8_t>& blob) const
return true;
}
bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob) const
bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob, uv_loop_t* loop) const
{
blob.clear();
@ -719,6 +719,60 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
const size_t n = tmpShares.size();
// Helper jobs call get_eph_public_key with indices in descending order
// Current thread will process indices in ascending order so when they meet, everything will be cached
std::atomic<int> counter{ 0 };
std::atomic<int> num_helper_jobs_finished{ 0 };
int num_helper_jobs_started = 0;
if (loop) {
constexpr size_t HELPER_JOBS_COUNT = 4;
struct Work
{
uv_work_t req;
const std::vector<MinerShare>& tmpShares;
const hash& txkeySec;
std::atomic<int>& counter;
std::atomic<int>& num_helper_jobs_finished;
// Fix MSVC warnings
Work() = delete;
Work& operator=(Work&&) = delete;
};
counter = static_cast<int>(n) - 1;
num_helper_jobs_started = HELPER_JOBS_COUNT;
for (size_t i = 0; i < HELPER_JOBS_COUNT; ++i) {
Work* w = new Work{ {}, tmpShares, block->m_txkeySec, counter, num_helper_jobs_finished };
w->req.data = w;
const int err = uv_queue_work(loop, &w->req,
[](uv_work_t* req)
{
Work* work = reinterpret_cast<Work*>(req->data);
hash eph_public_key;
int index;
while ((index = work->counter.fetch_sub(1)) >= 0) {
uint8_t view_tag;
work->tmpShares[index].m_wallet->get_eph_public_key(work->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag);
}
++work->num_helper_jobs_finished;
delete work;
}, nullptr);
if (err) {
LOGERR(1, "get_outputs_blob: uv_queue_work failed, error " << uv_err_name(err));
--num_helper_jobs_started;
delete w;
}
}
}
blob.reserve(n * 39 + 64);
writeVarint(n, blob);
@ -730,6 +784,13 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
hash eph_public_key;
for (size_t i = 0; i < n; ++i) {
// stop helper jobs when they meet with current thread
const int c = counter.load();
if ((c >= 0) && (static_cast<int>(i) >= c)) {
// this will cause all helper jobs to finish immediately
counter = -1;
}
writeVarint(tmpRewards[i], blob);
blob.emplace_back(tx_type);
@ -747,6 +808,15 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
block->m_outputs.emplace_back(tmpRewards[i], eph_public_key, tx_type, view_tag);
}
if (loop) {
// this will cause all helper jobs to finish immediately
counter = -1;
while (num_helper_jobs_finished < num_helper_jobs_started) {
std::this_thread::yield();
}
}
return true;
}

@ -56,7 +56,7 @@ public:
void watch_mainchain_block(const ChainMain& data, const hash& possible_id);
bool get_block_blob(const hash& id, std::vector<uint8_t>& blob) const;
bool get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob) const;
bool get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob, uv_loop_t* loop) const;
void print_status() const;
double get_reward_share(const Wallet& w) const;

@ -49,7 +49,7 @@ TEST(pool_block, deserialize)
f.read(reinterpret_cast<char*>(buf.data()), buf.size());
ASSERT_EQ(f.good(), true);
ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain), 0);
ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain, nullptr), 0);
ASSERT_EQ(b.m_mainChainData.size(), 5607);
ASSERT_EQ(b.m_mainChainHeaderSize, 43);
@ -121,7 +121,7 @@ TEST(pool_block, verify)
p += sizeof(uint32_t);
ASSERT_TRUE(p + n <= e);
ASSERT_EQ(b.deserialize(p, n, sidechain), 0);
ASSERT_EQ(b.deserialize(p, n, sidechain, nullptr), 0);
p += n;
sidechain.add_block(b);

Loading…
Cancel
Save