Thread pool adding started

sync_microcore
moneroexamples 5 years ago
parent a7a80e118c
commit 4f6b104ad0

@ -0,0 +1,254 @@
/**
* The ThreadPool class.
* Keeps a set of threads constantly waiting to execute incoming jobs.
* source: http://roar11.com/2016/01/a-platform-independent-thread-pool-using-c14/
*/
#pragma once
#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP
#include "ThreadSafeQueue.hpp"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
namespace TP
{
class ThreadPool
{
private:
class IThreadTask
{
public:
IThreadTask(void) = default;
virtual ~IThreadTask(void) = default;
IThreadTask(const IThreadTask& rhs) = delete;
IThreadTask& operator=(const IThreadTask& rhs) = delete;
IThreadTask(IThreadTask&& other) = default;
IThreadTask& operator=(IThreadTask&& other) = default;
/**
* Run the task.
*/
virtual void execute() = 0;
};
template <typename Func>
class ThreadTask: public IThreadTask
{
public:
ThreadTask(Func&& func)
:m_func{std::move(func)}
{
}
~ThreadTask(void) override = default;
ThreadTask(const ThreadTask& rhs) = delete;
ThreadTask& operator=(const ThreadTask& rhs) = delete;
ThreadTask(ThreadTask&& other) = default;
ThreadTask& operator=(ThreadTask&& other) = default;
/**
* Run the task.
*/
void execute() override
{
m_func();
}
private:
Func m_func;
};
public:
/**
* A wrapper around a std::future that adds the behavior of futures returned from
std::async.
* Specifically, this object will block and wait for execution to finish before going out
of scope.
*/
template <typename T>
class TaskFuture
{
public:
TaskFuture(std::future<T>&& future)
:m_future{std::move(future)}
{
}
TaskFuture(const TaskFuture& rhs) = delete;
TaskFuture& operator=(const TaskFuture& rhs) = delete;
TaskFuture(TaskFuture&& other) = default;
TaskFuture& operator=(TaskFuture&& other) = default;
~TaskFuture(void)
{
if(m_future.valid())
{
m_future.get();
}
}
auto get(void)
{
return m_future.get();
}
private:
std::future<T> m_future;
};
public:
/**
* Constructor.
*/
ThreadPool(void)
:ThreadPool{std::max(std::thread::hardware_concurrency()/2, 2u) - 1u}
{
/*
* Always create at least one thread. If hardware_concurrency() returns 0,
* subtracting one would turn it to UINT_MAX, so get the maximum of
* hardware_concurrency() and 2 before subtracting 1.
*/
}
/**
* Constructor.
*/
explicit ThreadPool(const std::uint32_t numThreads)
:m_done{false},
m_workQueue{},
m_threads{}
{
try
{
for(std::uint32_t i = 0u; i < numThreads; ++i)
{
m_threads.emplace_back(&ThreadPool::worker, this);
}
}
catch(...)
{
destroy();
throw;
}
}
/**
* Non-copyable.
*/
ThreadPool(const ThreadPool& rhs) = delete;
/**
* Non-assignable.
*/
ThreadPool& operator=(const ThreadPool& rhs) = delete;
/**
* Destructor.
*/
~ThreadPool(void)
{
destroy();
}
auto queueSize() const
{
return m_workQueue.size();
}
/**
* Submit a job to be run by the thread pool.
*/
template <typename Func, typename... Args>
auto submit(Func&& func, Args&&... args)
{
auto boundTask = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
using ResultType = std::result_of_t<decltype(boundTask)()>;
using PackagedTask = std::packaged_task<ResultType()>;
using TaskType = ThreadTask<PackagedTask>;
PackagedTask task{std::move(boundTask)};
TaskFuture<ResultType> result{task.get_future()};
m_workQueue.push(std::make_unique<TaskType>(std::move(task)));
return result;
}
private:
/**
* Constantly running function each thread uses to acquire work items from the queue.
*/
void worker(void)
{
while(!m_done)
{
std::unique_ptr<IThreadTask> pTask{nullptr};
if(m_workQueue.waitPop(pTask))
{
pTask->execute();
}
}
}
/**
* Invalidates the queue and joins all running threads.
*/
void destroy(void)
{
m_done = true;
m_workQueue.invalidate();
for(auto& thread : m_threads)
{
if(thread.joinable())
{
thread.join();
}
}
}
private:
std::atomic_bool m_done;
ThreadSafeQueue<std::unique_ptr<IThreadTask>> m_workQueue;
std::vector<std::thread> m_threads;
};
namespace DefaultThreadPool
{
/**
* Get the default thread pool for the application.
* This pool is created with std::thread::hardware_concurrency() - 1 threads.
*/
inline ThreadPool& getThreadPool(void)
{
static ThreadPool defaultPool;
return defaultPool;
}
inline auto queueSize()
{
return getThreadPool().queueSize();
}
/**
* Submit a job to the default thread pool.
*/
template <typename Func, typename... Args>
inline auto submitJob(Func&& func, Args&&... args)
{
return getThreadPool().submit(
std::forward<Func>(func),
std::forward<Args>(args)...);
}
}
}
#endif

