Merge pull request #130 from moneroexamples/sync_microcore

ThreadPool for accessing blockchain added
use_xmregcore
moneroexamples 5 years ago committed by GitHub
commit 47bd8c9fe5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -57,6 +57,8 @@
"search_thread_life_in_seconds" : 120,
"max_number_of_blocks_to_import" : 132000,
"mysql_ping_every_seconds" : 200,
"_comment": "if the threadpool_size (no of threads) below is 0, its size is automaticly set based on your cpu. If its not 0, the value specified is used instead",
"blockchain_treadpool_size" : 0,
"ssl" :
{
"enable" : false,

@ -0,0 +1,254 @@
/**
* The ThreadPool class.
* Keeps a set of threads constantly waiting to execute incoming jobs.
* source: http://roar11.com/2016/01/a-platform-independent-thread-pool-using-c14/
*/
#pragma once
#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP
#include "ThreadSafeQueue.hpp"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
namespace TP
{
class ThreadPool
{
private:
class IThreadTask
{
public:
IThreadTask(void) = default;
virtual ~IThreadTask(void) = default;
IThreadTask(const IThreadTask& rhs) = delete;
IThreadTask& operator=(const IThreadTask& rhs) = delete;
IThreadTask(IThreadTask&& other) = default;
IThreadTask& operator=(IThreadTask&& other) = default;
/**
* Run the task.
*/
virtual void execute() = 0;
};
template <typename Func>
class ThreadTask: public IThreadTask
{
public:
ThreadTask(Func&& func)
:m_func{std::move(func)}
{
}
~ThreadTask(void) override = default;
ThreadTask(const ThreadTask& rhs) = delete;
ThreadTask& operator=(const ThreadTask& rhs) = delete;
ThreadTask(ThreadTask&& other) = default;
ThreadTask& operator=(ThreadTask&& other) = default;
/**
* Run the task.
*/
void execute() override
{
m_func();
}
private:
Func m_func;
};
public:
/**
* A wrapper around a std::future that adds the behavior of futures returned from
std::async.
* Specifically, this object will block and wait for execution to finish before going out
of scope.
*/
template <typename T>
class TaskFuture
{
public:
TaskFuture(std::future<T>&& future)
:m_future{std::move(future)}
{
}
TaskFuture(const TaskFuture& rhs) = delete;
TaskFuture& operator=(const TaskFuture& rhs) = delete;
TaskFuture(TaskFuture&& other) = default;
TaskFuture& operator=(TaskFuture&& other) = default;
~TaskFuture(void)
{
if(m_future.valid())
{
m_future.get();
}
}
auto get(void)
{
return m_future.get();
}
private:
std::future<T> m_future;
};
public:
/**
* Constructor.
*/
ThreadPool(void)
:ThreadPool{std::max(std::thread::hardware_concurrency()/2, 2u) - 1u}
{
/*
* Always create at least one thread. If hardware_concurrency() returns 0,
* subtracting one would turn it to UINT_MAX, so get the maximum of
* hardware_concurrency() and 2 before subtracting 1.
*/
}
/**
* Constructor.
*/
explicit ThreadPool(const std::uint32_t numThreads)
:m_done{false},
m_workQueue{},
m_threads{}
{
try
{
for(std::uint32_t i = 0u; i < numThreads; ++i)
{
m_threads.emplace_back(&ThreadPool::worker, this);
}
}
catch(...)
{
destroy();
throw;
}
}
/**
* Non-copyable.
*/
ThreadPool(const ThreadPool& rhs) = delete;
/**
* Non-assignable.
*/
ThreadPool& operator=(const ThreadPool& rhs) = delete;
/**
* Destructor.
*/
~ThreadPool(void)
{
destroy();
}
auto queueSize() const
{
return m_workQueue.size();
}
/**
* Submit a job to be run by the thread pool.
*/
template <typename Func, typename... Args>
auto submit(Func&& func, Args&&... args)
{
auto boundTask = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
using ResultType = std::result_of_t<decltype(boundTask)()>;
using PackagedTask = std::packaged_task<ResultType()>;
using TaskType = ThreadTask<PackagedTask>;
PackagedTask task{std::move(boundTask)};
TaskFuture<ResultType> result{task.get_future()};
m_workQueue.push(std::make_unique<TaskType>(std::move(task)));
return result;
}
private:
/**
* Constantly running function each thread uses to acquire work items from the queue.
*/
void worker(void)
{
while(!m_done)
{
std::unique_ptr<IThreadTask> pTask{nullptr};
if(m_workQueue.waitPop(pTask))
{
pTask->execute();
}
}
}
/**
* Invalidates the queue and joins all running threads.
*/
void destroy(void)
{
m_done = true;
m_workQueue.invalidate();
for(auto& thread : m_threads)
{
if(thread.joinable())
{
thread.join();
}
}
}
private:
std::atomic_bool m_done;
ThreadSafeQueue<std::unique_ptr<IThreadTask>> m_workQueue;
std::vector<std::thread> m_threads;
};
namespace DefaultThreadPool
{
/**
* Get the default thread pool for the application.
* This pool is created with std::thread::hardware_concurrency() - 1 threads.
*/
inline ThreadPool& getThreadPool(void)
{
static ThreadPool defaultPool;
return defaultPool;
}
inline auto queueSize()
{
return getThreadPool().queueSize();
}
/**
* Submit a job to the default thread pool.
*/
template <typename Func, typename... Args>
inline auto submitJob(Func&& func, Args&&... args)
{
return getThreadPool().submit(
std::forward<Func>(func),
std::forward<Args>(args)...);
}
}
}
#endif

@ -0,0 +1,145 @@
/**
* The ThreadSafeQueue class.
* Provides a wrapper around a basic queue to provide thread safety.
*/
#pragma once
#ifndef THREADSAFEQUEUE_HPP
#define THREADSAFEQUEUE_HPP
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
namespace TP
{
template <typename T>
class ThreadSafeQueue
{
public:
/**
* Destructor.
*/
~ThreadSafeQueue(void)
{
invalidate();
}
/**
* Attempt to get the first value in the queue.
* Returns true if a value was successfully written to the out parameter, false
otherwise.
*/
bool tryPop(T& out)
{
std::lock_guard<std::mutex> lock{m_mutex};
if(m_queue.empty() || !m_valid)
{
return false;
}
out = std::move(m_queue.front());
m_queue.pop();
return true;
}
/**
* Get the first value in the queue.
* Will block until a value is available unless clear is called or the instance is
destructed.
* Returns true if a value was successfully written to the out parameter, false
otherwise.
*/
bool waitPop(T& out)
{
std::unique_lock<std::mutex> lock{m_mutex};
m_condition.wait(lock, [this]()
{
return !m_queue.empty() || !m_valid;
});
/*
* Using the condition in the predicate ensures that spurious wakeups with a valid
* but empty queue will not proceed, so only need to check for validity before
proceeding.
*/
if(!m_valid)
{
return false;
}
out = std::move(m_queue.front());
m_queue.pop();
return true;
}
/**
* Push a new value onto the queue.
*/
void push(T value)
{
std::lock_guard<std::mutex> lock{m_mutex};
m_queue.push(std::move(value));
m_condition.notify_one();
}
/**
* Check whether or not the queue is empty.
*/
bool empty(void) const
{
std::lock_guard<std::mutex> lock{m_mutex};
return m_queue.empty();
}
/**
* Clear all items from the queue.
*/
void clear(void)
{
std::lock_guard<std::mutex> lock{m_mutex};
while(!m_queue.empty())
{
m_queue.pop();
}
m_condition.notify_all();
}
/**
* Invalidate the queue.
* Used to ensure no conditions are being waited on in waitPop when
* a thread or the application is trying to exit.
* The queue is invalid after calling this method and it is an error
* to continue using a queue after this method has been called.
*/
void invalidate(void)
{
std::lock_guard<std::mutex> lock{m_mutex};
m_valid = false;
m_condition.notify_all();
}
auto size() const
{
std::lock_guard<std::mutex> lock{m_mutex};
return m_queue.size();
}
/**
* Returns whether or not this queue is valid.
*/
bool isValid(void) const
{
std::lock_guard<std::mutex> lock{m_mutex};
return m_valid;
}
private:
std::atomic_bool m_valid{true};
mutable std::mutex m_mutex;
std::queue<T> m_queue;
std::condition_variable m_condition;
};
}
#endif

@ -462,9 +462,12 @@ var cnUtil = (function(initConfig) {
this.decode_address = function(address) {
var dec = cnBase58.decode(address);
var expectedPrefix = this.encode_varint(CRYPTONOTE_PUBLIC_ADDRESS_BASE58_PREFIX);
var expectedPrefixInt = this.encode_varint(CRYPTONOTE_PUBLIC_INTEGRATED_ADDRESS_BASE58_PREFIX);
var expectedPrefixSub = this.encode_varint(CRYPTONOTE_PUBLIC_SUBADDRESS_BASE58_PREFIX);
var prefix = dec.slice(0, expectedPrefix.length);
if (prefix !== expectedPrefix && prefix !== expectedPrefixInt && prefix !== expectedPrefixSub) {
throw "Invalid address prefix";

@ -238,7 +238,7 @@ thinwalletCtrls.controller('AccountCtrl', function($scope, $rootScope, $http, $q
// decrypt payment_id8 which results in using
// integrated address
if (transactions[i].payment_id.length == 16) {
if (transactions[i].payment_id.length === 16) {
if (transactions[i].tx_pub_key) {
var decrypted_payment_id8
= decrypt_payment_id(transactions[i].payment_id,

@ -76,18 +76,25 @@ thinwalletServices
} catch (e) {
return deferred.reject("invalid address");
}
var expected_view_pub = cnUtil.sec_key_to_pub(view_key);
var expected_spend_pub;
if (spend_key.length === 64) {
expected_spend_pub = cnUtil.sec_key_to_pub(spend_key);
}
if (public_keys.view !== expected_view_pub) {
accountService.logout();
return deferred.reject("invalid view key");
}
if (!view_only && (public_keys.spend !== expected_spend_pub)) {
accountService.logout();
return deferred.reject("invalid spend key");
if (!cnUtil.is_subaddress(address))
{
var expected_view_pub = cnUtil.sec_key_to_pub(view_key);
var expected_spend_pub;
if (spend_key.length === 64) {
expected_spend_pub = cnUtil.sec_key_to_pub(spend_key);
}
if (public_keys.view !== expected_view_pub) {
accountService.logout();
return deferred.reject("invalid view key");
}
if (!view_only && (public_keys.spend !== expected_spend_pub)) {
accountService.logout();
return deferred.reject("invalid spend key");
}
}
public_address = address;
private_keys = {

@ -139,6 +139,22 @@ xmreg::MySqlConnector::username = config_json["database"]["user"];
xmreg::MySqlConnector::password = config_json["database"]["password"];
xmreg::MySqlConnector::dbname = config_json["database"]["dbname"];
// number of thread in blockchain access pool thread
auto threads_no = std::max<uint32_t>(
std::thread::hardware_concurrency()/2, 2u) - 1;
if (bc_setup.blockchain_treadpool_size > 0)
threads_no = bc_setup.blockchain_treadpool_size;
if (threads_no > 100)
{
threads_no = 100;
OMWARN << "Requested Thread Pool size "
<< threads_no << " is greater than 100!."
" Overwriting to 100!" ;
}
OMINFO << "Thread pool size: " << threads_no << " threads";
// once we have all the parameters for the blockchain and our backend
// we can create and instance of CurrentBlockchainStatus class.
@ -150,7 +166,8 @@ auto current_bc_status
= make_shared<xmreg::CurrentBlockchainStatus>(
bc_setup,
std::make_unique<xmreg::MicroCore>(),
std::make_unique<xmreg::RPCCalls>(bc_setup.deamon_url));
std::make_unique<xmreg::RPCCalls>(bc_setup.deamon_url),
std::make_unique<TP::ThreadPool>(threads_no));
// since CurrentBlockchainStatus class monitors current status
// of the blockchain (e.g., current height) .This is the only class

@ -170,6 +170,8 @@ BlockchainSetup::_init()
= config_json["wallet_import"]["fee"];
mysql_ping_every
= seconds {config_json["mysql_ping_every_seconds"]};
blockchain_treadpool_size
= config_json["blockchain_treadpool_size"];
get_blockchain_path();

@ -49,6 +49,8 @@ public:
uint64_t max_number_of_blocks_to_import {132000};
uint64_t blockchain_treadpool_size {0};
string import_payment_address_str;
string import_payment_viewkey_str;

@ -15,10 +15,12 @@ namespace xmreg
CurrentBlockchainStatus::CurrentBlockchainStatus(
BlockchainSetup _bc_setup,
std::unique_ptr<MicroCore> _mcore,
std::unique_ptr<RPCCalls> _rpc)
std::unique_ptr<RPCCalls> _rpc,
std::unique_ptr<TP::ThreadPool> _tp)
: bc_setup {_bc_setup},
mcore {std::move(_mcore)},
rpc {std::move(_rpc)}
rpc {std::move(_rpc)},
thread_pool {std::move(_tp)}
{
is_running = false;
stop_blockchain_monitor_loop = false;
@ -44,6 +46,9 @@ CurrentBlockchainStatus::monitor_blockchain()
break;
}
OMINFO << "PoolQueue size: "
<< TP::DefaultThreadPool::queueSize();
update_current_blockchain_height();
read_mempool();
@ -115,23 +120,38 @@ CurrentBlockchainStatus::is_tx_unlocked(
bool
CurrentBlockchainStatus::get_block(uint64_t height, block& blk)
{
return mcore->get_block_from_height(height, blk);
auto future_result = thread_pool->submit(
[this](auto height, auto& blk)
-> bool
{
return this->mcore
->get_block_from_height(height, blk);
}, height, std::ref(blk));
return future_result.get();
}
vector<block>
CurrentBlockchainStatus::get_blocks_range(
uint64_t const& h1, uint64_t const& h2)
{
try
{
return mcore->get_blocks_range(h1, h2);
}
catch (BLOCK_DNE& e)
{
cerr << e.what() << endl;
}
return {};
auto future_result = thread_pool->submit(
[this](auto h1, auto h2)
-> vector<block>
{
try
{
return this->mcore->get_blocks_range(h1, h2);
}
catch (BLOCK_DNE& e)
{
OMERROR << e.what();
return {};
}
}, h1, h2);
return future_result.get();
}
bool
@ -145,37 +165,67 @@ CurrentBlockchainStatus::get_block_txs(
// the block i.e. coinbase tx.
blk_txs.push_back(blk.miner_tx);
if (!mcore->get_transactions(blk.tx_hashes, blk_txs, missed_txs))
{
OMERROR << "Cant get transactions in block: "
<< get_block_hash(blk);
auto future_result = thread_pool->submit(
[this](auto const& blk,
auto& blk_txs, auto& missed_txs)
-> bool
{
if (!this->mcore->get_transactions(
blk.tx_hashes, blk_txs,
missed_txs))
{
OMERROR << "Cant get transactions in block: "
<< get_block_hash(blk);
return false;
}
return false;
}
return true;
return true;
}, std::cref(blk), std::ref(blk_txs),
std::ref(missed_txs));
return future_result.get();
}
bool
CurrentBlockchainStatus::get_txs(
vector<crypto::hash> const& txs_to_get,
vector<transaction>& txs,
vector<crypto::hash>& missed_txs)
{
if (!mcore->get_transactions(txs_to_get, txs, missed_txs))
{
OMERROR << "CurrentBlockchainStatus::get_txs: "
"cant get transactions!";
return false;
}
return true;
auto future_result = thread_pool->submit(
[this](auto const& txs_to_get,
auto& txs, auto& missed_txs)
-> bool
{
if (!this->mcore->get_transactions(
txs_to_get, txs, missed_txs))
{
OMERROR << "CurrentBlockchainStatus::get_txs: "
"cant get transactions!";
return false;
}
return true;
}, std::cref(txs_to_get), std::ref(txs),
std::ref(missed_txs));
return future_result.get();
}
bool
CurrentBlockchainStatus::tx_exist(const crypto::hash& tx_hash)
{
return mcore->have_tx(tx_hash);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash) -> bool
{
return this->mcore->have_tx(tx_hash);
}, std::cref(tx_hash));
return future_result.get();
}
bool
@ -183,7 +233,14 @@ CurrentBlockchainStatus::tx_exist(
const crypto::hash& tx_hash,
uint64_t& tx_index)
{
return mcore->tx_exists(tx_hash, tx_index);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash,
auto& tx_index) -> bool
{
return this->mcore->tx_exists(tx_hash, tx_index);
}, std::cref(tx_hash), std::ref(tx_index));
return future_result.get();
}
@ -201,6 +258,20 @@ CurrentBlockchainStatus::tx_exist(
return false;
}
tx_out_index
CurrentBlockchainStatus::get_output_tx_and_index(
uint64_t amount, uint64_t index) const
{
auto future_result = thread_pool->submit(
[this](auto amount, auto index)
-> tx_out_index
{
return this->mcore
->get_output_tx_and_index(amount, index);
}, amount, index);
return future_result.get();
}
bool
CurrentBlockchainStatus::get_tx_with_output(
@ -214,7 +285,7 @@ CurrentBlockchainStatus::get_tx_with_output(
{
// get pair pair<crypto::hash, uint64_t> where first is tx hash
// and second is local index of the output i in that tx
tx_out_idx = mcore->get_output_tx_and_index(amount, output_idx);
tx_out_idx = get_output_tx_and_index(amount, output_idx);
}
catch (const OUTPUT_DNE& e)
{
@ -231,7 +302,7 @@ CurrentBlockchainStatus::get_tx_with_output(
output_idx_in_tx = tx_out_idx.second;
if (!mcore->get_tx(tx_out_idx.first, tx))
if (!get_tx(tx_out_idx.first, tx))
{
OMERROR << "Cant get tx: " << tx_out_idx.first;
@ -247,17 +318,28 @@ CurrentBlockchainStatus::get_output_keys(
const vector<uint64_t>& absolute_offsets,
vector<cryptonote::output_data_t>& outputs)
{
try
{
mcore->get_output_key(amount, absolute_offsets, outputs);
return true;
}
catch (const OUTPUT_DNE& e)
{
OMERROR << "get_output_keys: " << e.what();
}
auto future_result = thread_pool->submit(
[this](auto const& amount,
auto const& absolute_offsets,
auto& outputs) -> bool
{
try
{
this->mcore->get_output_key(amount,
absolute_offsets, outputs);
return true;
}
catch (const OUTPUT_DNE& e)
{
OMERROR << "get_output_keys: " << e.what();
}
return false;
return false;
}, std::cref(amount), std::cref(absolute_offsets),
std::ref(outputs));
return future_result.get();
}
@ -287,24 +369,49 @@ CurrentBlockchainStatus::get_amount_specific_indices(
const crypto::hash& tx_hash,
vector<uint64_t>& out_indices)
{
try
{
// this index is lmdb index of a tx, not tx hash
uint64_t tx_index;
auto future_result = thread_pool->submit(
[this](auto const& tx_hash, auto& out_indices)
-> bool
{
try
{
// this index is lmdb index of a tx, not tx hash
uint64_t tx_index;
if (this->mcore->tx_exists(tx_hash, tx_index))
{
out_indices = this->mcore
->get_tx_amount_output_indices(tx_index);
return true;
}
}
catch(const exception& e)
{
OMERROR << e.what();
}
return false;
}, std::cref(tx_hash), std::ref(out_indices));
return future_result.get();
}
if (mcore->tx_exists(tx_hash, tx_index))
bool
CurrentBlockchainStatus::get_output_histogram(
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::request& req,
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::response& res) const
{
auto future_result = thread_pool->submit(
[this](auto const& req, auto& res)
-> bool
{
out_indices = mcore->get_tx_amount_output_indices(tx_index);
return this->mcore
->get_output_histogram(req, res);
return true;
}
}
catch(const exception& e)
{
cerr << e.what() << endl;
}
}, std::cref(req), std::ref(res));
return false;
return future_result.get();
}
unique_ptr<RandomOutputs>
@ -312,7 +419,8 @@ CurrentBlockchainStatus::create_random_outputs_object(
vector<uint64_t> const& amounts,
uint64_t outs_count) const
{
return make_unique<RandomOutputs>(*mcore, amounts, outs_count);
return make_unique<RandomOutputs>(
this, amounts, outs_count);
}
bool
@ -334,6 +442,26 @@ CurrentBlockchainStatus::get_random_outputs(
return true;
}
bool
CurrentBlockchainStatus::get_outs(
COMMAND_RPC_GET_OUTPUTS_BIN::request const& req,
COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const
{
auto future_result = thread_pool->submit(
[this](auto const& req, auto& res)
-> bool
{
if (!this->mcore->get_outs(req, res))
{
OMERROR << "mcore->get_outs(req, res) failed";
return false;
}
return true;
}, std::cref(req), std::ref(res));
return future_result.get();
}
bool
CurrentBlockchainStatus::get_output(
@ -344,13 +472,11 @@ CurrentBlockchainStatus::get_output(
COMMAND_RPC_GET_OUTPUTS_BIN::request req;
COMMAND_RPC_GET_OUTPUTS_BIN::response res;
req.outputs.push_back(get_outputs_out {amount, global_output_index});
req.outputs.push_back(
get_outputs_out {amount, global_output_index});
if (!mcore->get_outs(req, res))
{
OMERROR << "mcore->get_outs(req, res) failed";
if (!get_outs(req, res))
return false;
}
output_info = res.outs.at(0);
@ -362,8 +488,7 @@ CurrentBlockchainStatus::get_dynamic_per_kb_fee_estimate() const
{
const double byte_to_kbyte_factor = 1024;
uint64_t fee_per_byte = mcore->get_dynamic_base_fee_estimate(
FEE_ESTIMATE_GRACE_BLOCKS);
uint64_t fee_per_byte = get_dynamic_base_fee_estimate();
uint64_t fee_per_kB = static_cast<uint64_t>(
fee_per_byte * byte_to_kbyte_factor);
@ -374,15 +499,29 @@ CurrentBlockchainStatus::get_dynamic_per_kb_fee_estimate() const
uint64_t
CurrentBlockchainStatus::get_dynamic_base_fee_estimate() const
{
return mcore->get_dynamic_base_fee_estimate(
FEE_ESTIMATE_GRACE_BLOCKS);
auto future_result = thread_pool->submit(
[this]() -> uint64_t
{
return this->mcore
->get_dynamic_base_fee_estimate(
FEE_ESTIMATE_GRACE_BLOCKS);
});
return future_result.get();
}
uint64_t
CurrentBlockchainStatus::get_tx_unlock_time(
crypto::hash const& tx_hash) const
{
return mcore->get_tx_unlock_time(tx_hash);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash) -> uint64_t
{
return this->mcore->get_tx_unlock_time(tx_hash);
}, std::cref(tx_hash));
return future_result.get();
}
bool
@ -407,12 +546,24 @@ CurrentBlockchainStatus::read_mempool()
// get txs in the mempool
std::vector<tx_info> mempool_tx_info;
vector<spent_key_image_info> key_image_infos;
auto future_result = thread_pool->submit(
[this](auto& mempool_tx_info, auto& key_image_infos)
-> bool
{
if (!this->mcore->get_mempool_txs(
mempool_tx_info, key_image_infos))
{
OMERROR << "Getting mempool failed ";
return false;
}
return true;
if (!mcore->get_mempool_txs(mempool_tx_info, key_image_infos))
{
OMERROR << "Getting mempool failed ";
}, std::ref(mempool_tx_info),
std::ref(key_image_infos));
if (!future_result.get())
return false;
}
// not using this info at present
(void) key_image_infos;
@ -584,7 +735,16 @@ CurrentBlockchainStatus::get_output_key(
uint64_t amount,
uint64_t global_amount_index)
{
return mcore->get_output_key(amount, global_amount_index);
auto future_result = thread_pool->submit(
[this](auto amount, auto global_amount_index)
-> output_data_t
{
return this->mcore->get_output_key(
amount, global_amount_index);
}, amount, global_amount_index);
return future_result.get();
}
bool
@ -803,7 +963,14 @@ CurrentBlockchainStatus::get_tx(
crypto::hash const& tx_hash,
transaction& tx)
{
return mcore->get_tx(tx_hash, tx);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash,
auto& tx) -> bool
{
return this->mcore->get_tx(tx_hash, tx);
}, std::cref(tx_hash), std::ref(tx));
return future_result.get();
}
@ -819,7 +986,7 @@ CurrentBlockchainStatus::get_tx(
return false;
}
return mcore->get_tx(tx_hash, tx);
return get_tx(tx_hash, tx);
}
bool
@ -830,7 +997,14 @@ CurrentBlockchainStatus::get_tx_block_height(
if (!tx_exist(tx_hash))
return false;
tx_height = mcore->get_tx_block_height(tx_hash);
auto future_result = thread_pool->submit(
[this](auto const tx_hash)
-> int64_t
{
return this->mcore->get_tx_block_height(tx_hash);
}, std::cref(tx_hash));
tx_height = future_result.get();
return true;
}

@ -14,6 +14,8 @@
#include "db/MySqlAccounts.h"
#include "RandomOutputs.h"
#include "../ext/ThreadPool.hpp"
#include <iostream>
#include <memory>
#include <thread>
@ -60,7 +62,8 @@ public:
CurrentBlockchainStatus(BlockchainSetup _bc_setup,
std::unique_ptr<MicroCore> _mcore,
std::unique_ptr<RPCCalls> _rpc);
std::unique_ptr<RPCCalls> _rpc,
std::unique_ptr<TP::ThreadPool> _tp);
virtual void
monitor_blockchain();
@ -113,6 +116,10 @@ public:
get_tx_with_output(uint64_t output_idx, uint64_t amount,
transaction& tx, uint64_t& output_idx_in_tx);
virtual tx_out_index
get_output_tx_and_index(uint64_t amount,
uint64_t index) const;
virtual bool
get_output_keys(const uint64_t& amount,
const vector<uint64_t>& absolute_offsets,
@ -141,6 +148,15 @@ public:
RandomOutputs::outs_for_amount_v&
found_outputs);
virtual bool
get_output_histogram(
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::request& req,
COMMAND_RPC_GET_OUTPUT_HISTOGRAM::response& res) const;
virtual bool
get_outs(COMMAND_RPC_GET_OUTPUTS_BIN::request const& req,
COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const;
virtual uint64_t
get_dynamic_per_kb_fee_estimate() const;
@ -211,7 +227,8 @@ public:
get_tx(string const& tx_hash_str, transaction& tx);
virtual bool
get_tx_block_height(crypto::hash const& tx_hash, int64_t& tx_height);
get_tx_block_height(crypto::hash const& tx_hash,
int64_t& tx_height);
virtual bool
set_new_searched_blk_no(const string& address,
@ -223,7 +240,8 @@ public:
virtual bool
get_known_outputs_keys(string const& address,
unordered_map<public_key, uint64_t>& known_outputs_keys);
unordered_map<public_key,
uint64_t>& known_outputs_keys);
virtual void
clean_search_thread_map();
@ -297,6 +315,13 @@ protected:
// this class is also the only class which can
// use talk to monero deamon using RPC.
std::unique_ptr<RPCCalls> rpc;
// any operation required to use blockchain
// i.e., access through mcore, will be performed
// by threads in this thread_pool. we have to
// have fixed and limited number of threads so that
// the lmdb does not throw MDB_READERS_FULL
std::unique_ptr<TP::ThreadPool> thread_pool;
// vector of mempool transactions that all threads
// can refer to
@ -318,6 +343,7 @@ protected:
// to synchronize access to mempool_txs vector
mutex getting_mempool_txs;
// have this method will make it easier to moc
// RandomOutputs in our tests later
virtual unique_ptr<RandomOutputs>

@ -135,6 +135,7 @@ MicroCore::get_block_complete_entry(block const& b, block_complete_entry& bce)
bool
MicroCore::get_tx(crypto::hash const& tx_hash, transaction& tx) const
{
if (core_storage.have_tx(tx_hash))
{
// get transaction with given hash
@ -238,4 +239,5 @@ MicroCore::init_success() const
//}
}

@ -37,7 +37,7 @@ class MicroCore {
network_type nettype;
bool initialization_succeded {false};
public:
// <amoumt,
@ -98,6 +98,27 @@ public:
{
return core_storage.get_db().get_blocks_range(h1, h2);
}
//virtual std::vector<block>
//get_blocks_range(const uint64_t& h1, const uint64_t& h2) const
//{
//std::vector<std::pair<cryptonote::blobdata,block>> blobblocks;
//std::vector<blobdata> txs;
//vector<block> blocks;
//{
//std::lock_guard<std::mutex> lock(mtx1);
//cout << "tid: " << std::this_thread::get_id() << endl;;
//core_storage.get_blocks(h1, h2-h1+1, blobblocks, txs);
//}
//blocks.reserve(blobblocks.size());
//for (auto const& bpair: blobblocks)
//blocks.push_back(std::move(bpair.second));
//return blocks;
//}
virtual uint64_t
get_tx_unlock_time(crypto::hash const& tx_hash) const
@ -159,7 +180,6 @@ public:
{
// tx_hash , index in tx
// tx_out_index is std::pair<crypto::hash, uint64_t>;
core_storage.get_db().get_output_tx_and_index(
amount, offsets, indices);
}
@ -218,8 +238,8 @@ public:
virtual ~MicroCore() = default;
};
}
}
#endif //XMREG01_MICROCORE_H

@ -1285,7 +1285,7 @@ OpenMoneroRequests::import_recent_wallet_request(
= boost::lexical_cast<uint64_t>(
j_request["no_blocks_to_import"].get<string>());
}
catch (boost::bad_lexical_cast& e)
catch (std::exception const& e)
{
string msg = "Cant parse "
+ j_request["no_blocks_to_import"].get<string>()
@ -1872,7 +1872,7 @@ OpenMoneroRequests::login_and_start_search_thread(
acc.viewkey = view_key;
// so we have an account now. Either existing or
// newly created. Thus, we can start a tread
// newly created. Thus, we can start a thread
// which will scan for transactions belonging to
// that account, using its address and view key.
// the thread will scan the blockchain for txs belonging
@ -1884,7 +1884,7 @@ OpenMoneroRequests::login_and_start_search_thread(
// any belonging transactions in a loop.
// Thus the thread does not need
// to do anything except looking for tx and updating mysql
// with relative tx information
// with relevant tx information
if (!current_bc_status->search_thread_exist(acc.address))
{
@ -1951,7 +1951,6 @@ OpenMoneroRequests::parse_request(
json& j_request,
json& j_response)
{
try
{
j_request = body_to_json(body);
@ -1961,7 +1960,6 @@ OpenMoneroRequests::parse_request(
for (const auto& v: values_map)
{
if (j_request.count(v) == 0)
{
cerr << v + " value not provided" << endl;
@ -1971,7 +1969,6 @@ OpenMoneroRequests::parse_request(
return false;
}
}
return true;

@ -34,12 +34,18 @@ OutputInputIdentification::OutputInputIdentification(
if (!generate_key_derivation(tx_pub_key, *viewkey, derivation))
{
OMERROR << "Cant get derived key for: " << "\n"
<< "pub_tx_key: " << get_tx_pub_key_str() << " and "
<< "prv_view_key" << viewkey;;
throw OutputInputIdentificationException(
"Cant get derived key for a tx");
OMWARN << "Cant get derived key for tx: "
<< pod_to_hex(tx_hash)
<< ", pub_tx_key: "
<< pod_to_hex(tx_pub_key);
static_assert(sizeof(derivation) == sizeof(rct::key),
"Mismatched sizes of key_derivation and rct::key");
// use identity derivation instead
// solution based on that found in wallet2.cpp in monero
// this will cause the tx output to be effectively skipped
memcpy(&derivation, rct::identity().bytes, sizeof(derivation));
}
}

@ -1,13 +1,13 @@
#include "RandomOutputs.h"
#include "CurrentBlockchainStatus.h"
namespace xmreg
{
RandomOutputs::RandomOutputs(
MicroCore const& _mcore,
CurrentBlockchainStatus const* _cbs,
vector<uint64_t> const& _amounts,
uint64_t _outs_count)
: mcore {_mcore},
: cbs {_cbs},
amounts {_amounts},
outs_count {_outs_count}
{
@ -39,7 +39,7 @@ RandomOutputs::get_output_pub_key(
req.outputs.push_back(get_outputs_out {amount, global_output_index});
if (!mcore.get_outs(req, res))
if (!cbs->get_outs(req, res))
{
OMERROR << "mcore->get_outs(req, res) failed";
return false;
@ -65,7 +65,7 @@ RandomOutputs::find_random_outputs()
req.min_count = outs_count;
req.max_count = 0;
if (!mcore.get_output_histogram(req, res))
if (!cbs->get_output_histogram(req, res))
{
OMERROR << "mcore->get_output_histogram(req, res)";
return false;
@ -80,15 +80,16 @@ RandomOutputs::find_random_outputs()
// find histogram_entry for amount that we look
// random outputs for
auto const hist_entry_it = std::find_if(
begin(res.histogram), end(res.histogram),
[&amount](histogram_entry const& he)
{
return amount == he.amount;
});
std::begin(res.histogram), std::end(res.histogram),
[&amount](auto const& he)
{
return amount == he.amount;
});
if (hist_entry_it == res.histogram.end())
{
OMERROR << "Could not find amount: it == res.histogram.end()\n";
OMERROR << "Could not find amount: it "
"== res.histogram.end()\n";
return false;
}

@ -8,6 +8,8 @@
namespace xmreg
{
class CurrentBlockchainStatus;
/**
* @brief Returns random ouputs for given amounts
*
@ -46,7 +48,7 @@ public:
using outs_for_amount_v = vector<outs_for_amount>;
RandomOutputs(MicroCore const& _mcore,
RandomOutputs(CurrentBlockchainStatus const* _cbs,
vector<uint64_t> const& _amounts,
uint64_t _outs_count);
@ -59,7 +61,8 @@ public:
virtual ~RandomOutputs() = default;
protected:
MicroCore const& mcore;
//MicroCore const& mcore;
CurrentBlockchainStatus const* cbs;
vector<uint64_t> amounts;
uint64_t outs_count;

@ -86,7 +86,9 @@ TxSearch::operator()()
uint64_t h1 = searched_blk_no;
uint64_t h2 = std::min(h1 + blocks_lookahead - 1, last_block_height);
vector<block> blocks = current_bc_status->get_blocks_range(h1, h2);
vector<block> blocks;
blocks = current_bc_status->get_blocks_range(h1, h2);
if (blocks.empty())
{
@ -594,8 +596,13 @@ TxSearch::operator()()
}
//current_timestamp = loop_timestamp;
searched_blk_no = h2 + 1;
// update this only when this variable is false
// otherwise a new search block value can
// be overwritten to h2, instead of the new value
if (!searched_block_got_updated)
searched_blk_no = h2 + 1;
searched_block_got_updated = false;
} // while(continue_search)
@ -632,6 +639,7 @@ void
TxSearch::set_searched_blk_no(uint64_t new_value)
{
searched_blk_no = new_value;
searched_block_got_updated = true;
}
uint64_t

@ -46,11 +46,16 @@ private:
static seconds thread_search_life;
// indicate that a thread loop should keep running
bool continue_search {true};
std::atomic_bool continue_search {true};
// this acctually indicates whether thread loop finished
// its execution
bool searching_is_ongoing {false};
std::atomic_bool searching_is_ongoing {false};
// marked true when we set new searched block value
// from other thread. for example, when we import account
// we set it to 0
std::atomic_bool searched_block_got_updated {false};
// to store last exception thrown in the search thread
// using this, a main thread can get info what went wrong here

@ -41,8 +41,12 @@ protected:
rpc = std::make_unique<MockRPCCalls>("dummy deamon url");
rpc_ptr = rpc.get();
tp = std::make_unique<TP::ThreadPool>();
tp_ptr = tp.get();
bcs = std::make_unique<xmreg::CurrentBlockchainStatus>(
bc_setup, std::move(mcore), std::move(rpc));
bc_setup, std::move(mcore), std::move(rpc),
std::move(tp));
}
network_type net_type {network_type::STAGENET};
@ -50,10 +54,12 @@ protected:
xmreg::BlockchainSetup bc_setup;
std::unique_ptr<MockMicroCore> mcore;
std::unique_ptr<MockRPCCalls> rpc;
std::unique_ptr<TP::ThreadPool> tp;
std::unique_ptr<xmreg::CurrentBlockchainStatus> bcs;
MockMicroCore* mcore_ptr;
MockRPCCalls* rpc_ptr;
TP::ThreadPool* tp_ptr;
static nlohmann::json config_json;
};
@ -64,7 +70,8 @@ nlohmann::json BCSTATUS_TEST::config_json;
TEST_P(BCSTATUS_TEST, DefaultConstruction)
{
xmreg::CurrentBlockchainStatus bcs {bc_setup, nullptr, nullptr};
xmreg::CurrentBlockchainStatus bcs {
bc_setup, nullptr, nullptr, nullptr};
EXPECT_TRUE(true);
}

@ -180,7 +180,7 @@ class MockCurrentBlockchainStatus : public xmreg::CurrentBlockchainStatus
public:
MockCurrentBlockchainStatus()
: xmreg::CurrentBlockchainStatus(xmreg::BlockchainSetup(),
nullptr, nullptr)
nullptr, nullptr, nullptr)
{}
MOCK_METHOD3(get_output_keys,

@ -1199,7 +1199,7 @@ public:
MockCurrentBlockchainStatus1()
: xmreg::CurrentBlockchainStatus(
xmreg::BlockchainSetup(),
nullptr, nullptr)
nullptr, nullptr, nullptr)
{}
bool tx_unlock_state {true};

@ -490,8 +490,12 @@ protected:
rpc = std::make_unique<MockRPCCalls>("dummy deamon url");
rpc_ptr = rpc.get();
tp = std::make_unique<TP::ThreadPool>();
tp_ptr = tp.get();
bcs = std::make_unique<xmreg::CurrentBlockchainStatus>(
bc_setup, std::move(mcore), std::move(rpc));
bc_setup, std::move(mcore), std::move(rpc),
std::move(tp));
}
network_type net_type {network_type::STAGENET};
@ -500,9 +504,11 @@ protected:
std::unique_ptr<MockMicroCore> mcore;
std::unique_ptr<MockRPCCalls> rpc;
std::unique_ptr<xmreg::CurrentBlockchainStatus> bcs;
std::unique_ptr<TP::ThreadPool> tp;
MockMicroCore* mcore_ptr;
MockRPCCalls* rpc_ptr;
TP::ThreadPool* tp_ptr;
static json config_json;
};

Loading…
Cancel
Save