From 5bd518da4b9faf425513bd3b7b19342a702bbf19 Mon Sep 17 00:00:00 2001
From: SChernykh
Date: Mon, 16 Jan 2023 07:18:08 +0100
Subject: [PATCH] Optimized BlockTemplate::update()

---
 src/block_template.cpp | 47 +++++++++++++++++++++++++++++++-
 src/block_template.h   |  2 +-
 src/side_chain.cpp     | 54 +++++++------------------------------
 src/uv_util.h          | 61 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 118 insertions(+), 46 deletions(-)

diff --git a/src/block_template.cpp b/src/block_template.cpp
index c195ea9..a10d564 100644
--- a/src/block_template.cpp
+++ b/src/block_template.cpp
@@ -138,7 +138,7 @@ BlockTemplate& BlockTemplate::operator=(const BlockTemplate& b)
 	m_seedHash = b.m_seedHash;
 	m_timestamp = b.m_timestamp;
 	*m_poolBlockTemplate = *b.m_poolBlockTemplate;
-	m_finalReward = b.m_finalReward;
+	m_finalReward = b.m_finalReward.load();
 
 	memcpy(m_minerTxKeccakState, b.m_minerTxKeccakState, sizeof(m_minerTxKeccakState));
 	m_minerTxKeccakStateInputLength = b.m_minerTxKeccakStateInputLength;
@@ -277,6 +277,51 @@ void BlockTemplate::update(const MinerData& data, const Mempool& mempool, Wallet* miner_wallet)
 
 	m_sidechain->fill_sidechain_data(*m_poolBlockTemplate, m_shares);
 
+	// Pre-calculate outputs to speed up miner tx generation
+	if (!m_shares.empty()) {
+		struct Precalc
+		{
+			FORCEINLINE Precalc(const std::vector<MinerShare>& s, const hash& k) : txKeySec(k)
+			{
+				const size_t N = s.size();
+				counter = static_cast<int>(N) - 1;
+				shares = reinterpret_cast<std::pair<hash, hash>*>(malloc_hook(sizeof(std::pair<hash, hash>) * N));
+				if (shares) {
+					const MinerShare* src = &s[0];
+					std::pair<hash, hash>* dst = shares;
+					std::pair<hash, hash>* e = shares + N;
+
+					for (; dst < e; ++src, ++dst) {
+						const Wallet* w = src->m_wallet;
+						dst->first = w->view_public_key();
+						dst->second = w->spend_public_key();
+					}
+				}
+			}
+
+			FORCEINLINE Precalc(Precalc&& rhs) noexcept : txKeySec(rhs.txKeySec), counter(rhs.counter.load()), shares(rhs.shares) { rhs.shares = nullptr; }
+			FORCEINLINE ~Precalc() { free_hook(shares); }
+
+			FORCEINLINE void operator()()
+			{
+				if (shares) {
+					hash derivation, eph_public_key;
+					int i;
+					while ((i = counter.fetch_sub(1)) >= 0) {
+						uint8_t view_tag;
+						generate_key_derivation(shares[i].first, txKeySec, i, derivation, view_tag);
+						derive_public_key(derivation, i, shares[i].second, eph_public_key);
+					}
+				}
+			}
+
+			hash txKeySec;
+			std::atomic<int> counter;
+			std::pair<hash, hash>* shares;
+		};
+		parallel_run(uv_default_loop_checked(), Precalc(m_shares, m_poolBlockTemplate->m_txkeySec));
+	}
+
 	// Only choose transactions that were received 10 or more seconds ago, or high fee (>= 0.006 XMR) transactions
 	size_t total_mempool_transactions;
 	{
diff --git a/src/block_template.h b/src/block_template.h
index 2a10991..bb90336 100644
--- a/src/block_template.h
+++ b/src/block_template.h
@@ -102,7 +102,7 @@ private:
 
 	BlockTemplate* m_oldTemplates[4] = {};
 
-	uint64_t m_finalReward;
+	std::atomic<uint64_t> m_finalReward;
 
 	// Temp vectors, will be cleaned up after use and skipped in copy constructor/assignment operators
 	std::vector<uint8_t> m_minerTx;
diff --git a/src/side_chain.cpp b/src/side_chain.cpp
index 5d5583a..5c229dc 100644
--- a/src/side_chain.cpp
+++ b/src/side_chain.cpp
@@ -780,52 +780,18 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector<uint8_t>& blob, uv_loop_t* loop)
 	// Helper jobs call get_eph_public_key with indices in descending order
 	// Current thread will process indices in ascending order so when they meet, everything will be cached
 	if (loop) {
-		uint32_t HELPER_JOBS_COUNT = std::thread::hardware_concurrency();
-
-		// this thread will also be running, so reduce helper job count by 1
-		if (HELPER_JOBS_COUNT > 0) {
-			--HELPER_JOBS_COUNT;
-		}
-
-		// No more than 8 helper jobs because our UV worker thread pool has 8 threads
-		if (HELPER_JOBS_COUNT > 8) {
-			HELPER_JOBS_COUNT = 8;
-		}
-
-		struct Work
-		{
-			uv_work_t req;
-			std::shared_ptr<Data> data;
-		};
-
-		for (size_t i = 0; i < HELPER_JOBS_COUNT; ++i) {
-			Work* w = new Work{ {}, data };
-			w->req.data = w;
-
-			const int err = uv_queue_work(loop, &w->req,
-				[](uv_work_t* req)
-				{
-					Data* d = reinterpret_cast<Work*>(req->data)->data.get();
-					hash eph_public_key;
-
-					int index;
-					while ((index = d->counter.fetch_sub(1)) >= 0) {
-						uint8_t view_tag;
-						if (!d->tmpShares[index].m_wallet->get_eph_public_key(d->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag)) {
-							LOGWARN(6, "get_eph_public_key failed at index " << index);
-						}
-					}
-				},
-				[](uv_work_t* req, int /*status*/)
-				{
-					delete reinterpret_cast<Work*>(req->data);
-				});
+		parallel_run(loop, [data]() {
+			Data* d = data.get();
+			hash eph_public_key;
 
-			if (err) {
-				LOGERR(1, "get_outputs_blob: uv_queue_work failed, error " << uv_err_name(err));
-				delete w;
+			int index;
+			while ((index = d->counter.fetch_sub(1)) >= 0) {
+				uint8_t view_tag;
+				if (!d->tmpShares[index].m_wallet->get_eph_public_key(d->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag)) {
+					LOGWARN(6, "get_eph_public_key failed at index " << index);
+				}
 			}
-		}
+		});
 	}
 
 	blob.reserve(n * 39 + 64);
diff --git a/src/uv_util.h b/src/uv_util.h
index aff1069..b3b9477 100644
--- a/src/uv_util.h
+++ b/src/uv_util.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <uv.h>
+#include <thread>
 
 static_assert(sizeof(in6_addr) == 16, "struct in6_addr has invalid size");
 static_assert(sizeof(in_addr) == 4, "struct in_addr has invalid size");
@@ -175,4 +176,64 @@ bool CallOnLoop(uv_loop_t* loop, T&& callback)
 	return false;
 }
 
+template<typename T>
+void parallel_run(uv_loop_t* loop, T&& callback, bool wait = false)
+{
+	uint32_t THREAD_COUNT = std::thread::hardware_concurrency();
+
+	if (THREAD_COUNT > 0) {
+		--THREAD_COUNT;
+	}
+
+	// No more than 8 threads because our UV worker thread pool has 8 threads
+	if (THREAD_COUNT > 8) {
+		THREAD_COUNT = 8;
+	}
+
+	struct Callback
+	{
+		explicit FORCEINLINE Callback(T&& f) : m_func(std::move(f)) {}
+		Callback& operator=(Callback&&) = delete;
+
+		T m_func;
+	};
+
+	std::shared_ptr<Callback> cb = std::make_shared<Callback>(std::move(callback));
+
+	struct Work
+	{
+		uv_work_t req;
+		std::shared_ptr<Callback> cb;
+	};
+
+	for (size_t i = 0; i < THREAD_COUNT; ++i) {
+		Work* w = new Work{ {}, cb };
+		w->req.data = w;
+
+		const int err = uv_queue_work(loop, &w->req,
+			[](uv_work_t* req)
+			{
+				std::shared_ptr<Callback>& cb = reinterpret_cast<Work*>(req->data)->cb;
+				cb->m_func();
+				cb.reset();
+			},
+			[](uv_work_t* req, int)
+			{
+				delete reinterpret_cast<Work*>(req->data);
+			});
+
+		if (err) {
+			delete w;
+		}
+	}
+
+	if (wait) {
+		cb->m_func();
+
+		while (cb.use_count() > 1) {
+			std::this_thread::yield();
+		}
+	}
+}
+
 } // namespace p2pool
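
Both new call sites rely on the same work-splitting idea: every helper queued with uv_queue_work() repeatedly claims the next index from one shared std::atomic<int> counter via fetch_sub() until it goes negative, so the work divides itself evenly across however many threads actually run. The standalone sketch below shows that pattern with plain libuv and the C++ standard library, with the calling thread joining in the way parallel_run(..., wait = true) would; it is not p2pool code, and Job, Work, process_items(), the three helper jobs and N are illustrative names only.

// Standalone sketch of the split-by-atomic-counter pattern used in this patch.
// Assumptions: only libuv and the C++ standard library are available.
#include <uv.h>
#include <atomic>
#include <cstdio>
#include <memory>
#include <vector>

struct Job
{
	std::vector<double> results;   // one slot per work item
	std::atomic<int> counter{ 0 }; // next index to process, counts down to -1
};

// Body run by every participant: claim indices until the counter goes negative.
static void process_items(const std::shared_ptr<Job>& job)
{
	int i;
	while ((i = job->counter.fetch_sub(1)) >= 0) {
		job->results[static_cast<size_t>(i)] = i * 0.5; // placeholder for expensive work
	}
}

int main()
{
	constexpr int N = 1000;

	auto job = std::make_shared<Job>();
	job->results.resize(N);
	job->counter = N - 1;

	struct Work
	{
		uv_work_t req;
		std::shared_ptr<Job> job;
	};

	uv_loop_t* loop = uv_default_loop();

	// Queue a few helper jobs on libuv's worker thread pool (4 threads by default).
	for (int t = 0; t < 3; ++t) {
		Work* w = new Work{ {}, job };
		w->req.data = w;

		const int err = uv_queue_work(loop, &w->req,
			[](uv_work_t* req) { process_items(static_cast<Work*>(req->data)->job); },
			[](uv_work_t* req, int) { delete static_cast<Work*>(req->data); });

		if (err) {
			delete w; // helper could not be queued; its items fall to the other threads
		}
	}

	// The calling thread joins in, like parallel_run(..., /*wait=*/true) does.
	process_items(job);

	// uv_run() returns once every queued request has finished and its after-work
	// callback has freed the Work object, so all helpers are done past this point.
	uv_run(loop, UV_RUN_DEFAULT);
	uv_loop_close(loop);

	std::printf("results[%d] = %f\n", N - 1, job->results[N - 1]);
	return 0;
}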
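
In the patch itself both callers keep the default wait = false, so the calling thread never blocks on the helpers: the Precalc jobs warm up the output derivations while BlockTemplate::update() carries on, and in get_outputs_blob() the current thread keeps processing indices in ascending order as the existing comment describes. The cb.use_count() spin inside parallel_run() only runs when a caller passes wait = true; the caller then executes the callback once itself and yields until every worker has dropped its shared_ptr reference, which is the role the sketch's uv_run() call stands in for.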