Optimized BlockTemplate::update()

pull/1/head
SChernykh 1 year ago
parent ec8e40907d
commit 5bd518da4b

@ -138,7 +138,7 @@ BlockTemplate& BlockTemplate::operator=(const BlockTemplate& b)
m_seedHash = b.m_seedHash;
m_timestamp = b.m_timestamp;
*m_poolBlockTemplate = *b.m_poolBlockTemplate;
m_finalReward = b.m_finalReward;
m_finalReward = b.m_finalReward.load();
memcpy(m_minerTxKeccakState, b.m_minerTxKeccakState, sizeof(m_minerTxKeccakState));
m_minerTxKeccakStateInputLength = b.m_minerTxKeccakStateInputLength;
@ -277,6 +277,51 @@ void BlockTemplate::update(const MinerData& data, const Mempool& mempool, Wallet
m_sidechain->fill_sidechain_data(*m_poolBlockTemplate, m_shares);
// Pre-calculate outputs to speed up miner tx generation
if (!m_shares.empty()) {
struct Precalc
{
FORCEINLINE Precalc(const std::vector<MinerShare>& s, const hash& k) : txKeySec(k)
{
const size_t N = s.size();
counter = static_cast<int>(N) - 1;
shares = reinterpret_cast<std::pair<hash, hash>*>(malloc_hook(sizeof(std::pair<hash, hash>) * N));
if (shares) {
const MinerShare* src = &s[0];
std::pair<hash, hash>* dst = shares;
std::pair<hash, hash>* e = shares + N;
for (; dst < e; ++src, ++dst) {
const Wallet* w = src->m_wallet;
dst->first = w->view_public_key();
dst->second = w->spend_public_key();
}
}
}
FORCEINLINE Precalc(Precalc&& rhs) noexcept : txKeySec(rhs.txKeySec), counter(rhs.counter.load()), shares(rhs.shares) { rhs.shares = nullptr; }
FORCEINLINE ~Precalc() { free_hook(shares); }
FORCEINLINE void operator()()
{
if (shares) {
hash derivation, eph_public_key;
int i;
while ((i = counter.fetch_sub(1)) >= 0) {
uint8_t view_tag;
generate_key_derivation(shares[i].first, txKeySec, i, derivation, view_tag);
derive_public_key(derivation, i, shares[i].second, eph_public_key);
}
}
}
hash txKeySec;
std::atomic<int> counter;
std::pair<hash, hash>* shares;
};
parallel_run(uv_default_loop_checked(), Precalc(m_shares, m_poolBlockTemplate->m_txkeySec));
}
// Only choose transactions that were received 10 or more seconds ago, or high fee (>= 0.006 XMR) transactions
size_t total_mempool_transactions;
{

@ -102,7 +102,7 @@ private:
BlockTemplate* m_oldTemplates[4] = {};
uint64_t m_finalReward;
std::atomic<uint64_t> m_finalReward;
// Temp vectors, will be cleaned up after use and skipped in copy constructor/assignment operators
std::vector<uint8_t> m_minerTx;

@ -780,52 +780,18 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v
// Helper jobs call get_eph_public_key with indices in descending order
// Current thread will process indices in ascending order so when they meet, everything will be cached
if (loop) {
uint32_t HELPER_JOBS_COUNT = std::thread::hardware_concurrency();
// this thread will also be running, so reduce helper job count by 1
if (HELPER_JOBS_COUNT > 0) {
--HELPER_JOBS_COUNT;
}
// No more than 8 helper jobs because our UV worker thread pool has 8 threads
if (HELPER_JOBS_COUNT > 8) {
HELPER_JOBS_COUNT = 8;
}
struct Work
{
uv_work_t req;
std::shared_ptr<Data> data;
};
for (size_t i = 0; i < HELPER_JOBS_COUNT; ++i) {
Work* w = new Work{ {}, data };
w->req.data = w;
const int err = uv_queue_work(loop, &w->req,
[](uv_work_t* req)
{
Data* d = reinterpret_cast<Work*>(req->data)->data.get();
hash eph_public_key;
int index;
while ((index = d->counter.fetch_sub(1)) >= 0) {
uint8_t view_tag;
if (!d->tmpShares[index].m_wallet->get_eph_public_key(d->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag)) {
LOGWARN(6, "get_eph_public_key failed at index " << index);
}
}
},
[](uv_work_t* req, int /*status*/)
{
delete reinterpret_cast<Work*>(req->data);
});
parallel_run(loop, [data]() {
Data* d = data.get();
hash eph_public_key;
if (err) {
LOGERR(1, "get_outputs_blob: uv_queue_work failed, error " << uv_err_name(err));
delete w;
int index;
while ((index = d->counter.fetch_sub(1)) >= 0) {
uint8_t view_tag;
if (!d->tmpShares[index].m_wallet->get_eph_public_key(d->txkeySec, static_cast<size_t>(index), eph_public_key, view_tag)) {
LOGWARN(6, "get_eph_public_key failed at index " << index);
}
}
}
});
}
blob.reserve(n * 39 + 64);

@ -18,6 +18,7 @@
#pragma once
#include <uv.h>
#include <thread>
static_assert(sizeof(in6_addr) == 16, "struct in6_addr has invalid size");
static_assert(sizeof(in_addr) == 4, "struct in_addr has invalid size");
@ -175,4 +176,64 @@ bool CallOnLoop(uv_loop_t* loop, T&& callback)
return false;
}
template<typename T>
void parallel_run(uv_loop_t* loop, T&& callback, bool wait = false)
{
uint32_t THREAD_COUNT = std::thread::hardware_concurrency();
if (THREAD_COUNT > 0) {
--THREAD_COUNT;
}
// No more than 8 threads because our UV worker thread pool has 8 threads
if (THREAD_COUNT > 8) {
THREAD_COUNT = 8;
}
struct Callback
{
explicit FORCEINLINE Callback(T&& f) : m_func(std::move(f)) {}
Callback& operator=(Callback&&) = delete;
T m_func;
};
std::shared_ptr<Callback> cb = std::make_shared<Callback>(std::move(callback));
struct Work
{
uv_work_t req;
std::shared_ptr<Callback> cb;
};
for (size_t i = 0; i < THREAD_COUNT; ++i) {
Work* w = new Work{ {}, cb };
w->req.data = w;
const int err = uv_queue_work(loop, &w->req,
[](uv_work_t* req)
{
std::shared_ptr<Callback>& cb = reinterpret_cast<Work*>(req->data)->cb;
cb->m_func();
cb.reset();
},
[](uv_work_t* req, int)
{
delete reinterpret_cast<Work*>(req->data);
});
if (err) {
delete w;
}
}
if (wait) {
cb->m_func();
while (cb.use_count() > 1) {
std::this_thread::yield();
}
}
}
} // namespace p2pool

Loading…
Cancel
Save