Blockchain read thread pool added

sync_microcore
moneroexamples 5 years ago
parent 6df55b056a
commit 84fae22a32

@ -57,6 +57,8 @@
"search_thread_life_in_seconds" : 120,
"max_number_of_blocks_to_import" : 132000,
"mysql_ping_every_seconds" : 200,
"_comment": "if the threadpool_size (no of threads) below is 0, its size is automaticly set based on your cpu. If its not 0, the value specified is used instead",
"blockchain_treadpool_size" : 0,
"ssl" :
{
"enable" : false,

@ -141,8 +141,10 @@ xmreg::MySqlConnector::dbname = config_json["database"]["dbname"];
// number of thread in blockchain access pool thread
auto threads_no = std::max<uint32_t>(
std::thread::hardware_concurrency()/2, 2u) - 1;
std::thread::hardware_concurrency()/2, 1u) - 1;
if (bc_setup.blockchain_treadpool_size > 0)
threads_no = bc_setup.blockchain_treadpool_size;
OMINFO << "Thread pool size: " << threads_no << " threads";

@ -170,6 +170,8 @@ BlockchainSetup::_init()
= config_json["wallet_import"]["fee"];
mysql_ping_every
= seconds {config_json["mysql_ping_every_seconds"]};
blockchain_treadpool_size
= config_json["blockchain_treadpool_size"];
get_blockchain_path();

@ -49,6 +49,8 @@ public:
uint64_t max_number_of_blocks_to_import {132000};
uint64_t blockchain_treadpool_size {0};
string import_payment_address_str;
string import_payment_viewkey_str;

