From 10bd484b55f70c8423ba532d079185e0e350855e Mon Sep 17 00:00:00 2001 From: moneroexamples Date: Fri, 25 Jan 2019 12:52:00 +0800 Subject: [PATCH] Started using rpc instead of microcore in threads --- main.cpp | 4 +- src/CurrentBlockchainStatus.cpp | 44 +++++-- src/CurrentBlockchainStatus.h | 5 +- src/MicroCore.cpp | 5 + src/MicroCore.h | 25 ++++ src/OpenMoneroRequests.cpp | 2 + src/OutputInputIdentification.cpp | 2 +- src/RPCCalls.cpp | 200 +++++++++++++++++++++++++++++- src/RPCCalls.h | 16 +++ src/TxSearch.cpp | 115 ++++++++++------- src/TxSearch.h | 9 +- 11 files changed, 364 insertions(+), 63 deletions(-) diff --git a/main.cpp b/main.cpp index 2a76e10..e58e407 100755 --- a/main.cpp +++ b/main.cpp @@ -90,8 +90,8 @@ defaultConf.setGlobally(el::ConfigurationType::ToStandardOutput, "true"); // default format: %datetime %level [%logger] %msg // we change to add file and func defaultConf.setGlobally(el::ConfigurationType::Format, - "%datetime - [%levshort,%logger,%fbase:%func:%line] %msg"); + "%datetime [%levshort,%logger,%fbase:%func:%line]" + " %msg"); el::Loggers::reconfigureLogger(OPENMONERO_LOG_CATEGORY, defaultConf); diff --git a/src/CurrentBlockchainStatus.cpp b/src/CurrentBlockchainStatus.cpp index efcb065..d12f9cc 100755 --- a/src/CurrentBlockchainStatus.cpp +++ b/src/CurrentBlockchainStatus.cpp @@ -120,15 +120,35 @@ CurrentBlockchainStatus::get_block(uint64_t height, block& blk) vector CurrentBlockchainStatus::get_blocks_range( - uint64_t const& h1, uint64_t const& h2) + uint64_t h1, uint64_t h2) { try { return mcore->get_blocks_range(h1, h2); } - catch (BLOCK_DNE& e) + catch (std::exception const& e) { - cerr << e.what() << endl; + OMERROR << e.what(); + } + + return {}; +} + +vector>> +CurrentBlockchainStatus::get_blocks_and_txs_range( + uint64_t h1, uint64_t h2) +{ + try + { + + if (h1 >= h2) + return {}; + + return rpc->get_blocks_range(h1, h2); + } + catch (std::exception const& e) + { + OMERROR << e.what(); } return {}; @@ -249,8 +269,9 @@ CurrentBlockchainStatus::get_output_keys( { try { - mcore->get_output_key(amount, absolute_offsets, outputs); - return true; + //mcore->get_output_key(amount, absolute_offsets, outputs); + //return true; + return rpc->get_output_keys(amount, absolute_offsets, outputs); } catch (const OUTPUT_DNE& e) { @@ -292,12 +313,13 @@ CurrentBlockchainStatus::get_amount_specific_indices( // this index is lmdb index of a tx, not tx hash uint64_t tx_index; - if (mcore->tx_exists(tx_hash, tx_index)) - { - out_indices = mcore->get_tx_amount_output_indices(tx_index); - - return true; - } + //if (mcore->tx_exists(tx_hash, tx_index)) + // { + //out_indices = mcore->get_tx_amount_output_indices(tx_index); + //return true; + // } + + return rpc->get_tx_amount_output_indices(tx_hash, out_indices); } catch(const exception& e) { diff --git a/src/CurrentBlockchainStatus.h b/src/CurrentBlockchainStatus.h index b295965..cb0fc05 100755 --- a/src/CurrentBlockchainStatus.h +++ b/src/CurrentBlockchainStatus.h @@ -88,7 +88,10 @@ public: get_block(uint64_t height, block &blk); virtual vector - get_blocks_range(uint64_t const& h1, uint64_t const& h2); + get_blocks_range(uint64_t h1, uint64_t h2); + + vector>> + get_blocks_and_txs_range(uint64_t h1, uint64_t h2); virtual bool get_block_txs(const block &blk, diff --git a/src/MicroCore.cpp b/src/MicroCore.cpp index ed97ed4..ea7f8fe 100755 --- a/src/MicroCore.cpp +++ b/src/MicroCore.cpp @@ -100,6 +100,7 @@ MicroCore::get_block_from_height(uint64_t height, block& blk) const try { + std::lock_guard lock(mtx1); blk = core_storage.get_db().get_block_from_height(height); } catch (const exception& e) @@ -135,6 +136,8 @@ MicroCore::get_block_complete_entry(block const& b, block_complete_entry& bce) bool MicroCore::get_tx(crypto::hash const& tx_hash, transaction& tx) const { + + std::lock_guard lock(mtx1); if (core_storage.have_tx(tx_hash)) { // get transaction with given hash @@ -155,6 +158,8 @@ MicroCore::get_output_histogram( { try { + std::lock_guard lock(mtx1); + histogram = core_storage.get_output_histogram( amounts, unlocked, diff --git a/src/MicroCore.h b/src/MicroCore.h index 71cbd51..785590e 100755 --- a/src/MicroCore.h +++ b/src/MicroCore.h @@ -38,6 +38,16 @@ class MicroCore { bool initialization_succeded {false}; + + // will be used to limit access to the blockchain + // these four mutexes should allow only 4 threads + // to query lmdb blockchain at the same time. + // its poor mans solution to MDB_READERS_FULL limit + mutable mutex mtx1; + mutable mutex mtx2; + //mutable mutex mtx1; + //mutable mutex mtx1; + public: // const& absolute_offsets, vector& outputs) { + std::lock_guard lock(mtx1); core_storage.get_db() .get_output_key(epee::span(&amount, 1), absolute_offsets, outputs); @@ -80,6 +91,7 @@ public: get_output_key(uint64_t amount, uint64_t global_amount_index) { + std::lock_guard lock(mtx1); return core_storage.get_db() .get_output_key(amount, global_amount_index); } @@ -90,48 +102,56 @@ public: std::vector& txs, std::vector& missed_txs) const { + std::lock_guard lock(mtx1); return core_storage.get_transactions(txs_ids, txs, missed_txs); } virtual std::vector get_blocks_range(const uint64_t& h1, const uint64_t& h2) const { + std::lock_guard lock(mtx1); return core_storage.get_db().get_blocks_range(h1, h2); } virtual uint64_t get_tx_unlock_time(crypto::hash const& tx_hash) const { + std::lock_guard lock(mtx1); return core_storage.get_db().get_tx_unlock_time(tx_hash); } virtual bool have_tx(crypto::hash const& tx_hash) const { + std::lock_guard lock(mtx1); return core_storage.have_tx(tx_hash); } virtual bool tx_exists(crypto::hash const& tx_hash, uint64_t& tx_id) const { + std::lock_guard lock(mtx1); return core_storage.get_db().tx_exists(tx_hash, tx_id); } virtual tx_out_index get_output_tx_and_index(uint64_t const& amount, uint64_t const& index) const { + std::lock_guard lock(mtx1); return core_storage.get_db().get_output_tx_and_index(amount, index); } virtual uint64_t get_tx_block_height(crypto::hash const& tx_hash) const { + std::lock_guard lock(mtx1); return core_storage.get_db().get_tx_block_height(tx_hash); } virtual std::vector get_tx_amount_output_indices(uint64_t const& tx_id) const { + std::lock_guard lock(mtx1); return core_storage.get_db() .get_tx_amount_output_indices(tx_id).front(); } @@ -141,6 +161,7 @@ public: std::vector& tx_infos, std::vector& key_image_infos) const { + std::lock_guard lock(mtx1); return m_mempool.get_transactions_and_spent_keys_info( tx_infos, key_image_infos); } @@ -148,6 +169,7 @@ public: virtual uint64_t get_current_blockchain_height() const { + std::lock_guard lock(mtx1); return core_storage.get_current_blockchain_height(); } @@ -159,6 +181,7 @@ public: { // tx_hash , index in tx // tx_out_index is std::pair; + std::lock_guard lock(mtx1); core_storage.get_db().get_output_tx_and_index( amount, offsets, indices); @@ -184,12 +207,14 @@ public: get_outs(COMMAND_RPC_GET_OUTPUTS_BIN::request const& req, COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const { + std::lock_guard lock(mtx1); return core_storage.get_outs(req, res); } virtual uint64_t get_dynamic_base_fee_estimate(uint64_t const& grace_blocks) const { + std::lock_guard lock(mtx1); return core_storage.get_dynamic_base_fee_estimate(grace_blocks); } diff --git a/src/OpenMoneroRequests.cpp b/src/OpenMoneroRequests.cpp index 857725d..fe7f24d 100755 --- a/src/OpenMoneroRequests.cpp +++ b/src/OpenMoneroRequests.cpp @@ -1046,6 +1046,8 @@ OpenMoneroRequests::import_wallet_request( return; } + OMINFO << xmr_address.substr(0, 6) + ": searched block set to 0"; + j_response["request_fulfilled"] = true; j_response["status"] = "Import will start shortly"; j_response["new_request"] = true; diff --git a/src/OutputInputIdentification.cpp b/src/OutputInputIdentification.cpp index b8ca706..e12a7f3 100755 --- a/src/OutputInputIdentification.cpp +++ b/src/OutputInputIdentification.cpp @@ -231,7 +231,7 @@ OutputInputIdentification::identify_inputs( // just to be sure before we break out of this loop, // do it only after two misses - if (++search_misses > 2) + if (++search_misses > 1) break; } diff --git a/src/RPCCalls.cpp b/src/RPCCalls.cpp index 7f76ec4..77af7e3 100755 --- a/src/RPCCalls.cpp +++ b/src/RPCCalls.cpp @@ -48,7 +48,7 @@ RPCCalls::commit_tx( bool r {false}; { - std::lock_guard guard(m_daemon_rpc_mutex); + // std::lock_guard guard(m_daemon_rpc_mutex); r = epee::net_utils::invoke_http_json( "/sendrawtransaction", req, res, @@ -122,6 +122,186 @@ RPCCalls::get_current_height(uint64_t& current_height) } + +bool +RPCCalls::get_blocks(vector blk_heights, + COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response& res) +{ + COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::request req; + req.heights = std::move(blk_heights); + + bool r {false}; + + { + std::lock_guard guard(m_daemon_rpc_mutex); + + r = epee::net_utils::invoke_http_bin( + "/get_blocks_by_height.bin", + req, res, m_http_client, rpc_timeout); + } + + string error_msg; + + if (!r || !check_if_response_is_ok(res, error_msg)) + { + OMERROR << "Error getting blocks. " + << error_msg; + return false; + } + + return true; +} + +std::vector>> +RPCCalls::get_blocks_range(uint64_t h1, uint64_t h2) +{ + //assert(h2>h1); + + vector blk_heights(h2-h1); + + std::iota(blk_heights.begin(), blk_heights.end(), h1); + + //for(auto h: blk_heights) + //cout << h << ","; + + //cout << endl; + + COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response res; + + if (!get_blocks(std::move(blk_heights), res)) + { + stringstream ss; + + ss << "Getting blocks [" << h1 + << ", " << h2 << ") failed!"; + + throw std::runtime_error(ss.str()); + } + + vector>> blocks; + blocks.resize(res.blocks.size()); + + //for (auto const& blk_ce: res.blocks) + //{ + for (uint64_t i = 0; i < res.blocks.size();i++) + { + + auto const& blk_ce = res.blocks[i]; + + block blk; + + if (!parse_and_validate_block_from_blob(blk_ce.block, blk)) + { + throw std::runtime_error("Cant parse block from blobdata"); + } + + vector txs; + txs.resize(blk_ce.txs.size()); + + //for (auto const& tx_blob: blk_ce.txs) + for (uint64_t j = 0; j < blk_ce.txs.size(); j++) + { + auto const& tx_blob = blk_ce.txs[j]; + + if (!parse_and_validate_tx_from_blob(tx_blob, txs[j])) + { + throw std::runtime_error("Cant parse tx from blobdata"); + } + } + + //blocks.emplace_back(make_pair(std::move(blk), std::move(txs))); + blocks[i].first = std::move(blk); + blocks[i].second = std::move(txs); + } + + return blocks; +} + +bool +RPCCalls::get_output_keys(uint64_t amount, + vector const& absolute_offsets, + vector& outputs) +{ + COMMAND_RPC_GET_OUTPUTS_BIN::request req; + COMMAND_RPC_GET_OUTPUTS_BIN::response res; + + req.get_txid = false; + + for (auto ao: absolute_offsets) + { + req.outputs.push_back({amount, ao}); + } + + bool r {false}; + + { + std::lock_guard guard(m_daemon_rpc_mutex); + + r = epee::net_utils::invoke_http_bin( + "/get_outs.bin", + req, res, m_http_client, rpc_timeout); + } + + string error_msg; + + if (!r || !check_if_response_is_ok(res, error_msg)) + { + OMERROR << "Error getting output keys. " + << error_msg; + return false; + } + + + for (auto const& out: res.outs) + { + outputs.push_back(output_data_t{}); + + auto& out_data = outputs.back(); + out_data.pubkey = out.key; + out_data.height = out.height; + // out_data.unlock_time = ??? not set + // out_data.commitment = ?? not set + } + + + return true; + +} + + +bool +RPCCalls::get_tx_amount_output_indices(crypto::hash const& tx_hash, + vector& out_indices) +{ + COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::request req; + COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response res; + + req.txid = tx_hash; + + bool r {false}; + + { + std::lock_guard guard(m_daemon_rpc_mutex); + + r = epee::net_utils::invoke_http_bin( + "/get_o_indexes.bin", + req, res, m_http_client, rpc_timeout); + } + + string error_msg; + + if (!r || !check_if_response_is_ok(res, error_msg)) + { + OMERROR << "Error getting output keys. " + << error_msg; + return false; + } + + out_indices = std::move(res.o_indexes); + + return true; +} + template bool RPCCalls::check_if_response_is_ok( @@ -155,4 +335,22 @@ bool RPCCalls::check_if_response_is_ok< COMMAND_RPC_GET_HEIGHT::response>( COMMAND_RPC_GET_HEIGHT::response const& res, string& error_msg) const; + +template +bool RPCCalls::check_if_response_is_ok< + COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response>( + COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response const& res, + string& error_msg) const; + +template +bool RPCCalls::check_if_response_is_ok< + COMMAND_RPC_GET_OUTPUTS_BIN::response>( + COMMAND_RPC_GET_OUTPUTS_BIN::response const& res, + string& error_msg) const; + +template +bool RPCCalls::check_if_response_is_ok< + COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response>( + COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response const& res, + string& error_msg) const; } diff --git a/src/RPCCalls.h b/src/RPCCalls.h index 9e9b8b4..1dbea47 100755 --- a/src/RPCCalls.h +++ b/src/RPCCalls.h @@ -55,6 +55,22 @@ public: virtual bool get_current_height(uint64_t& current_height); + virtual bool + get_blocks(vector blk_heights, + COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response& res); + + std::vector>> + get_blocks_range(uint64_t h1, uint64_t h2); + + bool + get_output_keys(uint64_t amount, + vector const& absolute_offsets, + vector& outputs); + + bool + get_tx_amount_output_indices(crypto::hash const& tx_hash, + vector& out_indices); + virtual ~RPCCalls() = default; protected: diff --git a/src/TxSearch.cpp b/src/TxSearch.cpp index de72eaf..c27c17f 100755 --- a/src/TxSearch.cpp +++ b/src/TxSearch.cpp @@ -69,40 +69,51 @@ TxSearch::operator()() searching_is_ongoing = true; + + //auto rpc = std::make_unique( + // current_bc_status->get_bc_setup().deamon_url); + // we put everything in massive catch, as there are plenty ways in which // an exceptions can be thrown here. Mostly from mysql. - // but because this is detatch thread, we cant catch them in main thread. + // but because this is thread, we cant catch them in main thread. // program will blow up if exception is thrown. need to handle exceptions // here. try { while(continue_search) - { + { seconds loop_timestamp {current_timestamp}; uint64_t last_block_height = current_bc_status->current_height; + + // cout << "new searched_blk_no:" << searched_blk_no << '\n'; uint64_t h1 = searched_blk_no; - uint64_t h2 = std::min(h1 + blocks_lookahead - 1, last_block_height); + //uint64_t h2 = std::min(h1 + blocks_lookahead - 1, last_block_height); + uint64_t h2 = std::min(h1 + blocks_lookahead, last_block_height+1); + + //vector blocks = current_bc_status->get_blocks_range(h1, h2); + auto blocks = current_bc_status->get_blocks_and_txs_range(h1, h2); + //auto blocks = rpc->get_blocks_range(h1, h2); - vector blocks = current_bc_status->get_blocks_range(h1, h2); + //cout << "h1,h2, no_blocks:" << h1 << "," << h2 << "," << blocks.size() << '\n'; if (blocks.empty()) { - if (h1 <= h2) + if (h1 < h2) { OMERROR << address_prefix - << ": cant get blocks from " << h1 - << " to " << h2; + << ": cant get blocks from [" << h1 + << ", " << h2 << ")!"; stop(); } else { OMINFO << address_prefix << ": waiting for new block. " - "Last scanned was " << h2; + "Last scanned was " << h2 - 1; } std::this_thread::sleep_for( @@ -137,24 +148,24 @@ TxSearch::operator()() } OMINFO << address_prefix + ": analyzing " - << blocks.size() << " blocks from " - << h1 << " to " << h2 - << " out of " << last_block_height << " blocks"; - - vector txs_hashes_from_blocks; - vector txs_in_blocks; - vector txs_data; - - if (!current_bc_status->get_txs_in_blocks(blocks, - txs_hashes_from_blocks, - txs_in_blocks, - txs_data)) - { - OMERROR << address_prefix - + ": cant get tx in blocks from " - << h1 << " to " << h2; - return; - } + << blocks.size() << " blocks [" + << h1 << ", " << h2 + << ") out of " << last_block_height; + + //vector txs_hashes_from_blocks; + //vector txs_in_blocks; + //vector txs_data; + + //if (!current_bc_status->get_txs_in_blocks(blocks, + //txs_hashes_from_blocks, + //txs_in_blocks, + //txs_data)) + //{ + //OMERROR << address_prefix + //+ ": cant get tx in blocks from " + //<< h1 << " to " << h2; + //return; + //} // we will only create mysql DateTime object once, anything is found // in a given block; @@ -179,13 +190,17 @@ TxSearch::operator()() size_t tx_idx {0}; - for (auto const& tx_tuple: txs_data) + uint64_t blk_height {h1}; + + for (auto const& blk_pair: blocks) { - crypto::hash const& tx_hash = txs_hashes_from_blocks[tx_idx]; - transaction const& tx = txs_in_blocks[tx_idx++]; - uint64_t blk_height = std::get<0>(tx_tuple); - uint64_t blk_timestamp = std::get<1>(tx_tuple); - bool is_coinbase = std::get<2>(tx_tuple); + auto blk_timestamp = blk_pair.first.timestamp; + + for (auto const& tx: blk_pair.second) + { + //continue; + auto tx_hash = get_transaction_hash(tx); + auto is_coinbase = cryptonote::is_coinbase(tx); //cout << "\n\n\n" << blk_height << '\n'; @@ -258,14 +273,14 @@ TxSearch::operator()() } - if (!current_bc_status->tx_exist(tx_hash, blockchain_tx_id)) - { - OMERROR << "Tx " << oi_identification.get_tx_hash_str() - << " " << pod_to_hex(tx_hash) - << " not found in blockchain !"; - throw TxSearchException("Cant get tx from blockchain: " - + pod_to_hex(tx_hash)); - } + //if (!current_bc_status->tx_exist(tx_hash, blockchain_tx_id)) + //{ + //OMERROR << "Tx " << oi_identification.get_tx_hash_str() + //<< " " << pod_to_hex(tx_hash) + //<< " not found in blockchain !"; + //throw TxSearchException("Cant get tx from blockchain: " + //+ pod_to_hex(tx_hash)); + //} OMINFO << address_prefix + ": found some outputs in block " @@ -574,6 +589,10 @@ TxSearch::operator()() if (mysql_transaction) mysql_transaction->commit(); + } // end txs + + ++blk_height; + } // for (auto const& tx_pair: txs_map) // update scanned_block_height every given interval @@ -581,10 +600,10 @@ TxSearch::operator()() XmrAccount updated_acc = *acc; - updated_acc.scanned_block_height = h2; + updated_acc.scanned_block_height = h2 - 1; updated_acc.scanned_block_timestamp = DateTime(static_cast( - blocks.back().timestamp)); + blocks.back().first.timestamp)); if (xmr_accounts->update(*acc, updated_acc)) { @@ -594,8 +613,14 @@ TxSearch::operator()() } //current_timestamp = loop_timestamp; + + // update this only when this variable is false + // otherwise a new search block value can + // be overwritten to h2, instead of the new value + if (!searched_block_got_updated) + searched_blk_no = h2; - searched_blk_no = h2 + 1; + searched_block_got_updated = false; } // while(continue_search) @@ -632,6 +657,7 @@ void TxSearch::set_searched_blk_no(uint64_t new_value) { searched_blk_no = new_value; + searched_block_got_updated = true; } uint64_t @@ -692,7 +718,7 @@ void TxSearch::find_txs_in_mempool( TxSearch::pool_txs_t mempool_txs, json* j_transactions) -{ +{ *j_transactions = json::array(); @@ -912,5 +938,4 @@ TxSearch::delete_existing_tx_if_exists(string const& tx_hash) // default value of static veriables seconds TxSearch::thread_search_life {600}; - } diff --git a/src/TxSearch.h b/src/TxSearch.h index 4ea0a99..d2b5484 100755 --- a/src/TxSearch.h +++ b/src/TxSearch.h @@ -46,11 +46,16 @@ private: static seconds thread_search_life; // indicate that a thread loop should keep running - bool continue_search {true}; + atomic continue_search {true}; // this acctually indicates whether thread loop finished // its execution - bool searching_is_ongoing {false}; + atomic searching_is_ongoing {false}; + + // marked true when we set new searched block value + // from other thread. for example, when we import account + // we set it to 0 + atomic searched_block_got_updated {false}; // to store last exception thrown in the search thread // using this, a main thread can get info what went wrong here