Started using rpc instead of microcore in threads

new_rpc
moneroexamples 5 years ago
parent 9e3b72fb75
commit 10bd484b55

@ -90,8 +90,8 @@ defaultConf.setGlobally(el::ConfigurationType::ToStandardOutput, "true");
// default format: %datetime %level [%logger] %msg
// we change to add file and func
defaultConf.setGlobally(el::ConfigurationType::Format,
"%datetime
[%levshort,%logger,%fbase:%func:%line] %msg");
"%datetime [%levshort,%logger,%fbase:%func:%line]"
" %msg");
el::Loggers::reconfigureLogger(OPENMONERO_LOG_CATEGORY, defaultConf);

@ -120,15 +120,35 @@ CurrentBlockchainStatus::get_block(uint64_t height, block& blk)
vector<block>
CurrentBlockchainStatus::get_blocks_range(
uint64_t const& h1, uint64_t const& h2)
uint64_t h1, uint64_t h2)
{
try
{
return mcore->get_blocks_range(h1, h2);
}
catch (BLOCK_DNE& e)
catch (std::exception const& e)
{
cerr << e.what() << endl;
OMERROR << e.what();
}
return {};
}
vector<pair<block, vector<transaction>>>
CurrentBlockchainStatus::get_blocks_and_txs_range(
uint64_t h1, uint64_t h2)
{
try
{
if (h1 >= h2)
return {};
return rpc->get_blocks_range(h1, h2);
}
catch (std::exception const& e)
{
OMERROR << e.what();
}
return {};
@ -249,8 +269,9 @@ CurrentBlockchainStatus::get_output_keys(
{
try
{
mcore->get_output_key(amount, absolute_offsets, outputs);
return true;
//mcore->get_output_key(amount, absolute_offsets, outputs);
//return true;
return rpc->get_output_keys(amount, absolute_offsets, outputs);
}
catch (const OUTPUT_DNE& e)
{
@ -292,12 +313,13 @@ CurrentBlockchainStatus::get_amount_specific_indices(
// this index is lmdb index of a tx, not tx hash
uint64_t tx_index;
if (mcore->tx_exists(tx_hash, tx_index))
{
out_indices = mcore->get_tx_amount_output_indices(tx_index);
return true;
}
//if (mcore->tx_exists(tx_hash, tx_index))
// {
//out_indices = mcore->get_tx_amount_output_indices(tx_index);
//return true;
// }
return rpc->get_tx_amount_output_indices(tx_hash, out_indices);
}
catch(const exception& e)
{

@ -88,7 +88,10 @@ public:
get_block(uint64_t height, block &blk);
virtual vector<block>
get_blocks_range(uint64_t const& h1, uint64_t const& h2);
get_blocks_range(uint64_t h1, uint64_t h2);
vector<pair<block, vector<transaction>>>
get_blocks_and_txs_range(uint64_t h1, uint64_t h2);
virtual bool
get_block_txs(const block &blk,

@ -100,6 +100,7 @@ MicroCore::get_block_from_height(uint64_t height, block& blk) const
try
{
std::lock_guard<std::mutex> lock(mtx1);
blk = core_storage.get_db().get_block_from_height(height);
}
catch (const exception& e)
@ -135,6 +136,8 @@ MicroCore::get_block_complete_entry(block const& b, block_complete_entry& bce)
bool
MicroCore::get_tx(crypto::hash const& tx_hash, transaction& tx) const
{
std::lock_guard<std::mutex> lock(mtx1);
if (core_storage.have_tx(tx_hash))
{
// get transaction with given hash
@ -155,6 +158,8 @@ MicroCore::get_output_histogram(
{
try
{
std::lock_guard<std::mutex> lock(mtx1);
histogram = core_storage.get_output_histogram(
amounts,
unlocked,

@ -38,6 +38,16 @@ class MicroCore {
bool initialization_succeded {false};
// will be used to limit access to the blockchain
// these four mutexes should allow only 4 threads
// to query lmdb blockchain at the same time.
// its poor mans solution to MDB_READERS_FULL limit
mutable mutex mtx1;
mutable mutex mtx2;
//mutable mutex mtx1;
//mutable mutex mtx1;
public:
// <amoumt,
@ -71,6 +81,7 @@ public:
vector<uint64_t> const& absolute_offsets,
vector<cryptonote::output_data_t>& outputs)
{
std::lock_guard<std::mutex> lock(mtx1);
core_storage.get_db()
.get_output_key(epee::span<const uint64_t>(&amount, 1),
absolute_offsets, outputs);
@ -80,6 +91,7 @@ public:
get_output_key(uint64_t amount,
uint64_t global_amount_index)
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_db()
.get_output_key(amount, global_amount_index);
}
@ -90,48 +102,56 @@ public:
std::vector<transaction>& txs,
std::vector<crypto::hash>& missed_txs) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_transactions(txs_ids, txs, missed_txs);
}
virtual std::vector<block>
get_blocks_range(const uint64_t& h1, const uint64_t& h2) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_db().get_blocks_range(h1, h2);
}
virtual uint64_t
get_tx_unlock_time(crypto::hash const& tx_hash) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_db().get_tx_unlock_time(tx_hash);
}
virtual bool
have_tx(crypto::hash const& tx_hash) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.have_tx(tx_hash);
}
virtual bool
tx_exists(crypto::hash const& tx_hash, uint64_t& tx_id) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_db().tx_exists(tx_hash, tx_id);
}
virtual tx_out_index
get_output_tx_and_index(uint64_t const& amount, uint64_t const& index) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_db().get_output_tx_and_index(amount, index);
}
virtual uint64_t
get_tx_block_height(crypto::hash const& tx_hash) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_db().get_tx_block_height(tx_hash);
}
virtual std::vector<uint64_t>
get_tx_amount_output_indices(uint64_t const& tx_id) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_db()
.get_tx_amount_output_indices(tx_id).front();
}
@ -141,6 +161,7 @@ public:
std::vector<tx_info>& tx_infos,
std::vector<spent_key_image_info>& key_image_infos) const
{
std::lock_guard<std::mutex> lock(mtx1);
return m_mempool.get_transactions_and_spent_keys_info(
tx_infos, key_image_infos);
}
@ -148,6 +169,7 @@ public:
virtual uint64_t
get_current_blockchain_height() const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_current_blockchain_height();
}
@ -159,6 +181,7 @@ public:
{
// tx_hash , index in tx
// tx_out_index is std::pair<crypto::hash, uint64_t>;
std::lock_guard<std::mutex> lock(mtx1);
core_storage.get_db().get_output_tx_and_index(
amount, offsets, indices);
@ -184,12 +207,14 @@ public:
get_outs(COMMAND_RPC_GET_OUTPUTS_BIN::request const& req,
COMMAND_RPC_GET_OUTPUTS_BIN::response& res) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_outs(req, res);
}
virtual uint64_t
get_dynamic_base_fee_estimate(uint64_t const& grace_blocks) const
{
std::lock_guard<std::mutex> lock(mtx1);
return core_storage.get_dynamic_base_fee_estimate(grace_blocks);
}

@ -1046,6 +1046,8 @@ OpenMoneroRequests::import_wallet_request(
return;
}
OMINFO << xmr_address.substr(0, 6) + ": searched block set to 0";
j_response["request_fulfilled"] = true;
j_response["status"] = "Import will start shortly";
j_response["new_request"] = true;

@ -231,7 +231,7 @@ OutputInputIdentification::identify_inputs(
// just to be sure before we break out of this loop,
// do it only after two misses
if (++search_misses > 2)
if (++search_misses > 1)
break;
}

@ -48,7 +48,7 @@ RPCCalls::commit_tx(
bool r {false};
{
std::lock_guard<std::mutex> guard(m_daemon_rpc_mutex);
// std::lock_guard<std::mutex> guard(m_daemon_rpc_mutex);
r = epee::net_utils::invoke_http_json(
"/sendrawtransaction", req, res,
@ -122,6 +122,186 @@ RPCCalls::get_current_height(uint64_t& current_height)
}
bool
RPCCalls::get_blocks(vector<uint64_t> blk_heights,
COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response& res)
{
COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::request req;
req.heights = std::move(blk_heights);
bool r {false};
{
std::lock_guard<std::mutex> guard(m_daemon_rpc_mutex);
r = epee::net_utils::invoke_http_bin(
"/get_blocks_by_height.bin",
req, res, m_http_client, rpc_timeout);
}
string error_msg;
if (!r || !check_if_response_is_ok(res, error_msg))
{
OMERROR << "Error getting blocks. "
<< error_msg;
return false;
}
return true;
}
std::vector<pair<block, vector<transaction>>>
RPCCalls::get_blocks_range(uint64_t h1, uint64_t h2)
{
//assert(h2>h1);
vector<uint64_t> blk_heights(h2-h1);
std::iota(blk_heights.begin(), blk_heights.end(), h1);
//for(auto h: blk_heights)
//cout << h << ",";
//cout << endl;
COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response res;
if (!get_blocks(std::move(blk_heights), res))
{
stringstream ss;
ss << "Getting blocks [" << h1
<< ", " << h2 << ") failed!";
throw std::runtime_error(ss.str());
}
vector<pair<block, vector<transaction>>> blocks;
blocks.resize(res.blocks.size());
//for (auto const& blk_ce: res.blocks)
//{
for (uint64_t i = 0; i < res.blocks.size();i++)
{
auto const& blk_ce = res.blocks[i];
block blk;
if (!parse_and_validate_block_from_blob(blk_ce.block, blk))
{
throw std::runtime_error("Cant parse block from blobdata");
}
vector<transaction> txs;
txs.resize(blk_ce.txs.size());
//for (auto const& tx_blob: blk_ce.txs)
for (uint64_t j = 0; j < blk_ce.txs.size(); j++)
{
auto const& tx_blob = blk_ce.txs[j];
if (!parse_and_validate_tx_from_blob(tx_blob, txs[j]))
{
throw std::runtime_error("Cant parse tx from blobdata");
}
}
//blocks.emplace_back(make_pair(std::move(blk), std::move(txs)));
blocks[i].first = std::move(blk);
blocks[i].second = std::move(txs);
}
return blocks;
}
bool
RPCCalls::get_output_keys(uint64_t amount,
vector<uint64_t> const& absolute_offsets,
vector<output_data_t>& outputs)
{
COMMAND_RPC_GET_OUTPUTS_BIN::request req;
COMMAND_RPC_GET_OUTPUTS_BIN::response res;
req.get_txid = false;
for (auto ao: absolute_offsets)
{
req.outputs.push_back({amount, ao});
}
bool r {false};
{
std::lock_guard<std::mutex> guard(m_daemon_rpc_mutex);
r = epee::net_utils::invoke_http_bin(
"/get_outs.bin",
req, res, m_http_client, rpc_timeout);
}
string error_msg;
if (!r || !check_if_response_is_ok(res, error_msg))
{
OMERROR << "Error getting output keys. "
<< error_msg;
return false;
}
for (auto const& out: res.outs)
{
outputs.push_back(output_data_t{});
auto& out_data = outputs.back();
out_data.pubkey = out.key;
out_data.height = out.height;
// out_data.unlock_time = ??? not set
// out_data.commitment = ?? not set
}
return true;
}
bool
RPCCalls::get_tx_amount_output_indices(crypto::hash const& tx_hash,
vector<uint64_t>& out_indices)
{
COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::request req;
COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response res;
req.txid = tx_hash;
bool r {false};
{
std::lock_guard<std::mutex> guard(m_daemon_rpc_mutex);
r = epee::net_utils::invoke_http_bin(
"/get_o_indexes.bin",
req, res, m_http_client, rpc_timeout);
}
string error_msg;
if (!r || !check_if_response_is_ok(res, error_msg))
{
OMERROR << "Error getting output keys. "
<< error_msg;
return false;
}
out_indices = std::move(res.o_indexes);
return true;
}
template <typename Command>
bool
RPCCalls::check_if_response_is_ok(
@ -155,4 +335,22 @@ bool RPCCalls::check_if_response_is_ok<
COMMAND_RPC_GET_HEIGHT::response>(
COMMAND_RPC_GET_HEIGHT::response const& res,
string& error_msg) const;
template
bool RPCCalls::check_if_response_is_ok<
COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response>(
COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response const& res,
string& error_msg) const;
template
bool RPCCalls::check_if_response_is_ok<
COMMAND_RPC_GET_OUTPUTS_BIN::response>(
COMMAND_RPC_GET_OUTPUTS_BIN::response const& res,
string& error_msg) const;
template
bool RPCCalls::check_if_response_is_ok<
COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response>(
COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response const& res,
string& error_msg) const;
}

@ -55,6 +55,22 @@ public:
virtual bool
get_current_height(uint64_t& current_height);
virtual bool
get_blocks(vector<uint64_t> blk_heights,
COMMAND_RPC_GET_BLOCKS_BY_HEIGHT::response& res);
std::vector<pair<block, vector<transaction>>>
get_blocks_range(uint64_t h1, uint64_t h2);
bool
get_output_keys(uint64_t amount,
vector<uint64_t> const& absolute_offsets,
vector<output_data_t>& outputs);
bool
get_tx_amount_output_indices(crypto::hash const& tx_hash,
vector<uint64_t>& out_indices);
virtual ~RPCCalls() = default;
protected:

@ -69,40 +69,51 @@ TxSearch::operator()()
searching_is_ongoing = true;
//auto rpc = std::make_unique<xmreg::RPCCalls>(
// current_bc_status->get_bc_setup().deamon_url);
// we put everything in massive catch, as there are plenty ways in which
// an exceptions can be thrown here. Mostly from mysql.
// but because this is detatch thread, we cant catch them in main thread.
// but because this is thread, we cant catch them in main thread.
// program will blow up if exception is thrown. need to handle exceptions
// here.
try
{
while(continue_search)
{
{
seconds loop_timestamp {current_timestamp};
uint64_t last_block_height = current_bc_status->current_height;
// cout << "new searched_blk_no:" << searched_blk_no << '\n';
uint64_t h1 = searched_blk_no;
uint64_t h2 = std::min(h1 + blocks_lookahead - 1, last_block_height);
//uint64_t h2 = std::min(h1 + blocks_lookahead - 1, last_block_height);
uint64_t h2 = std::min(h1 + blocks_lookahead, last_block_height+1);
//vector<block> blocks = current_bc_status->get_blocks_range(h1, h2);
auto blocks = current_bc_status->get_blocks_and_txs_range(h1, h2);
//auto blocks = rpc->get_blocks_range(h1, h2);
vector<block> blocks = current_bc_status->get_blocks_range(h1, h2);
//cout << "h1,h2, no_blocks:" << h1 << "," << h2 << "," << blocks.size() << '\n';
if (blocks.empty())
{
if (h1 <= h2)
if (h1 < h2)
{
OMERROR << address_prefix
<< ": cant get blocks from " << h1
<< " to " << h2;
<< ": cant get blocks from [" << h1
<< ", " << h2 << ")!";
stop();
}
else
{
OMINFO << address_prefix
<< ": waiting for new block. "
"Last scanned was " << h2;
"Last scanned was " << h2 - 1;
}
std::this_thread::sleep_for(
@ -137,24 +148,24 @@ TxSearch::operator()()
}
OMINFO << address_prefix + ": analyzing "
<< blocks.size() << " blocks from "
<< h1 << " to " << h2
<< " out of " << last_block_height << " blocks";
vector<crypto::hash> txs_hashes_from_blocks;
vector<transaction> txs_in_blocks;
vector<CurrentBlockchainStatus::txs_tuple_t> txs_data;
if (!current_bc_status->get_txs_in_blocks(blocks,
txs_hashes_from_blocks,
txs_in_blocks,
txs_data))
{
OMERROR << address_prefix
+ ": cant get tx in blocks from "
<< h1 << " to " << h2;
return;
}
<< blocks.size() << " blocks ["
<< h1 << ", " << h2
<< ") out of " << last_block_height;
//vector<crypto::hash> txs_hashes_from_blocks;
//vector<transaction> txs_in_blocks;
//vector<CurrentBlockchainStatus::txs_tuple_t> txs_data;
//if (!current_bc_status->get_txs_in_blocks(blocks,
//txs_hashes_from_blocks,
//txs_in_blocks,
//txs_data))
//{
//OMERROR << address_prefix
//+ ": cant get tx in blocks from "
//<< h1 << " to " << h2;
//return;
//}
// we will only create mysql DateTime object once, anything is found
// in a given block;
@ -179,13 +190,17 @@ TxSearch::operator()()
size_t tx_idx {0};
for (auto const& tx_tuple: txs_data)
uint64_t blk_height {h1};
for (auto const& blk_pair: blocks)
{
crypto::hash const& tx_hash = txs_hashes_from_blocks[tx_idx];
transaction const& tx = txs_in_blocks[tx_idx++];
uint64_t blk_height = std::get<0>(tx_tuple);
uint64_t blk_timestamp = std::get<1>(tx_tuple);
bool is_coinbase = std::get<2>(tx_tuple);
auto blk_timestamp = blk_pair.first.timestamp;
for (auto const& tx: blk_pair.second)
{
//continue;
auto tx_hash = get_transaction_hash(tx);
auto is_coinbase = cryptonote::is_coinbase(tx);
//cout << "\n\n\n" << blk_height << '\n';
@ -258,14 +273,14 @@ TxSearch::operator()()
}
if (!current_bc_status->tx_exist(tx_hash, blockchain_tx_id))
{
OMERROR << "Tx " << oi_identification.get_tx_hash_str()
<< " " << pod_to_hex(tx_hash)
<< " not found in blockchain !";
throw TxSearchException("Cant get tx from blockchain: "
+ pod_to_hex(tx_hash));
}
//if (!current_bc_status->tx_exist(tx_hash, blockchain_tx_id))
//{
//OMERROR << "Tx " << oi_identification.get_tx_hash_str()
//<< " " << pod_to_hex(tx_hash)
//<< " not found in blockchain !";
//throw TxSearchException("Cant get tx from blockchain: "
//+ pod_to_hex(tx_hash));
//}
OMINFO << address_prefix
+ ": found some outputs in block "
@ -574,6 +589,10 @@ TxSearch::operator()()
if (mysql_transaction)
mysql_transaction->commit();
} // end txs
++blk_height;
} // for (auto const& tx_pair: txs_map)
// update scanned_block_height every given interval
@ -581,10 +600,10 @@ TxSearch::operator()()
XmrAccount updated_acc = *acc;
updated_acc.scanned_block_height = h2;
updated_acc.scanned_block_height = h2 - 1;
updated_acc.scanned_block_timestamp
= DateTime(static_cast<time_t>(
blocks.back().timestamp));
blocks.back().first.timestamp));
if (xmr_accounts->update(*acc, updated_acc))
{
@ -594,8 +613,14 @@ TxSearch::operator()()
}
//current_timestamp = loop_timestamp;
// update this only when this variable is false
// otherwise a new search block value can
// be overwritten to h2, instead of the new value
if (!searched_block_got_updated)
searched_blk_no = h2;
searched_blk_no = h2 + 1;
searched_block_got_updated = false;
} // while(continue_search)
@ -632,6 +657,7 @@ void
TxSearch::set_searched_blk_no(uint64_t new_value)
{
searched_blk_no = new_value;
searched_block_got_updated = true;
}
uint64_t
@ -692,7 +718,7 @@ void
TxSearch::find_txs_in_mempool(
TxSearch::pool_txs_t mempool_txs,
json* j_transactions)
{
{
*j_transactions = json::array();
@ -912,5 +938,4 @@ TxSearch::delete_existing_tx_if_exists(string const& tx_hash)
// default value of static veriables
seconds TxSearch::thread_search_life {600};
}

@ -46,11 +46,16 @@ private:
static seconds thread_search_life;
// indicate that a thread loop should keep running
bool continue_search {true};
atomic<bool> continue_search {true};
// this acctually indicates whether thread loop finished
// its execution
bool searching_is_ongoing {false};
atomic<bool> searching_is_ongoing {false};
// marked true when we set new searched block value
// from other thread. for example, when we import account
// we set it to 0
atomic<bool> searched_block_got_updated {false};
// to store last exception thrown in the search thread
// using this, a main thread can get info what went wrong here

Loading…
Cancel
Save