@ -0,0 +1,145 @@
/**
* The ThreadSafeQueue class.
* Provides a wrapper around a basic queue to provide thread safety.
*/
#pragma once
#ifndef THREADSAFEQUEUE_HPP
#define THREADSAFEQUEUE_HPP
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
namespace TP
{
template <typename T>
class ThreadSafeQueue
{
public:
/**
* Destructor.
*/
~ThreadSafeQueue(void)
{
invalidate();
}
/**
* Attempt to get the first value in the queue.
* Returns true if a value was successfully written to the out parameter, false
otherwise.
*/
bool tryPop(T& out)
{
std::lock_guard<std::mutex> lock{m_mutex};
if(m_queue.empty() || !m_valid)
{
return false;
}
out = std::move(m_queue.front());
m_queue.pop();
return true;
}
/**
* Get the first value in the queue.
* Will block until a value is available unless clear is called or the instance is
destructed.
* Returns true if a value was successfully written to the out parameter, false
otherwise.
*/
bool waitPop(T& out)
{
std::unique_lock<std::mutex> lock{m_mutex};
m_condition.wait(lock, [this]()
{
return !m_queue.empty() || !m_valid;
});
/*
* Using the condition in the predicate ensures that spurious wakeups with a valid
* but empty queue will not proceed, so only need to check for validity before
proceeding.
*/
if(!m_valid)
{
return false;
}
out = std::move(m_queue.front());
m_queue.pop();
return true;
}
/**
* Push a new value onto the queue.
*/
void push(T value)
{
std::lock_guard<std::mutex> lock{m_mutex};
m_queue.push(std::move(value));
m_condition.notify_one();
}
/**
* Check whether or not the queue is empty.
*/
bool empty(void) const
{
std::lock_guard<std::mutex> lock{m_mutex};
return m_queue.empty();
}
/**
* Clear all items from the queue.
*/
void clear(void)
{
std::lock_guard<std::mutex> lock{m_mutex};
while(!m_queue.empty())
{
m_queue.pop();
}
m_condition.notify_all();
}
/**
* Invalidate the queue.
* Used to ensure no conditions are being waited on in waitPop when
* a thread or the application is trying to exit.
* The queue is invalid after calling this method and it is an error
* to continue using a queue after this method has been called.
*/
void invalidate(void)
{
std::lock_guard<std::mutex> lock{m_mutex};
m_valid = false;
m_condition.notify_all();
}
auto size() const
{
std::lock_guard<std::mutex> lock{m_mutex};
return m_queue.size();
}
/**
* Returns whether or not this queue is valid.
*/
bool isValid(void) const
{
std::lock_guard<std::mutex> lock{m_mutex};
return m_valid;
}
private:
std::atomic_bool m_valid{true};
mutable std::mutex m_mutex;
std::queue<T> m_queue;
std::condition_variable m_condition;
};
}
#endif

