diff --git a/config/config.json b/config/config.json index 17e3261..a8a05f4 100755 --- a/config/config.json +++ b/config/config.json @@ -57,6 +57,8 @@ "search_thread_life_in_seconds" : 120, "max_number_of_blocks_to_import" : 132000, "mysql_ping_every_seconds" : 200, + "_comment": "if the threadpool_size (no of threads) below is 0, its size is automaticly set based on your cpu. If its not 0, the value specified is used instead", + "blockchain_treadpool_size" : 0, "ssl" : { "enable" : false, diff --git a/ext/ThreadPool.hpp b/ext/ThreadPool.hpp new file mode 100644 index 0000000..20a2f3b --- /dev/null +++ b/ext/ThreadPool.hpp @@ -0,0 +1,254 @@ +/** + * The ThreadPool class. + * Keeps a set of threads constantly waiting to execute incoming jobs. + * source: http://roar11.com/2016/01/a-platform-independent-thread-pool-using-c14/ + */ +#pragma once + +#ifndef THREADPOOL_HPP +#define THREADPOOL_HPP + +#include "ThreadSafeQueue.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace TP +{ + class ThreadPool + { + private: + class IThreadTask + { + public: + IThreadTask(void) = default; + virtual ~IThreadTask(void) = default; + IThreadTask(const IThreadTask& rhs) = delete; + IThreadTask& operator=(const IThreadTask& rhs) = delete; + IThreadTask(IThreadTask&& other) = default; + IThreadTask& operator=(IThreadTask&& other) = default; + + /** + * Run the task. + */ + virtual void execute() = 0; + }; + + template + class ThreadTask: public IThreadTask + { + public: + ThreadTask(Func&& func) + :m_func{std::move(func)} + { + } + + ~ThreadTask(void) override = default; + ThreadTask(const ThreadTask& rhs) = delete; + ThreadTask& operator=(const ThreadTask& rhs) = delete; + ThreadTask(ThreadTask&& other) = default; + ThreadTask& operator=(ThreadTask&& other) = default; + + /** + * Run the task. + */ + void execute() override + { + m_func(); + } + + private: + Func m_func; + }; + + public: + /** + * A wrapper around a std::future that adds the behavior of futures returned from +std::async. + * Specifically, this object will block and wait for execution to finish before going out +of scope. + */ + template + class TaskFuture + { + public: + TaskFuture(std::future&& future) + :m_future{std::move(future)} + { + } + + TaskFuture(const TaskFuture& rhs) = delete; + TaskFuture& operator=(const TaskFuture& rhs) = delete; + TaskFuture(TaskFuture&& other) = default; + TaskFuture& operator=(TaskFuture&& other) = default; + ~TaskFuture(void) + { + if(m_future.valid()) + { + m_future.get(); + } + } + + auto get(void) + { + return m_future.get(); + } + + + private: + std::future m_future; + }; + + public: + /** + * Constructor. + */ + ThreadPool(void) + :ThreadPool{std::max(std::thread::hardware_concurrency()/2, 2u) - 1u} + { + /* + * Always create at least one thread. If hardware_concurrency() returns 0, + * subtracting one would turn it to UINT_MAX, so get the maximum of + * hardware_concurrency() and 2 before subtracting 1. + */ + } + + /** + * Constructor. + */ + explicit ThreadPool(const std::uint32_t numThreads) + :m_done{false}, + m_workQueue{}, + m_threads{} + { + try + { + for(std::uint32_t i = 0u; i < numThreads; ++i) + { + m_threads.emplace_back(&ThreadPool::worker, this); + } + } + catch(...) + { + destroy(); + throw; + } + } + + /** + * Non-copyable. + */ + ThreadPool(const ThreadPool& rhs) = delete; + + /** + * Non-assignable. + */ + ThreadPool& operator=(const ThreadPool& rhs) = delete; + + /** + * Destructor. + */ + ~ThreadPool(void) + { + destroy(); + } + + auto queueSize() const + { + return m_workQueue.size(); + } + + /** + * Submit a job to be run by the thread pool. + */ + template + auto submit(Func&& func, Args&&... args) + { + auto boundTask = std::bind(std::forward(func), std::forward(args)...); + using ResultType = std::result_of_t; + using PackagedTask = std::packaged_task; + using TaskType = ThreadTask; + + PackagedTask task{std::move(boundTask)}; + TaskFuture result{task.get_future()}; + m_workQueue.push(std::make_unique(std::move(task))); + return result; + } + + private: + /** + * Constantly running function each thread uses to acquire work items from the queue. + */ + void worker(void) + { + while(!m_done) + { + std::unique_ptr pTask{nullptr}; + if(m_workQueue.waitPop(pTask)) + { + pTask->execute(); + } + } + } + + /** + * Invalidates the queue and joins all running threads. + */ + void destroy(void) + { + m_done = true; + m_workQueue.invalidate(); + for(auto& thread : m_threads) + { + if(thread.joinable()) + { + thread.join(); + } + } + } + + private: + std::atomic_bool m_done; + ThreadSafeQueue> m_workQueue; + std::vector m_threads; + }; + + namespace DefaultThreadPool + { + /** + * Get the default thread pool for the application. + * This pool is created with std::thread::hardware_concurrency() - 1 threads. + */ + inline ThreadPool& getThreadPool(void) + { + static ThreadPool defaultPool; + return defaultPool; + } + + inline auto queueSize() + { + return getThreadPool().queueSize(); + } + + /** + * Submit a job to the default thread pool. + */ + template + inline auto submitJob(Func&& func, Args&&... args) + { + return getThreadPool().submit( + std::forward(func), + std::forward(args)...); + } + } +} + +#endif diff --git a/ext/ThreadSafeQueue.hpp b/ext/ThreadSafeQueue.hpp new file mode 100644 index 0000000..c4d9376 --- /dev/null +++ b/ext/ThreadSafeQueue.hpp @@ -0,0 +1,145 @@ +/** + * The ThreadSafeQueue class. + * Provides a wrapper around a basic queue to provide thread safety. + */ +#pragma once + +#ifndef THREADSAFEQUEUE_HPP +#define THREADSAFEQUEUE_HPP + +#include +#include +#include +#include +#include + +namespace TP +{ + template + class ThreadSafeQueue + { + public: + /** + * Destructor. + */ + ~ThreadSafeQueue(void) + { + invalidate(); + } + + /** + * Attempt to get the first value in the queue. + * Returns true if a value was successfully written to the out parameter, false +otherwise. + */ + bool tryPop(T& out) + { + std::lock_guard lock{m_mutex}; + if(m_queue.empty() || !m_valid) + { + return false; + } + out = std::move(m_queue.front()); + m_queue.pop(); + return true; + } + + /** + * Get the first value in the queue. + * Will block until a value is available unless clear is called or the instance is +destructed. + * Returns true if a value was successfully written to the out parameter, false +otherwise. + */ + bool waitPop(T& out) + { + std::unique_lock lock{m_mutex}; + m_condition.wait(lock, [this]() + { + return !m_queue.empty() || !m_valid; + }); + /* + * Using the condition in the predicate ensures that spurious wakeups with a valid + * but empty queue will not proceed, so only need to check for validity before +proceeding. + */ + if(!m_valid) + { + return false; + } + out = std::move(m_queue.front()); + m_queue.pop(); + return true; + } + + /** + * Push a new value onto the queue. + */ + void push(T value) + { + std::lock_guard lock{m_mutex}; + m_queue.push(std::move(value)); + m_condition.notify_one(); + } + + /** + * Check whether or not the queue is empty. + */ + bool empty(void) const + { + std::lock_guard lock{m_mutex}; + return m_queue.empty(); + } + + /** + * Clear all items from the queue. + */ + void clear(void) + { + std::lock_guard lock{m_mutex}; + while(!m_queue.empty()) + { + m_queue.pop(); + } + m_condition.notify_all(); + } + + /** + * Invalidate the queue. + * Used to ensure no conditions are being waited on in waitPop when + * a thread or the application is trying to exit. + * The queue is invalid after calling this method and it is an error + * to continue using a queue after this method has been called. + */ + void invalidate(void) + { + std::lock_guard lock{m_mutex}; + m_valid = false; + m_condition.notify_all(); + + } + + auto size() const + { + std::lock_guard lock{m_mutex}; + return m_queue.size(); + } + + /** + * Returns whether or not this queue is valid. + */ + bool isValid(void) const + { + std::lock_guard lock{m_mutex}; + return m_valid; + } + + private: + std::atomic_bool m_valid{true}; + mutable std::mutex m_mutex; + std::queue m_queue; + std::condition_variable m_condition; + }; +} + +#endif diff --git a/html/js/cn_util.js b/html/js/cn_util.js index a9ed843..b164534 100644 --- a/html/js/cn_util.js +++ b/html/js/cn_util.js @@ -462,9 +462,12 @@ var cnUtil = (function(initConfig) { this.decode_address = function(address) { var dec = cnBase58.decode(address); + var expectedPrefix = this.encode_varint(CRYPTONOTE_PUBLIC_ADDRESS_BASE58_PREFIX); var expectedPrefixInt = this.encode_varint(CRYPTONOTE_PUBLIC_INTEGRATED_ADDRESS_BASE58_PREFIX); var expectedPrefixSub = this.encode_varint(CRYPTONOTE_PUBLIC_SUBADDRESS_BASE58_PREFIX); + + var prefix = dec.slice(0, expectedPrefix.length); if (prefix !== expectedPrefix && prefix !== expectedPrefixInt && prefix !== expectedPrefixSub) { throw "Invalid address prefix"; diff --git a/html/js/controllers/account.js b/html/js/controllers/account.js index b51e4a6..1941c95 100755 --- a/html/js/controllers/account.js +++ b/html/js/controllers/account.js @@ -238,7 +238,7 @@ thinwalletCtrls.controller('AccountCtrl', function($scope, $rootScope, $http, $q // decrypt payment_id8 which results in using // integrated address - if (transactions[i].payment_id.length == 16) { + if (transactions[i].payment_id.length === 16) { if (transactions[i].tx_pub_key) { var decrypted_payment_id8 = decrypt_payment_id(transactions[i].payment_id, diff --git a/html/js/services/account.js b/html/js/services/account.js index cf69d62..0056309 100755 --- a/html/js/services/account.js +++ b/html/js/services/account.js @@ -76,18 +76,25 @@ thinwalletServices } catch (e) { return deferred.reject("invalid address"); } - var expected_view_pub = cnUtil.sec_key_to_pub(view_key); - var expected_spend_pub; - if (spend_key.length === 64) { - expected_spend_pub = cnUtil.sec_key_to_pub(spend_key); - } - if (public_keys.view !== expected_view_pub) { - accountService.logout(); - return deferred.reject("invalid view key"); - } - if (!view_only && (public_keys.spend !== expected_spend_pub)) { - accountService.logout(); - return deferred.reject("invalid spend key"); + + if (!cnUtil.is_subaddress(address)) + { + var expected_view_pub = cnUtil.sec_key_to_pub(view_key); + var expected_spend_pub; + if (spend_key.length === 64) { + expected_spend_pub = cnUtil.sec_key_to_pub(spend_key); + } + + + + if (public_keys.view !== expected_view_pub) { + accountService.logout(); + return deferred.reject("invalid view key"); + } + if (!view_only && (public_keys.spend !== expected_spend_pub)) { + accountService.logout(); + return deferred.reject("invalid spend key"); + } } public_address = address; private_keys = { diff --git a/main.cpp b/main.cpp index 980409e..a1b94a7 100755 --- a/main.cpp +++ b/main.cpp @@ -139,6 +139,22 @@ xmreg::MySqlConnector::username = config_json["database"]["user"]; xmreg::MySqlConnector::password = config_json["database"]["password"]; xmreg::MySqlConnector::dbname = config_json["database"]["dbname"]; +// number of thread in blockchain access pool thread +auto threads_no = std::max( + std::thread::hardware_concurrency()/2, 2u) - 1; + +if (bc_setup.blockchain_treadpool_size > 0) + threads_no = bc_setup.blockchain_treadpool_size; + +if (threads_no > 100) +{ + threads_no = 100; + OMWARN << "Requested Thread Pool size " + << threads_no << " is greater than 100!." + " Overwriting to 100!" ; +} + +OMINFO << "Thread pool size: " << threads_no << " threads"; // once we have all the parameters for the blockchain and our backend // we can create and instance of CurrentBlockchainStatus class. @@ -150,7 +166,8 @@ auto current_bc_status = make_shared( bc_setup, std::make_unique(), - std::make_unique(bc_setup.deamon_url)); + std::make_unique(bc_setup.deamon_url), + std::make_unique(threads_no)); // since CurrentBlockchainStatus class monitors current status // of the blockchain (e.g., current height) .This is the only class diff --git a/src/BlockchainSetup.cpp b/src/BlockchainSetup.cpp index feddb18..6ed97d3 100755 --- a/src/BlockchainSetup.cpp +++ b/src/BlockchainSetup.cpp @@ -170,6 +170,8 @@ BlockchainSetup::_init() = config_json["wallet_import"]["fee"]; mysql_ping_every = seconds {config_json["mysql_ping_every_seconds"]}; + blockchain_treadpool_size + = config_json["blockchain_treadpool_size"]; get_blockchain_path(); diff --git a/src/BlockchainSetup.h b/src/BlockchainSetup.h index 0ea46c3..dc225d5 100755 --- a/src/BlockchainSetup.h +++ b/src/BlockchainSetup.h @@ -49,6 +49,8 @@ public: uint64_t max_number_of_blocks_to_import {132000}; + uint64_t blockchain_treadpool_size {0}; + string import_payment_address_str; string import_payment_viewkey_str; diff --git a/src/CurrentBlockchainStatus.cpp b/src/CurrentBlockchainStatus.cpp index efcb065..841d9d2 100755 --- a/src/CurrentBlockchainStatus.cpp +++ b/src/CurrentBlockchainStatus.cpp @@ -15,10 +15,12 @@ namespace xmreg CurrentBlockchainStatus::CurrentBlockchainStatus( BlockchainSetup _bc_setup, std::unique_ptr _mcore, - std::unique_ptr _rpc) + std::unique_ptr _rpc, + std::unique_ptr _tp) : bc_setup {_bc_setup}, mcore {std::move(_mcore)}, - rpc {std::move(_rpc)} + rpc {std::move(_rpc)}, + thread_pool {std::move(_tp)} { is_running = false; stop_blockchain_monitor_loop = false; @@ -44,6 +46,9 @@ CurrentBlockchainStatus::monitor_blockchain() break; } + OMINFO << "PoolQueue size: " + << TP::DefaultThreadPool::queueSize(); + update_current_blockchain_height(); read_mempool(); @@ -115,23 +120,38 @@ CurrentBlockchainStatus::is_tx_unlocked( bool CurrentBlockchainStatus::get_block(uint64_t height, block& blk) { - return mcore->get_block_from_height(height, blk); + + auto future_result = thread_pool->submit( + [this](auto height, auto& blk) + -> bool + { + return this->mcore + ->get_block_from_height(height, blk); + }, height, std::ref(blk)); + + return future_result.get(); } vector CurrentBlockchainStatus::get_blocks_range( uint64_t const& h1, uint64_t const& h2) { - try - { - return mcore->get_blocks_range(h1, h2); - } - catch (BLOCK_DNE& e) - { - cerr << e.what() << endl; - } - - return {}; + auto future_result = thread_pool->submit( + [this](auto h1, auto h2) + -> vector + { + try + { + return this->mcore->get_blocks_range(h1, h2); + } + catch (BLOCK_DNE& e) + { + OMERROR << e.what(); + return {}; + } + }, h1, h2); + + return future_result.get(); } bool @@ -145,37 +165,67 @@ CurrentBlockchainStatus::get_block_txs( // the block i.e. coinbase tx. blk_txs.push_back(blk.miner_tx); - if (!mcore->get_transactions(blk.tx_hashes, blk_txs, missed_txs)) - { - OMERROR << "Cant get transactions in block: " - << get_block_hash(blk); + auto future_result = thread_pool->submit( + [this](auto const& blk, + auto& blk_txs, auto& missed_txs) + -> bool + { + if (!this->mcore->get_transactions( + blk.tx_hashes, blk_txs, + missed_txs)) + { + OMERROR << "Cant get transactions in block: " + << get_block_hash(blk); - return false; - } + return false; + } - return true; + return true; + }, std::cref(blk), std::ref(blk_txs), + std::ref(missed_txs)); + + return future_result.get(); } + bool CurrentBlockchainStatus::get_txs( vector const& txs_to_get, vector& txs, vector& missed_txs) { - if (!mcore->get_transactions(txs_to_get, txs, missed_txs)) - { - OMERROR << "CurrentBlockchainStatus::get_txs: " - "cant get transactions!"; - return false; - } - return true; + auto future_result = thread_pool->submit( + [this](auto const& txs_to_get, + auto& txs, auto& missed_txs) + -> bool + { + if (!this->mcore->get_transactions( + txs_to_get, txs, missed_txs)) + { + OMERROR << "CurrentBlockchainStatus::get_txs: " + "cant get transactions!"; + return false; + } + + return true; + + }, std::cref(txs_to_get), std::ref(txs), + std::ref(missed_txs)); + + return future_result.get(); } bool CurrentBlockchainStatus::tx_exist(const crypto::hash& tx_hash) { - return mcore->have_tx(tx_hash); + auto future_result = thread_pool->submit( + [this](auto const& tx_hash) -> bool + { + return this->mcore->have_tx(tx_hash); + }, std::cref(tx_hash)); + + return future_result.get(); } bool @@ -183,7 +233,14 @@ CurrentBlockchainStatus::tx_exist( const crypto::hash& tx_hash, uint64_t& tx_index) { - return mcore->tx_exists(tx_hash, tx_index); + auto future_result = thread_pool->submit( + [this](auto const& tx_hash, + auto& tx_index) -> bool + { + return this->mcore->tx_exists(tx_hash, tx_index); + }, std::cref(tx_hash), std::ref(tx_index)); + + return future_result.get(); } @@ -201,6 +258,20 @@ CurrentBlockchainStatus::tx_exist( return false; } +tx_out_index +CurrentBlockchainStatus::get_output_tx_and_index( + uint64_t amount, uint64_t index) const +{ + auto future_result = thread_pool->submit( + [this](auto amount, auto index) + -> tx_out_index + { + return this->mcore + ->get_output_tx_and_index(amount, index); + }, amount, index); + + return future_result.get(); +} bool CurrentBlockchainStatus::get_tx_with_output( @@ -214,7 +285,7 @@ CurrentBlockchainStatus::get_tx_with_output( { // get pair pair where first is tx hash // and second is local index of the output i in that tx - tx_out_idx = mcore->get_output_tx_and_index(amount, output_idx); + tx_out_idx = get_output_tx_and_index(amount, output_idx); } catch (const OUTPUT_DNE& e) { @@ -231,7 +302,7 @@ CurrentBlockchainStatus::get_tx_with_output( output_idx_in_tx = tx_out_idx.second; - if (!mcore->get_tx(tx_out_idx.first, tx)) + if (!get_tx(tx_out_idx.first, tx)) { OMERROR << "Cant get tx: " << tx_out_idx.first; @@ -247,17 +318,28 @@ CurrentBlockchainStatus::get_output_keys( const vector& absolute_offsets, vector& outputs) { - try - { - mcore->get_output_key(amount, absolute_offsets, outputs); - return true; - } - catch (const OUTPUT_DNE& e) - { - OMERROR << "get_output_keys: " << e.what(); - } + auto future_result = thread_pool->submit( + [this](auto const& amount, + auto const& absolute_offsets, + auto& outputs) -> bool + { + try + { + this->mcore->get_output_key(amount, + absolute_offsets, outputs); + return true; + } + catch (const OUTPUT_DNE& e) + { + OMERROR << "get_output_keys: " << e.what(); + } - return false; + return false; + + }, std::cref(amount), std::cref(absolute_offsets), + std::ref(outputs)); + + return future_result.get(); } @@ -287,24 +369,49 @@ CurrentBlockchainStatus::get_amount_specific_indices( const crypto::hash& tx_hash, vector& out_indices) { - try - { - // this index is lmdb index of a tx, not tx hash - uint64_t tx_index; + auto future_result = thread_pool->submit( + [this](auto const& tx_hash, auto& out_indices) + -> bool + { + try + { + // this index is lmdb index of a tx, not tx hash + uint64_t tx_index; + + if (this->mcore->tx_exists(tx_hash, tx_index)) + { + out_indices = this->mcore + ->get_tx_amount_output_indices(tx_index); + + return true; + } + } + catch(const exception& e) + { + OMERROR << e.what(); + } + return false; + + }, std::cref(tx_hash), std::ref(out_indices)); + + return future_result.get(); +} - if (mcore->tx_exists(tx_hash, tx_index)) +bool +CurrentBlockchainStatus::get_output_histogram( + COMMAND_RPC_GET_OUTPUT_HISTOGRAM::request& req, + COMMAND_RPC_GET_OUTPUT_HISTOGRAM::response& res) const +{ + auto future_result = thread_pool->submit( + [this](auto const& req, auto& res) + -> bool { - out_indices = mcore->get_tx_amount_output_indices(tx_index); + return this->mcore + ->get_output_histogram(req, res); - return true; - } - } - catch(const exception& e) - { - cerr << e.what() << endl; - } + }, std::cref(req), std::ref(res)); - return false; + return future_result.get(); } unique_ptr @@ -312,7 +419,8 @@ CurrentBlockchainStatus::create_random_outputs_object( vector const& amounts, uint64_t outs_count) const { - return make_unique(*mcore, amounts, outs_count); + return make_unique( + this, amounts, outs_count); } bool @@ -334,6 +442,26 @@ CurrentBlockchainStatus::get_random_outputs( return true; } +bool +CurrentBlockchainStatus::get_outs( + COMMAND_RPC_GET_OUTPUTS_BIN::request const& req, + COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const +{ + auto future_result = thread_pool->submit( + [this](auto const& req, auto& res) + -> bool + { + if (!this->mcore->get_outs(req, res)) + { + OMERROR << "mcore->get_outs(req, res) failed"; + return false; + } + return true; + + }, std::cref(req), std::ref(res)); + + return future_result.get(); +} bool CurrentBlockchainStatus::get_output( @@ -344,13 +472,11 @@ CurrentBlockchainStatus::get_output( COMMAND_RPC_GET_OUTPUTS_BIN::request req; COMMAND_RPC_GET_OUTPUTS_BIN::response res; - req.outputs.push_back(get_outputs_out {amount, global_output_index}); + req.outputs.push_back( + get_outputs_out {amount, global_output_index}); - if (!mcore->get_outs(req, res)) - { - OMERROR << "mcore->get_outs(req, res) failed"; + if (!get_outs(req, res)) return false; - } output_info = res.outs.at(0); @@ -362,8 +488,7 @@ CurrentBlockchainStatus::get_dynamic_per_kb_fee_estimate() const { const double byte_to_kbyte_factor = 1024; - uint64_t fee_per_byte = mcore->get_dynamic_base_fee_estimate( - FEE_ESTIMATE_GRACE_BLOCKS); + uint64_t fee_per_byte = get_dynamic_base_fee_estimate(); uint64_t fee_per_kB = static_cast( fee_per_byte * byte_to_kbyte_factor); @@ -374,15 +499,29 @@ CurrentBlockchainStatus::get_dynamic_per_kb_fee_estimate() const uint64_t CurrentBlockchainStatus::get_dynamic_base_fee_estimate() const { - return mcore->get_dynamic_base_fee_estimate( - FEE_ESTIMATE_GRACE_BLOCKS); + auto future_result = thread_pool->submit( + [this]() -> uint64_t + { + return this->mcore + ->get_dynamic_base_fee_estimate( + FEE_ESTIMATE_GRACE_BLOCKS); + }); + + return future_result.get(); } uint64_t CurrentBlockchainStatus::get_tx_unlock_time( crypto::hash const& tx_hash) const { - return mcore->get_tx_unlock_time(tx_hash); + + auto future_result = thread_pool->submit( + [this](auto const& tx_hash) -> uint64_t + { + return this->mcore->get_tx_unlock_time(tx_hash); + }, std::cref(tx_hash)); + + return future_result.get(); } bool @@ -407,12 +546,24 @@ CurrentBlockchainStatus::read_mempool() // get txs in the mempool std::vector mempool_tx_info; vector key_image_infos; + + auto future_result = thread_pool->submit( + [this](auto& mempool_tx_info, auto& key_image_infos) + -> bool + { + if (!this->mcore->get_mempool_txs( + mempool_tx_info, key_image_infos)) + { + OMERROR << "Getting mempool failed "; + return false; + } + return true; - if (!mcore->get_mempool_txs(mempool_tx_info, key_image_infos)) - { - OMERROR << "Getting mempool failed "; + }, std::ref(mempool_tx_info), + std::ref(key_image_infos)); + + if (!future_result.get()) return false; - } // not using this info at present (void) key_image_infos; @@ -584,7 +735,16 @@ CurrentBlockchainStatus::get_output_key( uint64_t amount, uint64_t global_amount_index) { - return mcore->get_output_key(amount, global_amount_index); + + auto future_result = thread_pool->submit( + [this](auto amount, auto global_amount_index) + -> output_data_t + { + return this->mcore->get_output_key( + amount, global_amount_index); + }, amount, global_amount_index); + + return future_result.get(); } bool @@ -803,7 +963,14 @@ CurrentBlockchainStatus::get_tx( crypto::hash const& tx_hash, transaction& tx) { - return mcore->get_tx(tx_hash, tx); + auto future_result = thread_pool->submit( + [this](auto const& tx_hash, + auto& tx) -> bool + { + return this->mcore->get_tx(tx_hash, tx); + }, std::cref(tx_hash), std::ref(tx)); + + return future_result.get(); } @@ -819,7 +986,7 @@ CurrentBlockchainStatus::get_tx( return false; } - return mcore->get_tx(tx_hash, tx); + return get_tx(tx_hash, tx); } bool @@ -830,7 +997,14 @@ CurrentBlockchainStatus::get_tx_block_height( if (!tx_exist(tx_hash)) return false; - tx_height = mcore->get_tx_block_height(tx_hash); + auto future_result = thread_pool->submit( + [this](auto const tx_hash) + -> int64_t + { + return this->mcore->get_tx_block_height(tx_hash); + }, std::cref(tx_hash)); + + tx_height = future_result.get(); return true; } diff --git a/src/CurrentBlockchainStatus.h b/src/CurrentBlockchainStatus.h index b295965..06fc756 100755 --- a/src/CurrentBlockchainStatus.h +++ b/src/CurrentBlockchainStatus.h @@ -14,6 +14,8 @@ #include "db/MySqlAccounts.h" #include "RandomOutputs.h" +#include "../ext/ThreadPool.hpp" + #include #include #include @@ -60,7 +62,8 @@ public: CurrentBlockchainStatus(BlockchainSetup _bc_setup, std::unique_ptr _mcore, - std::unique_ptr _rpc); + std::unique_ptr _rpc, + std::unique_ptr _tp); virtual void monitor_blockchain(); @@ -113,6 +116,10 @@ public: get_tx_with_output(uint64_t output_idx, uint64_t amount, transaction& tx, uint64_t& output_idx_in_tx); + virtual tx_out_index + get_output_tx_and_index(uint64_t amount, + uint64_t index) const; + virtual bool get_output_keys(const uint64_t& amount, const vector& absolute_offsets, @@ -141,6 +148,15 @@ public: RandomOutputs::outs_for_amount_v& found_outputs); + virtual bool + get_output_histogram( + COMMAND_RPC_GET_OUTPUT_HISTOGRAM::request& req, + COMMAND_RPC_GET_OUTPUT_HISTOGRAM::response& res) const; + + virtual bool + get_outs(COMMAND_RPC_GET_OUTPUTS_BIN::request const& req, + COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const; + virtual uint64_t get_dynamic_per_kb_fee_estimate() const; @@ -211,7 +227,8 @@ public: get_tx(string const& tx_hash_str, transaction& tx); virtual bool - get_tx_block_height(crypto::hash const& tx_hash, int64_t& tx_height); + get_tx_block_height(crypto::hash const& tx_hash, + int64_t& tx_height); virtual bool set_new_searched_blk_no(const string& address, @@ -223,7 +240,8 @@ public: virtual bool get_known_outputs_keys(string const& address, - unordered_map& known_outputs_keys); + unordered_map& known_outputs_keys); virtual void clean_search_thread_map(); @@ -297,6 +315,13 @@ protected: // this class is also the only class which can // use talk to monero deamon using RPC. std::unique_ptr rpc; + + // any operation required to use blockchain + // i.e., access through mcore, will be performed + // by threads in this thread_pool. we have to + // have fixed and limited number of threads so that + // the lmdb does not throw MDB_READERS_FULL + std::unique_ptr thread_pool; // vector of mempool transactions that all threads // can refer to @@ -318,6 +343,7 @@ protected: // to synchronize access to mempool_txs vector mutex getting_mempool_txs; + // have this method will make it easier to moc // RandomOutputs in our tests later virtual unique_ptr diff --git a/src/MicroCore.cpp b/src/MicroCore.cpp index ed97ed4..ae5468b 100755 --- a/src/MicroCore.cpp +++ b/src/MicroCore.cpp @@ -135,6 +135,7 @@ MicroCore::get_block_complete_entry(block const& b, block_complete_entry& bce) bool MicroCore::get_tx(crypto::hash const& tx_hash, transaction& tx) const { + if (core_storage.have_tx(tx_hash)) { // get transaction with given hash @@ -238,4 +239,5 @@ MicroCore::init_success() const //} + } diff --git a/src/MicroCore.h b/src/MicroCore.h index 71cbd51..7f506a4 100755 --- a/src/MicroCore.h +++ b/src/MicroCore.h @@ -37,7 +37,7 @@ class MicroCore { network_type nettype; bool initialization_succeded {false}; - + public: // + //get_blocks_range(const uint64_t& h1, const uint64_t& h2) const + //{ + //std::vector> blobblocks; + //std::vector txs; + //vector blocks; + + //{ + //std::lock_guard lock(mtx1); + //cout << "tid: " << std::this_thread::get_id() << endl;; + //core_storage.get_blocks(h1, h2-h1+1, blobblocks, txs); + //} + + //blocks.reserve(blobblocks.size()); + + //for (auto const& bpair: blobblocks) + //blocks.push_back(std::move(bpair.second)); + + //return blocks; + //} virtual uint64_t get_tx_unlock_time(crypto::hash const& tx_hash) const @@ -159,7 +180,6 @@ public: { // tx_hash , index in tx // tx_out_index is std::pair; - core_storage.get_db().get_output_tx_and_index( amount, offsets, indices); } @@ -218,8 +238,8 @@ public: virtual ~MicroCore() = default; }; -} +} #endif //XMREG01_MICROCORE_H diff --git a/src/OpenMoneroRequests.cpp b/src/OpenMoneroRequests.cpp index 857725d..1a2d52b 100755 --- a/src/OpenMoneroRequests.cpp +++ b/src/OpenMoneroRequests.cpp @@ -1285,7 +1285,7 @@ OpenMoneroRequests::import_recent_wallet_request( = boost::lexical_cast( j_request["no_blocks_to_import"].get()); } - catch (boost::bad_lexical_cast& e) + catch (std::exception const& e) { string msg = "Cant parse " + j_request["no_blocks_to_import"].get() @@ -1872,7 +1872,7 @@ OpenMoneroRequests::login_and_start_search_thread( acc.viewkey = view_key; // so we have an account now. Either existing or - // newly created. Thus, we can start a tread + // newly created. Thus, we can start a thread // which will scan for transactions belonging to // that account, using its address and view key. // the thread will scan the blockchain for txs belonging @@ -1884,7 +1884,7 @@ OpenMoneroRequests::login_and_start_search_thread( // any belonging transactions in a loop. // Thus the thread does not need // to do anything except looking for tx and updating mysql - // with relative tx information + // with relevant tx information if (!current_bc_status->search_thread_exist(acc.address)) { @@ -1951,7 +1951,6 @@ OpenMoneroRequests::parse_request( json& j_request, json& j_response) { - try { j_request = body_to_json(body); @@ -1961,7 +1960,6 @@ OpenMoneroRequests::parse_request( for (const auto& v: values_map) { - if (j_request.count(v) == 0) { cerr << v + " value not provided" << endl; @@ -1971,7 +1969,6 @@ OpenMoneroRequests::parse_request( return false; } - } return true; diff --git a/src/OutputInputIdentification.cpp b/src/OutputInputIdentification.cpp index b8ca706..211334c 100755 --- a/src/OutputInputIdentification.cpp +++ b/src/OutputInputIdentification.cpp @@ -34,12 +34,18 @@ OutputInputIdentification::OutputInputIdentification( if (!generate_key_derivation(tx_pub_key, *viewkey, derivation)) { - OMERROR << "Cant get derived key for: " << "\n" - << "pub_tx_key: " << get_tx_pub_key_str() << " and " - << "prv_view_key" << viewkey;; - - throw OutputInputIdentificationException( - "Cant get derived key for a tx"); + OMWARN << "Cant get derived key for tx: " + << pod_to_hex(tx_hash) + << ", pub_tx_key: " + << pod_to_hex(tx_pub_key); + + static_assert(sizeof(derivation) == sizeof(rct::key), + "Mismatched sizes of key_derivation and rct::key"); + + // use identity derivation instead + // solution based on that found in wallet2.cpp in monero + // this will cause the tx output to be effectively skipped + memcpy(&derivation, rct::identity().bytes, sizeof(derivation)); } } diff --git a/src/RandomOutputs.cpp b/src/RandomOutputs.cpp index 0c92602..6c28ea1 100755 --- a/src/RandomOutputs.cpp +++ b/src/RandomOutputs.cpp @@ -1,13 +1,13 @@ #include "RandomOutputs.h" - +#include "CurrentBlockchainStatus.h" namespace xmreg { RandomOutputs::RandomOutputs( - MicroCore const& _mcore, + CurrentBlockchainStatus const* _cbs, vector const& _amounts, uint64_t _outs_count) - : mcore {_mcore}, + : cbs {_cbs}, amounts {_amounts}, outs_count {_outs_count} { @@ -39,7 +39,7 @@ RandomOutputs::get_output_pub_key( req.outputs.push_back(get_outputs_out {amount, global_output_index}); - if (!mcore.get_outs(req, res)) + if (!cbs->get_outs(req, res)) { OMERROR << "mcore->get_outs(req, res) failed"; return false; @@ -65,7 +65,7 @@ RandomOutputs::find_random_outputs() req.min_count = outs_count; req.max_count = 0; - if (!mcore.get_output_histogram(req, res)) + if (!cbs->get_output_histogram(req, res)) { OMERROR << "mcore->get_output_histogram(req, res)"; return false; @@ -80,15 +80,16 @@ RandomOutputs::find_random_outputs() // find histogram_entry for amount that we look // random outputs for auto const hist_entry_it = std::find_if( - begin(res.histogram), end(res.histogram), - [&amount](histogram_entry const& he) - { - return amount == he.amount; - }); + std::begin(res.histogram), std::end(res.histogram), + [&amount](auto const& he) + { + return amount == he.amount; + }); if (hist_entry_it == res.histogram.end()) { - OMERROR << "Could not find amount: it == res.histogram.end()\n"; + OMERROR << "Could not find amount: it " + "== res.histogram.end()\n"; return false; } diff --git a/src/RandomOutputs.h b/src/RandomOutputs.h index d23e922..00b1d20 100755 --- a/src/RandomOutputs.h +++ b/src/RandomOutputs.h @@ -8,6 +8,8 @@ namespace xmreg { +class CurrentBlockchainStatus; + /** * @brief Returns random ouputs for given amounts * @@ -46,7 +48,7 @@ public: using outs_for_amount_v = vector; - RandomOutputs(MicroCore const& _mcore, + RandomOutputs(CurrentBlockchainStatus const* _cbs, vector const& _amounts, uint64_t _outs_count); @@ -59,7 +61,8 @@ public: virtual ~RandomOutputs() = default; protected: - MicroCore const& mcore; + //MicroCore const& mcore; + CurrentBlockchainStatus const* cbs; vector amounts; uint64_t outs_count; diff --git a/src/TxSearch.cpp b/src/TxSearch.cpp index de72eaf..58da5cc 100755 --- a/src/TxSearch.cpp +++ b/src/TxSearch.cpp @@ -86,7 +86,9 @@ TxSearch::operator()() uint64_t h1 = searched_blk_no; uint64_t h2 = std::min(h1 + blocks_lookahead - 1, last_block_height); - vector blocks = current_bc_status->get_blocks_range(h1, h2); + vector blocks; + + blocks = current_bc_status->get_blocks_range(h1, h2); if (blocks.empty()) { @@ -594,8 +596,13 @@ TxSearch::operator()() } //current_timestamp = loop_timestamp; - - searched_blk_no = h2 + 1; + // update this only when this variable is false + // otherwise a new search block value can + // be overwritten to h2, instead of the new value + if (!searched_block_got_updated) + searched_blk_no = h2 + 1; + + searched_block_got_updated = false; } // while(continue_search) @@ -632,6 +639,7 @@ void TxSearch::set_searched_blk_no(uint64_t new_value) { searched_blk_no = new_value; + searched_block_got_updated = true; } uint64_t diff --git a/src/TxSearch.h b/src/TxSearch.h index 4ea0a99..2e38df3 100755 --- a/src/TxSearch.h +++ b/src/TxSearch.h @@ -46,11 +46,16 @@ private: static seconds thread_search_life; // indicate that a thread loop should keep running - bool continue_search {true}; + std::atomic_bool continue_search {true}; // this acctually indicates whether thread loop finished // its execution - bool searching_is_ongoing {false}; + std::atomic_bool searching_is_ongoing {false}; + + // marked true when we set new searched block value + // from other thread. for example, when we import account + // we set it to 0 + std::atomic_bool searched_block_got_updated {false}; // to store last exception thrown in the search thread // using this, a main thread can get info what went wrong here diff --git a/tests/bcstatus_tests.cpp b/tests/bcstatus_tests.cpp index eb9aad4..95f9bfc 100755 --- a/tests/bcstatus_tests.cpp +++ b/tests/bcstatus_tests.cpp @@ -41,8 +41,12 @@ protected: rpc = std::make_unique("dummy deamon url"); rpc_ptr = rpc.get(); + tp = std::make_unique(); + tp_ptr = tp.get(); + bcs = std::make_unique( - bc_setup, std::move(mcore), std::move(rpc)); + bc_setup, std::move(mcore), std::move(rpc), + std::move(tp)); } network_type net_type {network_type::STAGENET}; @@ -50,10 +54,12 @@ protected: xmreg::BlockchainSetup bc_setup; std::unique_ptr mcore; std::unique_ptr rpc; + std::unique_ptr tp; std::unique_ptr bcs; MockMicroCore* mcore_ptr; MockRPCCalls* rpc_ptr; + TP::ThreadPool* tp_ptr; static nlohmann::json config_json; }; @@ -64,7 +70,8 @@ nlohmann::json BCSTATUS_TEST::config_json; TEST_P(BCSTATUS_TEST, DefaultConstruction) { - xmreg::CurrentBlockchainStatus bcs {bc_setup, nullptr, nullptr}; + xmreg::CurrentBlockchainStatus bcs { + bc_setup, nullptr, nullptr, nullptr}; EXPECT_TRUE(true); } diff --git a/tests/mocks.h b/tests/mocks.h index 76246db..c75c2b2 100755 --- a/tests/mocks.h +++ b/tests/mocks.h @@ -180,7 +180,7 @@ class MockCurrentBlockchainStatus : public xmreg::CurrentBlockchainStatus public: MockCurrentBlockchainStatus() : xmreg::CurrentBlockchainStatus(xmreg::BlockchainSetup(), - nullptr, nullptr) + nullptr, nullptr, nullptr) {} MOCK_METHOD3(get_output_keys, diff --git a/tests/mysql_tests.cpp b/tests/mysql_tests.cpp index 34c7164..82b0058 100755 --- a/tests/mysql_tests.cpp +++ b/tests/mysql_tests.cpp @@ -1199,7 +1199,7 @@ public: MockCurrentBlockchainStatus1() : xmreg::CurrentBlockchainStatus( xmreg::BlockchainSetup(), - nullptr, nullptr) + nullptr, nullptr, nullptr) {} bool tx_unlock_state {true}; diff --git a/tests/txsearch_tests.cpp b/tests/txsearch_tests.cpp index f07e604..fa635fa 100755 --- a/tests/txsearch_tests.cpp +++ b/tests/txsearch_tests.cpp @@ -490,8 +490,12 @@ protected: rpc = std::make_unique("dummy deamon url"); rpc_ptr = rpc.get(); + tp = std::make_unique(); + tp_ptr = tp.get(); + bcs = std::make_unique( - bc_setup, std::move(mcore), std::move(rpc)); + bc_setup, std::move(mcore), std::move(rpc), + std::move(tp)); } network_type net_type {network_type::STAGENET}; @@ -500,9 +504,11 @@ protected: std::unique_ptr mcore; std::unique_ptr rpc; std::unique_ptr bcs; + std::unique_ptr tp; MockMicroCore* mcore_ptr; MockRPCCalls* rpc_ptr; + TP::ThreadPool* tp_ptr; static json config_json; };