@ -132,22 +132,6 @@ CurrentBlockchainStatus::get_block(uint64_t height, block& blk)
return future_result.get();
}
//vector<block>
//CurrentBlockchainStatus::get_blocks_range(
//uint64_t const& h1, uint64_t const& h2)
//{
//try
//{
//return mcore->get_blocks_range(h1, h2);
//}
//catch (BLOCK_DNE& e)
//{
//cerr << e.what() << endl;
//}
//return {};
//}
vector<block>
CurrentBlockchainStatus::get_blocks_range(
uint64_t const& h1, uint64_t const& h2)
@ -235,12 +219,10 @@ CurrentBlockchainStatus::get_txs(
bool
CurrentBlockchainStatus::tx_exist(const crypto::hash& tx_hash)
{
//return mcore->have_tx(tx_hash);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash) -> bool
{
return mcore->have_tx(tx_hash);
return this->mcore->have_tx(tx_hash);
}, std::cref(tx_hash));
return future_result.get();
@ -255,7 +237,7 @@ CurrentBlockchainStatus::tx_exist(
[this](auto const& tx_hash,
auto& tx_index) -> bool
{
return mcore->tx_exists(tx_hash, tx_index);
return this->mcore->tx_exists(tx_hash, tx_index);
}, std::cref(tx_hash), std::ref(tx_index));
return future_result.get();
@ -281,12 +263,12 @@ CurrentBlockchainStatus::get_output_tx_and_index(
uint64_t amount, uint64_t index) const
{
auto future_result = thread_pool->submit(
[this](auto amount, auto index)
-> tx_out_idx
{
return this->mcore
[this](auto amount, auto index)
-> tx_out_index
{
return this->mcore
->get_output_tx_and_index(amount, index);
}, amount, index);
}, amount, index);
return future_result.get();
}
@ -396,7 +378,7 @@ CurrentBlockchainStatus::get_amount_specific_indices(
// this index is lmdb index of a tx, not tx hash
uint64_t tx_index;
if (mcore->tx_exists(tx_hash, tx_index))
if (this->mcore->tx_exists(tx_hash, tx_index))
{
out_indices = this->mcore
->get_tx_amount_output_indices(tx_index);
@ -412,9 +394,24 @@ CurrentBlockchainStatus::get_amount_specific_indices(
}, std::cref(tx_hash), std::ref(out_indices));
future_result.get();
return true;
return future_result.get();
}
bool
CurrentBlockchainStatus::get_output_histogram(
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::request& req,
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::response& res) const
{
auto future_result = thread_pool->submit(
[this](auto const& req, auto& res)
-> bool
{
return this->mcore
->get_output_histogram(req, res);
}, std::cref(req), std::ref(res));
return future_result.get();
}
unique_ptr<RandomOutputs>
@ -422,7 +419,8 @@ CurrentBlockchainStatus::create_random_outputs_object(
vector<uint64_t> const& amounts,
uint64_t outs_count) const
{
return make_unique<RandomOutputs>(*mcore, amounts, outs_count);
return make_unique<RandomOutputs>(
this, amounts, outs_count);
}
bool
@ -444,6 +442,26 @@ CurrentBlockchainStatus::get_random_outputs(
return true;
}
bool
CurrentBlockchainStatus::get_outs(
COMMAND_RPC_GET_OUTPUTS_BIN::request const& req,
COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const
{
auto future_result = thread_pool->submit(
[this](auto const& req, auto& res)
-> bool
{
if (!this->mcore->get_outs(req, res))
{
OMERROR << "mcore->get_outs(req, res) failed";
return false;
}
return true;
}, std::cref(req), std::ref(res));
return future_result.get();
}
bool
CurrentBlockchainStatus::get_output(
@ -457,11 +475,8 @@ CurrentBlockchainStatus::get_output(
req.outputs.push_back(
get_outputs_out {amount, global_output_index});
if (!mcore->get_outs(req, res))
{
OMERROR << "mcore->get_outs(req, res) failed";
if (!get_outs(req, res))
return false;
}
output_info = res.outs.at(0);
@ -473,8 +488,7 @@ CurrentBlockchainStatus::get_dynamic_per_kb_fee_estimate() const
{
const double byte_to_kbyte_factor = 1024;
uint64_t fee_per_byte = mcore->get_dynamic_base_fee_estimate(
FEE_ESTIMATE_GRACE_BLOCKS);
uint64_t fee_per_byte = get_dynamic_base_fee_estimate();
uint64_t fee_per_kB = static_cast<uint64_t>(
fee_per_byte * byte_to_kbyte_factor);
@ -485,15 +499,29 @@ CurrentBlockchainStatus::get_dynamic_per_kb_fee_estimate() const
uint64_t
CurrentBlockchainStatus::get_dynamic_base_fee_estimate() const
{
return mcore->get_dynamic_base_fee_estimate(
FEE_ESTIMATE_GRACE_BLOCKS);
auto future_result = thread_pool->submit(
[this]() -> uint64_t
{
return this->mcore
->get_dynamic_base_fee_estimate(
FEE_ESTIMATE_GRACE_BLOCKS);
});
return future_result.get();
}
uint64_t
CurrentBlockchainStatus::get_tx_unlock_time(
crypto::hash const& tx_hash) const
{
return mcore->get_tx_unlock_time(tx_hash);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash) -> uint64_t
{
return this->mcore->get_tx_unlock_time(tx_hash);
}, std::cref(tx_hash));
return future_result.get();
}
bool
@ -518,12 +546,24 @@ CurrentBlockchainStatus::read_mempool()
// get txs in the mempool
std::vector<tx_info> mempool_tx_info;
vector<spent_key_image_info> key_image_infos;
auto future_result = thread_pool->submit(
[this](auto& mempool_tx_info, auto& key_image_infos)
-> bool
{
if (!this->mcore->get_mempool_txs(
mempool_tx_info, key_image_infos))
{
OMERROR << "Getting mempool failed ";
return false;
}
return true;
if (!mcore->get_mempool_txs(mempool_tx_info, key_image_infos))
{
OMERROR << "Getting mempool failed ";
}, std::ref(mempool_tx_info),
std::ref(key_image_infos));
if (!future_result.get())
return false;
}
// not using this info at present
(void) key_image_infos;
@ -695,7 +735,16 @@ CurrentBlockchainStatus::get_output_key(
uint64_t amount,
uint64_t global_amount_index)
{
return mcore->get_output_key(amount, global_amount_index);
auto future_result = thread_pool->submit(
[this](auto amount, auto global_amount_index)
-> output_data_t
{
return this->mcore->get_output_key(
amount, global_amount_index);
}, amount, global_amount_index);
return future_result.get();
}
bool
@ -914,7 +963,7 @@ CurrentBlockchainStatus::get_tx(
crypto::hash const& tx_hash,
transaction& tx)
{
auto future_result = TP::DefaultThreadPool::submitJob(
auto future_result = thread_pool->submit(
[this](auto const& tx_hash,
auto& tx) -> bool
{
@ -937,7 +986,7 @@ CurrentBlockchainStatus::get_tx(
return false;
}
return mcore->get_tx(tx_hash, tx);
return get_tx(tx_hash, tx);
}
bool
@ -948,7 +997,14 @@ CurrentBlockchainStatus::get_tx_block_height(
if (!tx_exist(tx_hash))
return false;
tx_height = mcore->get_tx_block_height(tx_hash);
auto future_result = thread_pool->submit(
[this](auto const tx_hash)
-> int64_t
{
return this->mcore->get_tx_block_height(tx_hash);
}, std::cref(tx_hash));
tx_height = future_result.get();
return true;
}

@ -148,6 +148,15 @@ public:
RandomOutputs::outs_for_amount_v&
found_outputs);
virtual bool
get_output_histogram(
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::request& req,
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::response& res) const;
virtual bool
get_outs(COMMAND_RPC_GET_OUTPUTS_BIN::request const& req,
COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const;
virtual uint64_t
get_dynamic_per_kb_fee_estimate() const;

@ -43,6 +43,8 @@ OutputInputIdentification::OutputInputIdentification(
"Mismatched sizes of key_derivation and rct::key");
// use identity derivation instead
// solution based on that found in wallet2.cpp in monero
// this will cause the tx output to be effectively skipped
memcpy(&derivation, rct::identity().bytes, sizeof(derivation));
}
}

@ -1,13 +1,13 @@
#include "RandomOutputs.h"
#include "CurrentBlockchainStatus.h"
namespace xmreg
{
RandomOutputs::RandomOutputs(
MicroCore const& _mcore,
CurrentBlockchainStatus const* _cbs,
vector<uint64_t> const& _amounts,
uint64_t _outs_count)
: mcore {_mcore},
: cbs {_cbs},
amounts {_amounts},
outs_count {_outs_count}
{
@ -39,7 +39,7 @@ RandomOutputs::get_output_pub_key(
req.outputs.push_back(get_outputs_out {amount, global_output_index});
if (!mcore.get_outs(req, res))
if (!cbs->get_outs(req, res))
{
OMERROR << "mcore->get_outs(req, res) failed";
return false;
@ -65,7 +65,7 @@ RandomOutputs::find_random_outputs()
req.min_count = outs_count;
req.max_count = 0;
if (!mcore.get_output_histogram(req, res))
if (!cbs->get_output_histogram(req, res))
{
OMERROR << "mcore->get_output_histogram(req, res)";
return false;
@ -80,15 +80,16 @@ RandomOutputs::find_random_outputs()
// find histogram_entry for amount that we look
// random outputs for
auto const hist_entry_it = std::find_if(
begin(res.histogram), end(res.histogram),
[&amount](histogram_entry const& he)
{
return amount == he.amount;
});
std::begin(res.histogram), std::end(res.histogram),
[&amount](auto const& he)
{
return amount == he.amount;
});
if (hist_entry_it == res.histogram.end())
{
OMERROR << "Could not find amount: it == res.histogram.end()\n";
OMERROR << "Could not find amount: it "
"== res.histogram.end()\n";
return false;
}

@ -8,6 +8,8 @@
namespace xmreg
{
class CurrentBlockchainStatus;
/**
* @brief Returns random ouputs for given amounts
*
@ -46,7 +48,7 @@ public:
using outs_for_amount_v = vector<outs_for_amount>;
RandomOutputs(MicroCore const& _mcore,
RandomOutputs(CurrentBlockchainStatus const* _cbs,
vector<uint64_t> const& _amounts,
uint64_t _outs_count);
@ -59,7 +61,8 @@ public:
virtual ~RandomOutputs() = default;
protected:
MicroCore const& mcore;
//MicroCore const& mcore;
CurrentBlockchainStatus const* cbs;
vector<uint64_t> amounts;
uint64_t outs_count;

@ -41,8 +41,12 @@ protected:
rpc = std::make_unique<MockRPCCalls>("dummy deamon url");
rpc_ptr = rpc.get();
tp = std::make_unique<TP::ThreadPool>();
tp_ptr = tp.get();
bcs = std::make_unique<xmreg::CurrentBlockchainStatus>(
bc_setup, std::move(mcore), std::move(rpc));
bc_setup, std::move(mcore), std::move(rpc),
std::move(tp));
}
network_type net_type {network_type::STAGENET};
@ -50,10 +54,12 @@ protected:
xmreg::BlockchainSetup bc_setup;
std::unique_ptr<MockMicroCore> mcore;
std::unique_ptr<MockRPCCalls> rpc;
std::unique_ptr<TP::ThreadPool> tp;
std::unique_ptr<xmreg::CurrentBlockchainStatus> bcs;
MockMicroCore* mcore_ptr;
MockRPCCalls* rpc_ptr;
TP::ThreadPool* tp_ptr;
static nlohmann::json config_json;
};
@ -64,7 +70,8 @@ nlohmann::json BCSTATUS_TEST::config_json;
TEST_P(BCSTATUS_TEST, DefaultConstruction)
{
xmreg::CurrentBlockchainStatus bcs {bc_setup, nullptr, nullptr};
xmreg::CurrentBlockchainStatus bcs {
bc_setup, nullptr, nullptr, nullptr};
EXPECT_TRUE(true);
}

@ -180,7 +180,7 @@ class MockCurrentBlockchainStatus : public xmreg::CurrentBlockchainStatus
public:
MockCurrentBlockchainStatus()
: xmreg::CurrentBlockchainStatus(xmreg::BlockchainSetup(),
nullptr, nullptr)
nullptr, nullptr, nullptr)
{}
MOCK_METHOD3(get_output_keys,

@ -1199,7 +1199,7 @@ public:
MockCurrentBlockchainStatus1()
: xmreg::CurrentBlockchainStatus(
xmreg::BlockchainSetup(),
nullptr, nullptr)
nullptr, nullptr, nullptr)
{}
bool tx_unlock_state {true};

@ -490,8 +490,12 @@ protected:
rpc = std::make_unique<MockRPCCalls>("dummy deamon url");
rpc_ptr = rpc.get();
tp = std::make_unique<TP::ThreadPool>();
tp_ptr = tp.get();
bcs = std::make_unique<xmreg::CurrentBlockchainStatus>(
bc_setup, std::move(mcore), std::move(rpc));
bc_setup, std::move(mcore), std::move(rpc),
std::move(tp));
}
network_type net_type {network_type::STAGENET};
@ -500,9 +504,11 @@ protected:
std::unique_ptr<MockMicroCore> mcore;
std::unique_ptr<MockRPCCalls> rpc;
std::unique_ptr<xmreg::CurrentBlockchainStatus> bcs;
std::unique_ptr<TP::ThreadPool> tp;
MockMicroCore* mcore_ptr;
MockRPCCalls* rpc_ptr;
TP::ThreadPool* tp_ptr;
static json config_json;
};

Loading…
Cancel
Save