@ -139,6 +139,12 @@ xmreg::MySqlConnector::username = config_json["database"]["user"];
xmreg::MySqlConnector::password = config_json["database"]["password"];
xmreg::MySqlConnector::dbname = config_json["database"]["dbname"];
// number of thread in blockchain access pool thread
auto threads_no = std::max<uint32_t>(
std::thread::hardware_concurrency()/2, 2u) - 1;
OMINFO << "Thread pool size: " << threads_no << " threads";
// once we have all the parameters for the blockchain and our backend
// we can create and instance of CurrentBlockchainStatus class.
@ -150,7 +156,8 @@ auto current_bc_status
= make_shared<xmreg::CurrentBlockchainStatus>(
bc_setup,
std::make_unique<xmreg::MicroCore>(),
std::make_unique<xmreg::RPCCalls>(bc_setup.deamon_url));
std::make_unique<xmreg::RPCCalls>(bc_setup.deamon_url),
std::make_unique<TP::ThreadPool>(threads_no));
// since CurrentBlockchainStatus class monitors current status
// of the blockchain (e.g., current height) .This is the only class

@ -15,10 +15,12 @@ namespace xmreg
CurrentBlockchainStatus::CurrentBlockchainStatus(
BlockchainSetup _bc_setup,
std::unique_ptr<MicroCore> _mcore,
std::unique_ptr<RPCCalls> _rpc)
std::unique_ptr<RPCCalls> _rpc,
std::unique_ptr<TP::ThreadPool> _tp)
: bc_setup {_bc_setup},
mcore {std::move(_mcore)},
rpc {std::move(_rpc)}
rpc {std::move(_rpc)},
thread_pool {std::move(_tp)}
{
is_running = false;
stop_blockchain_monitor_loop = false;
@ -44,6 +46,9 @@ CurrentBlockchainStatus::monitor_blockchain()
break;
}
OMINFO << "PoolQueue size: "
<< TP::DefaultThreadPool::queueSize();
update_current_blockchain_height();
read_mempool();
@ -115,23 +120,54 @@ CurrentBlockchainStatus::is_tx_unlocked(
bool
CurrentBlockchainStatus::get_block(uint64_t height, block& blk)
{
return mcore->get_block_from_height(height, blk);
auto future_result = thread_pool->submit(
[this](auto height, auto& blk)
-> bool
{
return this->mcore
->get_block_from_height(height, blk);
}, height, std::ref(blk));
return future_result.get();
}
//vector<block>
//CurrentBlockchainStatus::get_blocks_range(
//uint64_t const& h1, uint64_t const& h2)
//{
//try
//{
//return mcore->get_blocks_range(h1, h2);
//}
//catch (BLOCK_DNE& e)
//{
//cerr << e.what() << endl;
//}
//return {};
//}
vector<block>
CurrentBlockchainStatus::get_blocks_range(
uint64_t const& h1, uint64_t const& h2)
{
try
{
return mcore->get_blocks_range(h1, h2);
}
catch (BLOCK_DNE& e)
{
cerr << e.what() << endl;
}
return {};
auto future_result = thread_pool->submit(
[this](auto h1, auto h2)
-> vector<block>
{
try
{
return this->mcore->get_blocks_range(h1, h2);
}
catch (BLOCK_DNE& e)
{
OMERROR << e.what();
return {};
}
}, h1, h2);
return future_result.get();
}
bool
@ -145,37 +181,69 @@ CurrentBlockchainStatus::get_block_txs(
// the block i.e. coinbase tx.
blk_txs.push_back(blk.miner_tx);
if (!mcore->get_transactions(blk.tx_hashes, blk_txs, missed_txs))
{
OMERROR << "Cant get transactions in block: "
<< get_block_hash(blk);
auto future_result = thread_pool->submit(
[this](auto const& blk,
auto& blk_txs, auto& missed_txs)
-> bool
{
if (!this->mcore->get_transactions(
blk.tx_hashes, blk_txs,
missed_txs))
{
OMERROR << "Cant get transactions in block: "
<< get_block_hash(blk);
return false;
}
return false;
}
return true;
return true;
}, std::cref(blk), std::ref(blk_txs),
std::ref(missed_txs));
return future_result.get();
}
bool
CurrentBlockchainStatus::get_txs(
vector<crypto::hash> const& txs_to_get,
vector<transaction>& txs,
vector<crypto::hash>& missed_txs)
{
if (!mcore->get_transactions(txs_to_get, txs, missed_txs))
{
OMERROR << "CurrentBlockchainStatus::get_txs: "
"cant get transactions!";
return false;
}
return true;
auto future_result = thread_pool->submit(
[this](auto const& txs_to_get,
auto& txs, auto& missed_txs)
-> bool
{
if (!this->mcore->get_transactions(
txs_to_get, txs, missed_txs))
{
OMERROR << "CurrentBlockchainStatus::get_txs: "
"cant get transactions!";
return false;
}
return true;
}, std::cref(txs_to_get), std::ref(txs),
std::ref(missed_txs));
return future_result.get();
}
bool
CurrentBlockchainStatus::tx_exist(const crypto::hash& tx_hash)
{
return mcore->have_tx(tx_hash);
//return mcore->have_tx(tx_hash);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash) -> bool
{
return mcore->have_tx(tx_hash);
}, std::cref(tx_hash));
return future_result.get();
}
bool
@ -183,7 +251,14 @@ CurrentBlockchainStatus::tx_exist(
const crypto::hash& tx_hash,
uint64_t& tx_index)
{
return mcore->tx_exists(tx_hash, tx_index);
auto future_result = thread_pool->submit(
[this](auto const& tx_hash,
auto& tx_index) -> bool
{
return mcore->tx_exists(tx_hash, tx_index);
}, std::cref(tx_hash), std::ref(tx_index));
return future_result.get();
}
@ -201,6 +276,20 @@ CurrentBlockchainStatus::tx_exist(
return false;
}
tx_out_index
CurrentBlockchainStatus::get_output_tx_and_index(
uint64_t amount, uint64_t index) const
{
auto future_result = thread_pool->submit(
[this](auto amount, auto index)
-> tx_out_idx
{
return this->mcore
->get_output_tx_and_index(amount, index);
}, amount, index);
return future_result.get();
}
bool
CurrentBlockchainStatus::get_tx_with_output(
@ -214,7 +303,7 @@ CurrentBlockchainStatus::get_tx_with_output(
{
// get pair pair<crypto::hash, uint64_t> where first is tx hash
// and second is local index of the output i in that tx
tx_out_idx = mcore->get_output_tx_and_index(amount, output_idx);
tx_out_idx = get_output_tx_and_index(amount, output_idx);
}
catch (const OUTPUT_DNE& e)
{
@ -231,7 +320,7 @@ CurrentBlockchainStatus::get_tx_with_output(
output_idx_in_tx = tx_out_idx.second;
if (!mcore->get_tx(tx_out_idx.first, tx))
if (!get_tx(tx_out_idx.first, tx))
{
OMERROR << "Cant get tx: " << tx_out_idx.first;
@ -247,17 +336,28 @@ CurrentBlockchainStatus::get_output_keys(
const vector<uint64_t>& absolute_offsets,
vector<cryptonote::output_data_t>& outputs)
{
try
{
mcore->get_output_key(amount, absolute_offsets, outputs);
return true;
}
catch (const OUTPUT_DNE& e)
{
OMERROR << "get_output_keys: " << e.what();
}
auto future_result = thread_pool->submit(
[this](auto const& amount,
auto const& absolute_offsets,
auto& outputs) -> bool
{
try
{
this->mcore->get_output_key(amount,
absolute_offsets, outputs);
return true;
}
catch (const OUTPUT_DNE& e)
{
OMERROR << "get_output_keys: " << e.what();
}
return false;
return false;
}, std::cref(amount), std::cref(absolute_offsets),
std::ref(outputs));
return future_result.get();
}
@ -287,24 +387,34 @@ CurrentBlockchainStatus::get_amount_specific_indices(
const crypto::hash& tx_hash,
vector<uint64_t>& out_indices)
{
try
{
// this index is lmdb index of a tx, not tx hash
uint64_t tx_index;
if (mcore->tx_exists(tx_hash, tx_index))
auto future_result = thread_pool->submit(
[this](auto const& tx_hash, auto& out_indices)
-> bool
{
out_indices = mcore->get_tx_amount_output_indices(tx_index);
try
{
// this index is lmdb index of a tx, not tx hash
uint64_t tx_index;
return true;
}
}
catch(const exception& e)
{
cerr << e.what() << endl;
}
if (mcore->tx_exists(tx_hash, tx_index))
{
out_indices = this->mcore
->get_tx_amount_output_indices(tx_index);
return false;
return true;
}
}
catch(const exception& e)
{
OMERROR << e.what();
}
return false;
}, std::cref(tx_hash), std::ref(out_indices));
future_result.get();
return true;
}
unique_ptr<RandomOutputs>
@ -344,7 +454,8 @@ CurrentBlockchainStatus::get_output(
COMMAND_RPC_GET_OUTPUTS_BIN::request req;
COMMAND_RPC_GET_OUTPUTS_BIN::response res;
req.outputs.push_back(get_outputs_out {amount, global_output_index});
req.outputs.push_back(
get_outputs_out {amount, global_output_index});
if (!mcore->get_outs(req, res))
{
@ -803,7 +914,14 @@ CurrentBlockchainStatus::get_tx(
crypto::hash const& tx_hash,
transaction& tx)
{
return mcore->get_tx(tx_hash, tx);
auto future_result = TP::DefaultThreadPool::submitJob(
[this](auto const& tx_hash,
auto& tx) -> bool
{
return this->mcore->get_tx(tx_hash, tx);
}, std::cref(tx_hash), std::ref(tx));
return future_result.get();
}

@ -14,6 +14,8 @@
#include "db/MySqlAccounts.h"
#include "RandomOutputs.h"
#include "../ext/ThreadPool.hpp"
#include <iostream>
#include <memory>
#include <thread>
@ -60,7 +62,8 @@ public:
CurrentBlockchainStatus(BlockchainSetup _bc_setup,
std::unique_ptr<MicroCore> _mcore,
std::unique_ptr<RPCCalls> _rpc);
std::unique_ptr<RPCCalls> _rpc,
std::unique_ptr<TP::ThreadPool> _tp);
virtual void
monitor_blockchain();
@ -113,6 +116,10 @@ public:
get_tx_with_output(uint64_t output_idx, uint64_t amount,
transaction& tx, uint64_t& output_idx_in_tx);
virtual tx_out_index
get_output_tx_and_index(uint64_t amount,
uint64_t index) const;
virtual bool
get_output_keys(const uint64_t& amount,
const vector<uint64_t>& absolute_offsets,
@ -211,7 +218,8 @@ public:
get_tx(string const& tx_hash_str, transaction& tx);
virtual bool
get_tx_block_height(crypto::hash const& tx_hash, int64_t& tx_height);
get_tx_block_height(crypto::hash const& tx_hash,
int64_t& tx_height);
virtual bool
set_new_searched_blk_no(const string& address,
@ -223,7 +231,8 @@ public:
virtual bool
get_known_outputs_keys(string const& address,
unordered_map<public_key, uint64_t>& known_outputs_keys);
unordered_map<public_key,
uint64_t>& known_outputs_keys);
virtual void
clean_search_thread_map();
@ -297,6 +306,13 @@ protected:
// this class is also the only class which can
// use talk to monero deamon using RPC.
std::unique_ptr<RPCCalls> rpc;
// any operation required to use blockchain
// i.e., access through mcore, will be performed
// by threads in this thread_pool. we have to
// have fixed and limited number of threads so that
// the lmdb does not throw MDB_READERS_FULL
std::unique_ptr<TP::ThreadPool> thread_pool;
// vector of mempool transactions that all threads
// can refer to
@ -318,6 +334,7 @@ protected:
// to synchronize access to mempool_txs vector
mutex getting_mempool_txs;
// have this method will make it easier to moc
// RandomOutputs in our tests later
virtual unique_ptr<RandomOutputs>

@ -135,6 +135,7 @@ MicroCore::get_block_complete_entry(block const& b, block_complete_entry& bce)
bool
MicroCore::get_tx(crypto::hash const& tx_hash, transaction& tx) const
{
if (core_storage.have_tx(tx_hash))
{
// get transaction with given hash
@ -238,4 +239,5 @@ MicroCore::init_success() const
//}
}

@ -37,7 +37,7 @@ class MicroCore {
network_type nettype;
bool initialization_succeded {false};
public:
// <amoumt,
@ -98,6 +98,27 @@ public:
{
return core_storage.get_db().get_blocks_range(h1, h2);
}
//virtual std::vector<block>
//get_blocks_range(const uint64_t& h1, const uint64_t& h2) const
//{
//std::vector<std::pair<cryptonote::blobdata,block>> blobblocks;
//std::vector<blobdata> txs;
//vector<block> blocks;
//{
//std::lock_guard<std::mutex> lock(mtx1);
//cout << "tid: " << std::this_thread::get_id() << endl;;
//core_storage.get_blocks(h1, h2-h1+1, blobblocks, txs);
//}
//blocks.reserve(blobblocks.size());
//for (auto const& bpair: blobblocks)
//blocks.push_back(std::move(bpair.second));
//return blocks;
//}
virtual uint64_t
get_tx_unlock_time(crypto::hash const& tx_hash) const
@ -159,7 +180,6 @@ public:
{
// tx_hash , index in tx
// tx_out_index is std::pair<crypto::hash, uint64_t>;
core_storage.get_db().get_output_tx_and_index(
amount, offsets, indices);
}
@ -218,8 +238,8 @@ public:
virtual ~MicroCore() = default;
};
}
}
#endif //XMREG01_MICROCORE_H

@ -86,7 +86,9 @@ TxSearch::operator()()
uint64_t h1 = searched_blk_no;
uint64_t h2 = std::min(h1 + blocks_lookahead - 1, last_block_height);
vector<block> blocks = current_bc_status->get_blocks_range(h1, h2);
vector<block> blocks;
blocks = current_bc_status->get_blocks_range(h1, h2);
if (blocks.empty())
{
@ -258,14 +260,14 @@ TxSearch::operator()()
}
if (!current_bc_status->tx_exist(tx_hash, blockchain_tx_id))
{
OMERROR << "Tx " << oi_identification.get_tx_hash_str()
<< " " << pod_to_hex(tx_hash)
<< " not found in blockchain !";
throw TxSearchException("Cant get tx from blockchain: "
+ pod_to_hex(tx_hash));
}
//if (!current_bc_status->tx_exist(tx_hash, blockchain_tx_id))
//{
//OMERROR << "Tx " << oi_identification.get_tx_hash_str()
//<< " " << pod_to_hex(tx_hash)
//<< " not found in blockchain !";
//throw TxSearchException("Cant get tx from blockchain: "
//+ pod_to_hex(tx_hash));
//}
OMINFO << address_prefix
+ ": found some outputs in block "
@ -594,8 +596,13 @@ TxSearch::operator()()
}
//current_timestamp = loop_timestamp;
searched_blk_no = h2 + 1;
// update this only when this variable is false
// otherwise a new search block value can
// be overwritten to h2, instead of the new value
if (!searched_block_got_updated)
searched_blk_no = h2 + 1;
searched_block_got_updated = false;
} // while(continue_search)
@ -632,6 +639,7 @@ void
TxSearch::set_searched_blk_no(uint64_t new_value)
{
searched_blk_no = new_value;
searched_block_got_updated = true;
}
uint64_t

@ -46,11 +46,16 @@ private:
static seconds thread_search_life;
// indicate that a thread loop should keep running
bool continue_search {true};
std::atomic_bool continue_search {true};
// this acctually indicates whether thread loop finished
// its execution
bool searching_is_ongoing {false};
std::atomic_bool searching_is_ongoing {false};
// marked true when we set new searched block value
// from other thread. for example, when we import account
// we set it to 0
std::atomic_bool searched_block_got_updated {false};
// to store last exception thrown in the search thread
// using this, a main thread can get info what went wrong here

Loading…
Cancel
Save