/// @file /// @author rfree (current maintainer/user in monero.cc project - most of code is from CryptoNote) /// @brief This is the original cryptonote protocol network-events handler, modified by us // Copyright (c) 2014-2022, The Monero Project // // All rights reserved. // // Redistribution and use in source and binary forms, with or without modification, are // permitted provided that the following conditions are met: // // 1. Redistributions of source code must retain the above copyright notice, this list of // conditions and the following disclaimer. // // 2. Redistributions in binary form must reproduce the above copyright notice, this list // of conditions and the following disclaimer in the documentation and/or other // materials provided with the distribution. // // 3. Neither the name of the copyright holder nor the names of its contributors may be // used to endorse or promote products derived from this software without specific // prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers // (may contain code and/or modifications by other developers) // developer rfree: this code is caller of our new network code, and is modded; e.g. for rate limiting #include #include #include "cryptonote_basic/cryptonote_format_utils.h" #include "profile_tools.h" #include "net/network_throttle-detail.hpp" #include "common/pruning.h" #include "common/util.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net.cn" #define MLOG_P2P_MESSAGE(x) MCINFO("net.p2p.msg", context << x) #define MLOGIF_P2P_MESSAGE(init, test, x) \ do { \ const auto level = el::Level::Info; \ const char *cat = "net.p2p.msg"; \ if (ELPP->vRegistry()->allowed(level, cat)) { \ init; \ if (test) \ el::base::Writer(level, el::Color::Default, __FILE__, __LINE__, ELPP_FUNC, el::base::DispatchAction::NormalLog).construct(cat) << x; \ } \ } while(0) #define MLOG_PEER_STATE(x) \ MCINFO(MONERO_DEFAULT_LOG_CATEGORY, context << "[" << epee::string_tools::to_string_hex(context.m_pruning_seed) << "] state: " << x << " in state " << cryptonote::get_protocol_state_string(context.m_state)) #define BLOCK_QUEUE_NSPANS_THRESHOLD 10 // chunks of N blocks #define BLOCK_QUEUE_SIZE_THRESHOLD (100*1024*1024) // MB #define BLOCK_QUEUE_FORCE_DOWNLOAD_NEAR_BLOCKS 1000 #define REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY (5 * 1000000) // microseconds #define REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD (30 * 1000000) // microseconds #define IDLE_PEER_KICK_TIME (240 * 1000000) // microseconds #define NON_RESPONSIVE_PEER_KICK_TIME (20 * 1000000) // microseconds #define PASSIVE_PEER_KICK_TIME (60 * 1000000) // microseconds #define DROP_ON_SYNC_WEDGE_THRESHOLD (30 * 1000000000ull) // nanoseconds #define LAST_ACTIVITY_STALL_THRESHOLD (2.0f) // seconds #define DROP_PEERS_ON_SCORE -2 namespace cryptonote { //----------------------------------------------------------------------------------------------------------------------- template t_cryptonote_protocol_handler::t_cryptonote_protocol_handler(t_core& rcore, nodetool::i_p2p_endpoint* p_net_layout, bool offline):m_core(rcore), m_p2p(p_net_layout), m_syncronized_connections_count(0), m_synchronized(offline), m_ask_for_txpool_complement(true), m_stopping(false), m_no_sync(false) { if(!m_p2p) m_p2p = &m_p2p_stub; } //----------------------------------------------------------------------------------------------------------------------- template bool t_cryptonote_protocol_handler::init(const boost::program_options::variables_map& vm) { m_sync_timer.pause(); m_sync_timer.reset(); m_add_timer.pause(); m_add_timer.reset(); m_last_add_end_time = 0; m_sync_spans_downloaded = 0; m_sync_old_spans_downloaded = 0; m_sync_bad_spans_downloaded = 0; m_sync_download_chain_size = 0; m_sync_download_objects_size = 0; m_block_download_max_size = command_line::get_arg(vm, cryptonote::arg_block_download_max_size); m_sync_pruned_blocks = command_line::get_arg(vm, cryptonote::arg_sync_pruned_blocks); return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::deinit() { return true; } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::set_p2p_endpoint(nodetool::i_p2p_endpoint* p2p) { if(p2p) m_p2p = p2p; else m_p2p = &m_p2p_stub; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::on_callback(cryptonote_connection_context& context) { LOG_PRINT_CCONTEXT_L2("callback fired"); CHECK_AND_ASSERT_MES_CC( context.m_callback_request_count > 0, false, "false callback fired, but context.m_callback_request_count=" << context.m_callback_request_count); --context.m_callback_request_count; uint32_t notified = true; if (context.m_idle_peer_notification.compare_exchange_strong(notified, not notified)) { if (context.m_state == cryptonote_connection_context::state_synchronizing && context.m_last_request_time != boost::date_time::not_a_date_time) { const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); const boost::posix_time::time_duration dt = now - context.m_last_request_time; const auto ms = dt.total_microseconds(); if (ms > IDLE_PEER_KICK_TIME || (context.m_expect_response && ms > NON_RESPONSIVE_PEER_KICK_TIME)) { if (context.m_score-- >= 0) { MINFO(context << " kicking idle peer, last update " << (dt.total_microseconds() / 1.e6) << " seconds ago, expecting " << (int)context.m_expect_response); context.m_last_request_time = boost::date_time::not_a_date_time; context.m_expect_response = 0; context.m_expect_height = 0; context.m_requested_objects.clear(); context.m_state = cryptonote_connection_context::state_standby; // we'll go back to adding, then (if we can't), download } else { MINFO(context << "dropping idle peer with negative score"); drop_connection_with_score(context, context.m_expect_response == 0 ? 1 : 5, false); return false; } } } } notified = true; if (context.m_new_stripe_notification.compare_exchange_strong(notified, not notified)) { if (context.m_state == cryptonote_connection_context::state_normal) context.m_state = cryptonote_connection_context::state_synchronizing; } if(context.m_state == cryptonote_connection_context::state_synchronizing && context.m_last_request_time == boost::posix_time::not_a_date_time) { NOTIFY_REQUEST_CHAIN::request r = {}; context.m_needed_objects.clear(); context.m_expect_height = m_core.get_current_blockchain_height(); m_core.get_short_chain_history(r.block_ids); handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) r.prune = m_sync_pruned_blocks; context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); context.m_expect_response = NOTIFY_RESPONSE_CHAIN_ENTRY::ID; MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); post_notify(r, context); MLOG_PEER_STATE("requesting chain"); } else if(context.m_state == cryptonote_connection_context::state_standby) { context.m_state = cryptonote_connection_context::state_synchronizing; try_add_next_blocks(context); } return true; } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::log_connections() { std::stringstream ss; ss.precision(1); double down_sum = 0.0; double down_curr_sum = 0.0; double up_sum = 0.0; double up_curr_sum = 0.0; ss << std::setw(30) << std::left << "Remote Host" << std::setw(20) << "Peer id" << std::setw(20) << "Support Flags" << std::setw(30) << "Recv/Sent (inactive,sec)" << std::setw(25) << "State" << std::setw(20) << "Livetime(sec)" << std::setw(12) << "Down (kB/s)" << std::setw(14) << "Down(now)" << std::setw(10) << "Up (kB/s)" << std::setw(13) << "Up(now)" << ENDL; m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id, uint32_t support_flags) { bool local_ip = cntxt.m_remote_address.is_local(); auto connection_time = time(NULL) - cntxt.m_started; ss << std::setw(30) << std::left << std::string(cntxt.m_is_income ? " [INC]":"[OUT]") + cntxt.m_remote_address.str() << std::setw(20) << nodetool::peerid_to_string(peer_id) << std::setw(20) << std::hex << support_flags << std::setw(30) << std::to_string(cntxt.m_recv_cnt)+ "(" + std::to_string(time(NULL) - cntxt.m_last_recv) + ")" + "/" + std::to_string(cntxt.m_send_cnt) + "(" + std::to_string(time(NULL) - cntxt.m_last_send) + ")" << std::setw(25) << get_protocol_state_string(cntxt.m_state) << std::setw(20) << std::to_string(time(NULL) - cntxt.m_started) << std::setw(12) << std::fixed << (connection_time == 0 ? 0.0 : cntxt.m_recv_cnt / connection_time / 1024) << std::setw(14) << std::fixed << cntxt.m_current_speed_down / 1024 << std::setw(10) << std::fixed << (connection_time == 0 ? 0.0 : cntxt.m_send_cnt / connection_time / 1024) << std::setw(13) << std::fixed << cntxt.m_current_speed_up / 1024 << (local_ip ? "[LAN]" : "") << std::left << (cntxt.m_remote_address.is_loopback() ? "[LOCALHOST]" : "") // 127.0.0.1 << ENDL; if (connection_time > 1) { down_sum += (cntxt.m_recv_cnt / connection_time / 1024); up_sum += (cntxt.m_send_cnt / connection_time / 1024); } down_curr_sum += (cntxt.m_current_speed_down / 1024); up_curr_sum += (cntxt.m_current_speed_up / 1024); return true; }); ss << ENDL << std::setw(125) << " " << std::setw(12) << down_sum << std::setw(14) << down_curr_sum << std::setw(10) << up_sum << std::setw(13) << up_curr_sum << ENDL; LOG_PRINT_L0("Connections: " << ENDL << ss.str()); } //------------------------------------------------------------------------------------------------------------------------ // Returns a list of connection_info objects describing each open p2p connection //------------------------------------------------------------------------------------------------------------------------ template std::list t_cryptonote_protocol_handler::get_connections() { std::list connections; m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id, uint32_t support_flags) { connection_info cnx; auto timestamp = time(NULL); cnx.incoming = cntxt.m_is_income ? true : false; cnx.address = cntxt.m_remote_address.str(); cnx.host = cntxt.m_remote_address.host_str(); cnx.ip = ""; cnx.port = ""; if (cntxt.m_remote_address.get_type_id() == epee::net_utils::ipv4_network_address::get_type_id()) { cnx.ip = cnx.host; cnx.port = std::to_string(cntxt.m_remote_address.as().port()); } cnx.rpc_port = cntxt.m_rpc_port; cnx.rpc_credits_per_hash = cntxt.m_rpc_credits_per_hash; cnx.peer_id = nodetool::peerid_to_string(peer_id); cnx.support_flags = support_flags; cnx.recv_count = cntxt.m_recv_cnt; cnx.recv_idle_time = timestamp - std::max(cntxt.m_started, cntxt.m_last_recv); cnx.send_count = cntxt.m_send_cnt; cnx.send_idle_time = timestamp - std::max(cntxt.m_started, cntxt.m_last_send); cnx.state = get_protocol_state_string(cntxt.m_state); cnx.live_time = timestamp - cntxt.m_started; cnx.localhost = cntxt.m_remote_address.is_loopback(); cnx.local_ip = cntxt.m_remote_address.is_local(); auto connection_time = time(NULL) - cntxt.m_started; if (connection_time == 0) { cnx.avg_download = 0; cnx.avg_upload = 0; } else { cnx.avg_download = cntxt.m_recv_cnt / connection_time / 1024; cnx.avg_upload = cntxt.m_send_cnt / connection_time / 1024; } cnx.current_download = cntxt.m_current_speed_down / 1024; cnx.current_upload = cntxt.m_current_speed_up / 1024; cnx.connection_id = epee::string_tools::pod_to_hex(cntxt.m_connection_id); cnx.ssl = cntxt.m_ssl; cnx.height = cntxt.m_remote_blockchain_height; cnx.pruning_seed = cntxt.m_pruning_seed; cnx.address_type = (uint8_t)cntxt.m_remote_address.get_type_id(); connections.push_back(cnx); return true; }); return connections; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::process_payload_sync_data(const CORE_SYNC_DATA& hshd, cryptonote_connection_context& context, bool is_inital) { if(context.m_state == cryptonote_connection_context::state_before_handshake && !is_inital) return true; if(context.m_state == cryptonote_connection_context::state_synchronizing) return true; // from v6, if the peer advertises a top block version, reject if it's not what it should be (will only work if no voting) if (hshd.current_height > 0) { const uint8_t version = m_core.get_ideal_hard_fork_version(hshd.current_height - 1); if (version >= 6 && version != hshd.top_version) { if (version < hshd.top_version && version == m_core.get_ideal_hard_fork_version()) MDEBUG(context << " peer claims higher version than we think (" << (unsigned)hshd.top_version << " for " << (hshd.current_height - 1) << " instead of " << (unsigned)version << ") - we may be forked from the network and a software upgrade may be needed, or that peer is broken or malicious"); return false; } } // reject weird pruning schemes if (hshd.pruning_seed) { const uint32_t log_stripes = tools::get_pruning_log_stripes(hshd.pruning_seed); if (log_stripes != CRYPTONOTE_PRUNING_LOG_STRIPES || tools::get_pruning_stripe(hshd.pruning_seed) > (1u << log_stripes)) { MWARNING(context << " peer claim unexpected pruning seed " << epee::string_tools::to_string_hex(hshd.pruning_seed) << ", disconnecting"); return false; } } if (hshd.current_height < context.m_remote_blockchain_height) { MINFO(context << "Claims " << hshd.current_height << ", claimed " << context.m_remote_blockchain_height << " before"); hit_score(context, 1); } context.m_remote_blockchain_height = hshd.current_height; context.m_pruning_seed = hshd.pruning_seed; uint64_t target = m_core.get_target_blockchain_height(); if (target == 0) target = m_core.get_current_blockchain_height(); if(m_core.have_block(hshd.top_id)) { context.m_state = cryptonote_connection_context::state_normal; if(is_inital && hshd.current_height >= target && target == m_core.get_current_blockchain_height()) on_connection_synchronized(); return true; } // No chain synchronization over hidden networks (tor, i2p, etc.) if(context.m_remote_address.get_zone() != epee::net_utils::zone::public_) { context.m_state = cryptonote_connection_context::state_normal; return true; } if (hshd.current_height > target) { /* As I don't know if accessing hshd from core could be a good practice, I prefer pushing target height to the core at the same time it is pushed to the user. Nz. */ int64_t diff = static_cast(hshd.current_height) - static_cast(m_core.get_current_blockchain_height()); uint64_t abs_diff = std::abs(diff); uint64_t max_block_height = std::max(hshd.current_height,m_core.get_current_blockchain_height()); uint64_t last_block_v1 = m_core.get_nettype() == TESTNET ? 624633 : m_core.get_nettype() == MAINNET ? 1009826 : (uint64_t)-1; uint64_t diff_v2 = max_block_height > last_block_v1 ? std::min(abs_diff, max_block_height - last_block_v1) : 0; MCLOG(is_inital ? el::Level::Info : el::Level::Debug, "global", el::Color::Yellow, context << "Sync data returned a new top block candidate: " << m_core.get_current_blockchain_height() << " -> " << hshd.current_height << " [Your node is " << abs_diff << " blocks (" << tools::get_human_readable_timespan((abs_diff - diff_v2) * DIFFICULTY_TARGET_V1 + diff_v2 * DIFFICULTY_TARGET_V2) << ") " << (0 <= diff ? std::string("behind") : std::string("ahead")) << "] " << ENDL << "SYNCHRONIZATION started"); if (hshd.current_height >= m_core.get_current_blockchain_height() + 5) // don't switch to unsafe mode just for a few blocks { m_core.safesyncmode(false); } if (m_core.get_target_blockchain_height() == 0) // only when sync starts { m_sync_timer.resume(); m_sync_timer.reset(); m_add_timer.pause(); m_add_timer.reset(); m_last_add_end_time = 0; m_sync_spans_downloaded = 0; m_sync_old_spans_downloaded = 0; m_sync_bad_spans_downloaded = 0; m_sync_download_chain_size = 0; m_sync_download_objects_size = 0; } m_core.set_target_blockchain_height((hshd.current_height)); } MINFO(context << "Remote blockchain height: " << hshd.current_height << ", id: " << hshd.top_id); if (m_no_sync) { context.m_state = cryptonote_connection_context::state_normal; return true; } context.m_state = cryptonote_connection_context::state_synchronizing; //let the socket to send response to handshake, but request callback, to let send request data after response LOG_PRINT_CCONTEXT_L2("requesting callback"); ++context.m_callback_request_count; m_p2p->request_callback(context); MLOG_PEER_STATE("requesting callback"); context.m_num_requested = 0; return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::get_payload_sync_data(CORE_SYNC_DATA& hshd) { m_core.get_blockchain_top(hshd.current_height, hshd.top_id); hshd.top_version = m_core.get_ideal_hard_fork_version(hshd.current_height); difficulty_type wide_cumulative_difficulty = m_core.get_block_cumulative_difficulty(hshd.current_height); hshd.cumulative_difficulty = (wide_cumulative_difficulty & 0xffffffffffffffff).convert_to(); hshd.cumulative_difficulty_top64 = ((wide_cumulative_difficulty >> 64) & 0xffffffffffffffff).convert_to(); hshd.current_height +=1; hshd.pruning_seed = m_core.get_blockchain_pruning_seed(); return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::get_payload_sync_data(epee::byte_slice& data) { CORE_SYNC_DATA hsd = {}; get_payload_sync_data(hsd); epee::serialization::store_t_to_binary(hsd, data); return true; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_notify_new_block(int command, NOTIFY_NEW_BLOCK::request& arg, cryptonote_connection_context& context) { MLOGIF_P2P_MESSAGE(crypto::hash hash; cryptonote::block b; bool ret = cryptonote::parse_and_validate_block_from_blob(arg.b.block, b, &hash);, ret, context << "Received NOTIFY_NEW_BLOCK " << hash << " (height " << arg.current_blockchain_height << ", " << arg.b.txs.size() << " txes)"); if(context.m_state != cryptonote_connection_context::state_normal) return 1; if(!is_synchronized()) // can happen if a peer connection goes to normal but another thread still hasn't finished adding queued blocks { LOG_DEBUG_CC(context, "Received new block while syncing, ignored"); return 1; } m_core.pause_mine(); std::vector blocks; blocks.push_back(arg.b); std::vector pblocks; if (!m_core.prepare_handle_incoming_blocks(blocks, pblocks)) { LOG_PRINT_CCONTEXT_L1("Block verification failed: prepare_handle_incoming_blocks failed, dropping connection"); drop_connection(context, false, false); m_core.resume_mine(); return 1; } for(auto tx_blob_it = arg.b.txs.begin(); tx_blob_it!=arg.b.txs.end();tx_blob_it++) { cryptonote::tx_verification_context tvc = AUTO_VAL_INIT(tvc); m_core.handle_incoming_tx(*tx_blob_it, tvc, relay_method::block, true); if(tvc.m_verifivation_failed) { LOG_PRINT_CCONTEXT_L1("Block verification failed: transaction verification failed, dropping connection"); drop_connection(context, false, false); m_core.cleanup_handle_incoming_blocks(); m_core.resume_mine(); return 1; } } block_verification_context bvc = {}; m_core.handle_incoming_block(arg.b.block, pblocks.empty() ? NULL : &pblocks[0], bvc); // got block from handle_notify_new_block if (!m_core.cleanup_handle_incoming_blocks(true)) { LOG_PRINT_CCONTEXT_L0("Failure in cleanup_handle_incoming_blocks"); m_core.resume_mine(); return 1; } m_core.resume_mine(); if(bvc.m_verifivation_failed) { LOG_PRINT_CCONTEXT_L0("Block verification failed, dropping connection"); drop_connection_with_score(context, bvc.m_bad_pow ? P2P_IP_FAILS_BEFORE_BLOCK : 1, false); return 1; } if(bvc.m_added_to_main_chain) { //TODO: Add here announce protocol usage relay_block(arg, context); }else if(bvc.m_marked_as_orphaned) { context.m_needed_objects.clear(); context.m_state = cryptonote_connection_context::state_synchronizing; NOTIFY_REQUEST_CHAIN::request r = {}; context.m_expect_height = m_core.get_current_blockchain_height(); m_core.get_short_chain_history(r.block_ids); r.prune = m_sync_pruned_blocks; handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); context.m_expect_response = NOTIFY_RESPONSE_CHAIN_ENTRY::ID; MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); post_notify(r, context); MLOG_PEER_STATE("requesting chain"); } // load json & DNS checkpoints every 10min/hour respectively, // and verify them with respect to what blocks we already have CHECK_AND_ASSERT_MES(m_core.update_checkpoints(), 1, "One or more checkpoints loaded from json or dns conflicted with existing checkpoints."); return 1; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_notify_new_fluffy_block(int command, NOTIFY_NEW_FLUFFY_BLOCK::request& arg, cryptonote_connection_context& context) { MLOGIF_P2P_MESSAGE(crypto::hash hash; cryptonote::block b; bool ret = cryptonote::parse_and_validate_block_from_blob(arg.b.block, b, &hash);, ret, context << "Received NOTIFY_NEW_FLUFFY_BLOCK " << hash << " (height " << arg.current_blockchain_height << ", " << arg.b.txs.size() << " txes)"); if(context.m_state != cryptonote_connection_context::state_normal) return 1; if(!is_synchronized()) // can happen if a peer connection goes to normal but another thread still hasn't finished adding queued blocks { LOG_DEBUG_CC(context, "Received new block while syncing, ignored"); return 1; } m_core.pause_mine(); block new_block; transaction miner_tx; if(parse_and_validate_block_from_blob(arg.b.block, new_block)) { // This is a second notification, we must have asked for some missing tx if(!context.m_requested_objects.empty()) { // What we asked for != to what we received .. if(context.m_requested_objects.size() != arg.b.txs.size()) { LOG_ERROR_CCONTEXT ( "NOTIFY_NEW_FLUFFY_BLOCK -> request/response mismatch, " << "block = " << epee::string_tools::pod_to_hex(get_blob_hash(arg.b.block)) << ", requested = " << context.m_requested_objects.size() << ", received = " << new_block.tx_hashes.size() << ", dropping connection" ); drop_connection(context, false, false); m_core.resume_mine(); return 1; } } std::vector have_tx; have_tx.reserve(new_block.tx_hashes.size()); // Instead of requesting missing transactions by hash like BTC, // we do it by index (thanks to a suggestion from moneromooo) because // we're way cooler .. and also because they're smaller than hashes. // // Also, remember to pepper some whitespace changes around to bother // moneromooo ... only because I <3 him. std::vector need_tx_indices; need_tx_indices.reserve(new_block.tx_hashes.size()); transaction tx; crypto::hash tx_hash; for(auto& tx_blob: arg.b.txs) { if(parse_and_validate_tx_from_blob(tx_blob.blob, tx)) { try { if(!get_transaction_hash(tx, tx_hash)) { LOG_PRINT_CCONTEXT_L1 ( "NOTIFY_NEW_FLUFFY_BLOCK: get_transaction_hash failed" << ", dropping connection" ); drop_connection(context, false, false); m_core.resume_mine(); return 1; } } catch(...) { LOG_PRINT_CCONTEXT_L1 ( "NOTIFY_NEW_FLUFFY_BLOCK: get_transaction_hash failed" << ", exception thrown" << ", dropping connection" ); drop_connection(context, false, false); m_core.resume_mine(); return 1; } // hijacking m_requested objects in connection context to patch up // a possible DOS vector pointed out by @monero-moo where peers keep // sending (0...n-1) transactions. // If requested objects is not empty, then we must have asked for // some missing transacionts, make sure that they're all there. // // Can I safely re-use this field? I think so, but someone check me! if(!context.m_requested_objects.empty()) { auto req_tx_it = context.m_requested_objects.find(tx_hash); if(req_tx_it == context.m_requested_objects.end()) { LOG_ERROR_CCONTEXT ( "Peer sent wrong transaction (NOTIFY_NEW_FLUFFY_BLOCK): " << "transaction with id = " << tx_hash << " wasn't requested, " << "dropping connection" ); drop_connection(context, false, false); m_core.resume_mine(); return 1; } context.m_requested_objects.erase(req_tx_it); } // we might already have the tx that the peer // sent in our pool, so don't verify again.. if(!m_core.pool_has_tx(tx_hash)) { MDEBUG("Incoming tx " << tx_hash << " not in pool, adding"); cryptonote::tx_verification_context tvc = AUTO_VAL_INIT(tvc); if(!m_core.handle_incoming_tx(tx_blob, tvc, relay_method::block, true) || tvc.m_verifivation_failed) { LOG_PRINT_CCONTEXT_L1("Block verification failed: transaction verification failed, dropping connection"); drop_connection(context, false, false); m_core.resume_mine(); return 1; } // // future todo: // tx should only not be added to pool if verification failed, but // maybe in the future could not be added for other reasons // according to monero-moo so keep track of these separately .. // } } else { LOG_ERROR_CCONTEXT ( "sent wrong tx: failed to parse and validate transaction: " << epee::string_tools::buff_to_hex_nodelimer(tx_blob.blob) << ", dropping connection" ); drop_connection(context, false, false); m_core.resume_mine(); return 1; } } // The initial size equality check could have been fooled if the sender // gave us the number of transactions we asked for, but not the right // ones. This check make sure the transactions we asked for were the // ones we received. if(context.m_requested_objects.size()) { MERROR ( "NOTIFY_NEW_FLUFFY_BLOCK: peer sent the number of transaction requested" << ", but not the actual transactions requested" << ", context.m_requested_objects.size() = " << context.m_requested_objects.size() << ", dropping connection" ); drop_connection(context, false, false); m_core.resume_mine(); return 1; } size_t tx_idx = 0; for(auto& tx_hash: new_block.tx_hashes) { cryptonote::blobdata txblob; if(m_core.get_pool_transaction(tx_hash, txblob, relay_category::broadcasted)) { have_tx.push_back({txblob, crypto::null_hash}); } else { std::vector tx_ids; std::vector txes; std::vector missing; tx_ids.push_back(tx_hash); if (m_core.get_transactions(tx_ids, txes, missing) && missing.empty()) { if (txes.size() == 1) { have_tx.push_back({tx_to_blob(txes.front()), crypto::null_hash}); } else { MERROR("1 tx requested, none not found, but " << txes.size() << " returned"); m_core.resume_mine(); return 1; } } else { MDEBUG("Tx " << tx_hash << " not found in pool"); need_tx_indices.push_back(tx_idx); } } ++tx_idx; } if(!need_tx_indices.empty()) // drats, we don't have everything.. { // request non-mempool txs MDEBUG("We are missing " << need_tx_indices.size() << " txes for this fluffy block"); for (auto txidx: need_tx_indices) MDEBUG(" tx " << new_block.tx_hashes[txidx]); NOTIFY_REQUEST_FLUFFY_MISSING_TX::request missing_tx_req; missing_tx_req.block_hash = get_block_hash(new_block); missing_tx_req.current_blockchain_height = arg.current_blockchain_height; missing_tx_req.missing_tx_indices = std::move(need_tx_indices); m_core.resume_mine(); MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_FLUFFY_MISSING_TX: missing_tx_indices.size()=" << missing_tx_req.missing_tx_indices.size() ); post_notify(missing_tx_req, context); } else // whoo-hoo we've got em all .. { MDEBUG("We have all needed txes for this fluffy block"); block_complete_entry b; b.block = arg.b.block; b.txs = have_tx; std::vector blocks; blocks.push_back(b); std::vector pblocks; if (!m_core.prepare_handle_incoming_blocks(blocks, pblocks)) { LOG_PRINT_CCONTEXT_L0("Failure in prepare_handle_incoming_blocks"); m_core.resume_mine(); return 1; } block_verification_context bvc = {}; m_core.handle_incoming_block(arg.b.block, pblocks.empty() ? NULL : &pblocks[0], bvc); // got block from handle_notify_new_block if (!m_core.cleanup_handle_incoming_blocks(true)) { LOG_PRINT_CCONTEXT_L0("Failure in cleanup_handle_incoming_blocks"); m_core.resume_mine(); return 1; } m_core.resume_mine(); if( bvc.m_verifivation_failed ) { LOG_PRINT_CCONTEXT_L0("Block verification failed, dropping connection"); drop_connection_with_score(context, bvc.m_bad_pow ? P2P_IP_FAILS_BEFORE_BLOCK : 1, false); return 1; } if( bvc.m_added_to_main_chain ) { //TODO: Add here announce protocol usage NOTIFY_NEW_BLOCK::request reg_arg = AUTO_VAL_INIT(reg_arg); reg_arg.current_blockchain_height = arg.current_blockchain_height; reg_arg.b = b; relay_block(reg_arg, context); } else if( bvc.m_marked_as_orphaned ) { context.m_needed_objects.clear(); context.m_state = cryptonote_connection_context::state_synchronizing; NOTIFY_REQUEST_CHAIN::request r = {}; context.m_expect_height = m_core.get_current_blockchain_height(); m_core.get_short_chain_history(r.block_ids); handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) r.prune = m_sync_pruned_blocks; context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); context.m_expect_response = NOTIFY_RESPONSE_CHAIN_ENTRY::ID; MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); post_notify(r, context); MLOG_PEER_STATE("requesting chain"); } // load json & DNS checkpoints every 10min/hour respectively, // and verify them with respect to what blocks we already have CHECK_AND_ASSERT_MES(m_core.update_checkpoints(), 1, "One or more checkpoints loaded from json or dns conflicted with existing checkpoints."); } } else { LOG_ERROR_CCONTEXT ( "sent wrong block: failed to parse and validate block: " << epee::string_tools::buff_to_hex_nodelimer(arg.b.block) << ", dropping connection" ); m_core.resume_mine(); drop_connection(context, false, false); return 1; } return 1; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_request_fluffy_missing_tx(int command, NOTIFY_REQUEST_FLUFFY_MISSING_TX::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_REQUEST_FLUFFY_MISSING_TX (" << arg.missing_tx_indices.size() << " txes), block hash " << arg.block_hash); if (context.m_state == cryptonote_connection_context::state_before_handshake) { LOG_ERROR_CCONTEXT("Requested fluffy tx before handshake, dropping connection"); drop_connection(context, false, false); return 1; } std::vector> local_blocks; std::vector local_txs; block b; if (!m_core.get_block_by_hash(arg.block_hash, b)) { LOG_ERROR_CCONTEXT("failed to find block: " << arg.block_hash << ", dropping connection"); drop_connection(context, false, false); return 1; } std::vector txids; txids.reserve(b.tx_hashes.size()); NOTIFY_NEW_FLUFFY_BLOCK::request fluffy_response; fluffy_response.b.block = t_serializable_object_to_blob(b); fluffy_response.current_blockchain_height = arg.current_blockchain_height; std::vector seen(b.tx_hashes.size(), false); for(auto& tx_idx: arg.missing_tx_indices) { if(tx_idx < b.tx_hashes.size()) { MDEBUG(" tx " << b.tx_hashes[tx_idx]); if (seen[tx_idx]) { LOG_ERROR_CCONTEXT ( "Failed to handle request NOTIFY_REQUEST_FLUFFY_MISSING_TX" << ", request is asking for duplicate tx " << ", tx index = " << tx_idx << ", block tx count " << b.tx_hashes.size() << ", block_height = " << arg.current_blockchain_height << ", dropping connection" ); drop_connection(context, true, false); return 1; } txids.push_back(b.tx_hashes[tx_idx]); seen[tx_idx] = true; } else { LOG_ERROR_CCONTEXT ( "Failed to handle request NOTIFY_REQUEST_FLUFFY_MISSING_TX" << ", request is asking for a tx whose index is out of bounds " << ", tx index = " << tx_idx << ", block tx count " << b.tx_hashes.size() << ", block_height = " << arg.current_blockchain_height << ", dropping connection" ); drop_connection(context, false, false); return 1; } } std::vector txs; std::vector missed; if (!m_core.get_transactions(txids, txs, missed)) { LOG_ERROR_CCONTEXT("Failed to handle request NOTIFY_REQUEST_FLUFFY_MISSING_TX, " << "failed to get requested transactions"); drop_connection(context, false, false); return 1; } if (!missed.empty() || txs.size() != txids.size()) { LOG_ERROR_CCONTEXT("Failed to handle request NOTIFY_REQUEST_FLUFFY_MISSING_TX, " << missed.size() << " requested transactions not found" << ", dropping connection"); drop_connection(context, false, false); return 1; } for(auto& tx: txs) { fluffy_response.b.txs.push_back({t_serializable_object_to_blob(tx), crypto::null_hash}); } MLOG_P2P_MESSAGE ( "-->>NOTIFY_RESPONSE_FLUFFY_MISSING_TX: " << ", txs.size()=" << fluffy_response.b.txs.size() << ", rsp.current_blockchain_height=" << fluffy_response.current_blockchain_height ); post_notify(fluffy_response, context); return 1; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_notify_get_txpool_complement(int command, NOTIFY_GET_TXPOOL_COMPLEMENT::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_GET_TXPOOL_COMPLEMENT (" << arg.hashes.size() << " txes)"); if(context.m_state != cryptonote_connection_context::state_normal) return 1; std::vector> local_blocks; std::vector local_txs; std::vector txes; if (!m_core.get_txpool_complement(arg.hashes, txes)) { LOG_ERROR_CCONTEXT("failed to get txpool complement"); return 1; } NOTIFY_NEW_TRANSACTIONS::request new_txes; new_txes.txs = std::move(txes); MLOG_P2P_MESSAGE ( "-->>NOTIFY_NEW_TRANSACTIONS: " << ", txs.size()=" << new_txes.txs.size() ); post_notify(new_txes, context); return 1; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_notify_new_transactions(int command, NOTIFY_NEW_TRANSACTIONS::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_NEW_TRANSACTIONS (" << arg.txs.size() << " txes)"); for (const auto &blob: arg.txs) MLOGIF_P2P_MESSAGE(cryptonote::transaction tx; crypto::hash hash; bool ret = cryptonote::parse_and_validate_tx_from_blob(blob, tx, hash);, ret, "Including transaction " << hash); if(context.m_state != cryptonote_connection_context::state_normal) return 1; // while syncing, core will lock for a long time, so we ignore // those txes as they aren't really needed anyway, and avoid a // long block before replying if(!is_synchronized()) { LOG_DEBUG_CC(context, "Received new tx while syncing, ignored"); return 1; } /* If the txes were received over i2p/tor, the default is to "forward" with a randomized delay to further enhance the "white noise" behavior, potentially making it harder for ISP-level spies to determine which inbound link sent the tx. If the sender disabled "white noise" over i2p/tor, then the sender is "fluffing" (to only outbound) i2p/tor connections with the `dandelionpp_fluff` flag set. The receiver (hidden service) will immediately fluff in that scenario (i.e. this assumes that a sybil spy will be unable to link an IP to an i2p/tor connection). */ const epee::net_utils::zone zone = context.m_remote_address.get_zone(); relay_method tx_relay = zone == epee::net_utils::zone::public_ ? relay_method::stem : relay_method::forward; std::vector stem_txs{}; std::vector fluff_txs{}; if (arg.dandelionpp_fluff) { tx_relay = relay_method::fluff; fluff_txs.reserve(arg.txs.size()); } else stem_txs.reserve(arg.txs.size()); for (auto& tx : arg.txs) { tx_verification_context tvc{}; if (!m_core.handle_incoming_tx({tx, crypto::null_hash}, tvc, tx_relay, true)) { LOG_PRINT_CCONTEXT_L1("Tx verification failed, dropping connection"); drop_connection(context, false, false); return 1; } switch (tvc.m_relay) { case relay_method::local: case relay_method::stem: stem_txs.push_back(std::move(tx)); break; case relay_method::block: case relay_method::fluff: fluff_txs.push_back(std::move(tx)); break; default: case relay_method::forward: // not supposed to happen here case relay_method::none: break; } } if (!stem_txs.empty()) { //TODO: add announce usage here arg.dandelionpp_fluff = false; arg.txs = std::move(stem_txs); relay_transactions(arg, context.m_connection_id, context.m_remote_address.get_zone(), relay_method::stem); } if (!fluff_txs.empty()) { //TODO: add announce usage here arg.dandelionpp_fluff = true; arg.txs = std::move(fluff_txs); relay_transactions(arg, context.m_connection_id, context.m_remote_address.get_zone(), relay_method::fluff); } return 1; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_request_get_objects(int command, NOTIFY_REQUEST_GET_OBJECTS::request& arg, cryptonote_connection_context& context) { if (context.m_state == cryptonote_connection_context::state_before_handshake) { LOG_ERROR_CCONTEXT("Requested objects before handshake, dropping connection"); drop_connection(context, false, false); return 1; } MLOG_P2P_MESSAGE("Received NOTIFY_REQUEST_GET_OBJECTS (" << arg.blocks.size() << " blocks)"); if (arg.blocks.size() > CURRENCY_PROTOCOL_MAX_OBJECT_REQUEST_COUNT) { LOG_ERROR_CCONTEXT( "Requested objects count is too big (" << arg.blocks.size() << ") expected not more then " << CURRENCY_PROTOCOL_MAX_OBJECT_REQUEST_COUNT); drop_connection(context, false, false); return 1; } NOTIFY_RESPONSE_GET_OBJECTS::request rsp; if(!m_core.handle_get_objects(arg, rsp, context)) { LOG_ERROR_CCONTEXT("failed to handle request NOTIFY_REQUEST_GET_OBJECTS, dropping connection"); drop_connection(context, false, false); return 1; } context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); MLOG_P2P_MESSAGE("-->>NOTIFY_RESPONSE_GET_OBJECTS: blocks.size()=" << rsp.blocks.size() << ", rsp.m_current_blockchain_height=" << rsp.current_blockchain_height << ", missed_ids.size()=" << rsp.missed_ids.size()); post_notify(rsp, context); //handler_response_blocks_now(sizeof(rsp)); // XXX //handler_response_blocks_now(200); return 1; } //------------------------------------------------------------------------------------------------------------------------ template double t_cryptonote_protocol_handler::get_avg_block_size() { CRITICAL_REGION_LOCAL(m_buffer_mutex); if (m_avg_buffer.empty()) { MWARNING("m_avg_buffer.size() == 0"); return 500; } double avg = 0; for (const auto &element : m_avg_buffer) avg += element; return avg / m_avg_buffer.size(); } template int t_cryptonote_protocol_handler::handle_response_get_objects(int command, NOTIFY_RESPONSE_GET_OBJECTS::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_RESPONSE_GET_OBJECTS (" << arg.blocks.size() << " blocks)"); MLOG_PEER_STATE("received objects"); boost::posix_time::ptime request_time = context.m_last_request_time; context.m_last_request_time = boost::date_time::not_a_date_time; if (context.m_expect_response != NOTIFY_RESPONSE_GET_OBJECTS::ID) { LOG_ERROR_CCONTEXT("Got NOTIFY_RESPONSE_GET_OBJECTS out of the blue, dropping connection"); drop_connection(context, true, false); return 1; } context.m_expect_response = 0; // calculate size of request size_t size = 0; size_t blocks_size = 0; for (const auto &element : arg.blocks) { blocks_size += element.block.size(); for (const auto &tx : element.txs) blocks_size += tx.blob.size(); } size += blocks_size; for (const auto &element : arg.missed_ids) size += sizeof(element.data); size += sizeof(arg.current_blockchain_height); { CRITICAL_REGION_LOCAL(m_buffer_mutex); m_avg_buffer.push_back(size); } ++m_sync_spans_downloaded; m_sync_download_objects_size += size; MDEBUG(context << " downloaded " << size << " bytes worth of blocks"); /*using namespace boost::chrono; auto point = steady_clock::now(); auto time_from_epoh = point.time_since_epoch(); auto sec = duration_cast< seconds >( time_from_epoh ).count();*/ //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); if(arg.blocks.empty()) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_HAVE_OBJECTS: no blocks"); drop_connection(context, true, false); ++m_sync_bad_spans_downloaded; return 1; } if(context.m_last_response_height > arg.current_blockchain_height) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_HAVE_OBJECTS: arg.m_current_blockchain_height=" << arg.current_blockchain_height << " < m_last_response_height=" << context.m_last_response_height << ", dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } if (arg.current_blockchain_height < context.m_remote_blockchain_height) { MINFO(context << "Claims " << arg.current_blockchain_height << ", claimed " << context.m_remote_blockchain_height << " before"); hit_score(context, 1); } context.m_remote_blockchain_height = arg.current_blockchain_height; if (context.m_remote_blockchain_height > m_core.get_target_blockchain_height()) m_core.set_target_blockchain_height(context.m_remote_blockchain_height); std::vector block_hashes; block_hashes.reserve(arg.blocks.size()); const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); uint64_t start_height = std::numeric_limits::max(); cryptonote::block b; for(const block_complete_entry& block_entry: arg.blocks) { if (m_stopping) { return 1; } crypto::hash block_hash; if(!parse_and_validate_block_from_blob(block_entry.block, b, block_hash)) { LOG_ERROR_CCONTEXT("sent wrong block: failed to parse and validate block: " << epee::string_tools::buff_to_hex_nodelimer(block_entry.block) << ", dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } if (b.miner_tx.vin.size() != 1 || b.miner_tx.vin.front().type() != typeid(txin_gen)) { LOG_ERROR_CCONTEXT("sent wrong block: block: miner tx does not have exactly one txin_gen input" << epee::string_tools::buff_to_hex_nodelimer(block_entry.block) << ", dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } if (start_height == std::numeric_limits::max()) { start_height = boost::get(b.miner_tx.vin[0]).height; if (start_height > context.m_expect_height) { LOG_ERROR_CCONTEXT("sent block ahead of expected height, dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } } auto req_it = context.m_requested_objects.find(block_hash); if(req_it == context.m_requested_objects.end()) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_GET_OBJECTS: block with id=" << epee::string_tools::pod_to_hex(get_blob_hash(block_entry.block)) << " wasn't requested, dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } if(b.tx_hashes.size() != block_entry.txs.size()) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_GET_OBJECTS: block with id=" << epee::string_tools::pod_to_hex(get_blob_hash(block_entry.block)) << ", tx_hashes.size()=" << b.tx_hashes.size() << " mismatch with block_complete_entry.m_txs.size()=" << block_entry.txs.size() << ", dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } context.m_requested_objects.erase(req_it); block_hashes.push_back(block_hash); } if(!context.m_requested_objects.empty()) { MERROR(context << "returned not all requested objects (context.m_requested_objects.size()=" << context.m_requested_objects.size() << "), dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } const bool pruned_ok = should_ask_for_pruned_data(context, start_height, arg.blocks.size(), true); if (!pruned_ok) { // if we don't want pruned data, check we did not get any for (block_complete_entry& block_entry: arg.blocks) { if (block_entry.pruned) { MERROR(context << "returned a pruned block, dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } if (block_entry.block_weight) { MERROR(context << "returned a block weight for a non pruned block, dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } for (const tx_blob_entry &tx_entry: block_entry.txs) { if (tx_entry.prunable_hash != crypto::null_hash) { MERROR(context << "returned at least one pruned object which we did not expect, dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } } } } else { // we accept pruned data, check that if we got some, then no weights are zero for (block_complete_entry& block_entry: arg.blocks) { if (block_entry.block_weight == 0 && block_entry.pruned) { MERROR(context << "returned at least one pruned block with 0 weight, dropping connection"); drop_connection(context, false, false); ++m_sync_bad_spans_downloaded; return 1; } } } { MLOG_YELLOW(el::Level::Debug, context << " Got NEW BLOCKS inside of " << __FUNCTION__ << ": size: " << arg.blocks.size() << ", blocks: " << start_height << " - " << (start_height + arg.blocks.size() - 1) << " (pruning seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << ")"); // add that new span to the block queue const boost::posix_time::time_duration dt = now - request_time; const float rate = size * 1e6 / (dt.total_microseconds() + 1); MDEBUG(context << " adding span: " << arg.blocks.size() << " at height " << start_height << ", " << dt.total_microseconds()/1e6 << " seconds, " << (rate/1024) << " kB/s, size now " << (m_block_queue.get_data_size() + blocks_size) / 1048576.f << " MB"); m_block_queue.add_blocks(start_height, arg.blocks, context.m_connection_id, context.m_remote_address, rate, blocks_size); const crypto::hash last_block_hash = cryptonote::get_block_hash(b); context.m_last_known_hash = last_block_hash; if (!m_core.get_test_drop_download() || !m_core.get_test_drop_download_height()) { // DISCARD BLOCKS for testing return 1; } } try_add_next_blocks(context); return 1; } // Get an estimate for the remaining sync time from given current to target blockchain height, in seconds template uint64_t t_cryptonote_protocol_handler::get_estimated_remaining_sync_seconds(uint64_t current_blockchain_height, uint64_t target_blockchain_height) { // The average sync speed varies so much, even averaged over quite long time periods like 10 minutes, // that using some sliding window would be difficult to implement without often leading to bad estimates. // The simplest strategy - always average sync speed over the maximum available interval i.e. since sync // started at all (from "m_sync_start_time" and "m_sync_start_height") - gives already useful results // and seems to be quite robust. Some quite special cases like "Internet connection suddenly becoming // much faster after syncing already a long time, and staying fast" are not well supported however. if (target_blockchain_height <= current_blockchain_height) { // Syncing stuck, or other special circumstance: Avoid errors, simply give back 0 return 0; } const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); const boost::posix_time::time_duration sync_time = now - m_sync_start_time; cryptonote::network_type nettype = m_core.get_nettype(); // Don't simply use remaining number of blocks for the estimate but "sync weight" as provided by // "cumulative_block_sync_weight" which knows about strongly varying Monero mainnet block sizes uint64_t synced_weight = tools::cumulative_block_sync_weight(nettype, m_sync_start_height, current_blockchain_height - m_sync_start_height); float us_per_weight = (float)sync_time.total_microseconds() / (float)synced_weight; uint64_t remaining_weight = tools::cumulative_block_sync_weight(nettype, current_blockchain_height, target_blockchain_height - current_blockchain_height); float remaining_us = us_per_weight * (float)remaining_weight; return (uint64_t)(remaining_us / 1e6); } // Return a textual remaining sync time estimate, or the empty string if waiting period not yet over template std::string t_cryptonote_protocol_handler::get_periodic_sync_estimate(uint64_t current_blockchain_height, uint64_t target_blockchain_height) { std::string text = ""; const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); boost::posix_time::time_duration period_sync_time = now - m_period_start_time; if (period_sync_time > boost::posix_time::minutes(2)) { // Period is over, time to report another estimate uint64_t remaining_seconds = get_estimated_remaining_sync_seconds(current_blockchain_height, target_blockchain_height); text = tools::get_human_readable_timespan(remaining_seconds); // Start the new period m_period_start_time = now; } return text; } template int t_cryptonote_protocol_handler::try_add_next_blocks(cryptonote_connection_context& context) { bool force_next_span = false; { // We try to lock the sync lock. If we can, it means no other thread is // currently adding blocks, so we do that for as long as we can from the // block queue. Then, we go back to download. const boost::unique_lock sync{m_sync_lock, boost::try_to_lock}; if (!sync.owns_lock()) { MINFO(context << "Failed to lock m_sync_lock, going back to download"); goto skip; } MDEBUG(context << " lock m_sync_lock, adding blocks to chain..."); MLOG_PEER_STATE("adding blocks"); { m_core.pause_mine(); m_add_timer.resume(); bool starting = true; epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([this, &starting]() { m_add_timer.pause(); m_core.resume_mine(); if (!starting) m_last_add_end_time = tools::get_tick_count(); }); m_sync_start_time = boost::posix_time::microsec_clock::universal_time(); m_sync_start_height = m_core.get_current_blockchain_height(); m_period_start_time = m_sync_start_time; while (1) { const uint64_t previous_height = m_core.get_current_blockchain_height(); uint64_t start_height; std::vector blocks; boost::uuids::uuid span_connection_id; epee::net_utils::network_address span_origin; if (!m_block_queue.get_next_span(start_height, blocks, span_connection_id, span_origin)) { MDEBUG(context << " no next span found, going back to download"); break; } if (blocks.empty()) { MERROR(context << "Next span has no blocks"); m_block_queue.remove_spans(span_connection_id, start_height); continue; } MDEBUG(context << " next span in the queue has blocks " << start_height << "-" << (start_height + blocks.size() - 1) << ", we need " << previous_height); block new_block; crypto::hash last_block_hash; if (!parse_and_validate_block_from_blob(blocks.back().block, new_block, last_block_hash)) { MERROR(context << "Failed to parse block, but it should already have been parsed"); m_block_queue.remove_spans(span_connection_id, start_height); continue; } if (m_core.have_block(last_block_hash)) { const uint64_t subchain_height = start_height + blocks.size(); LOG_DEBUG_CC(context, "These are old blocks, ignoring: blocks " << start_height << " - " << (subchain_height-1) << ", blockchain height " << m_core.get_current_blockchain_height()); m_block_queue.remove_spans(span_connection_id, start_height); ++m_sync_old_spans_downloaded; continue; } if (!parse_and_validate_block_from_blob(blocks.front().block, new_block)) { MERROR(context << "Failed to parse block, but it should already have been parsed"); m_block_queue.remove_spans(span_connection_id, start_height); continue; } bool parent_known = m_core.have_block(new_block.prev_id); if (!parent_known) { // it could be: // - later in the current chain // - later in an alt chain // - orphan // if it was requested, then it'll be resolved later, otherwise it's an orphan bool parent_requested = m_block_queue.requested(new_block.prev_id); if (!parent_requested) { // we might be able to ask for that block directly, as we now can request out of order, // otherwise we continue out of order, unless this block is the one we need, in which // case we request block hashes, though it might be safer to disconnect ? if (start_height > previous_height) { if (should_drop_connection(context, get_next_needed_pruning_stripe().first)) { MDEBUG(context << "Got block with unknown parent which was not requested, but peer does not have that block - dropping connection"); if (!context.m_is_income) m_p2p->add_used_stripe_peer(context); drop_connection(context, false, true); return 1; } MDEBUG(context << "Got block with unknown parent which was not requested, but peer does not have that block - back to download"); goto skip; } // this can happen if a connection was sicced onto a late span, if it did not have those blocks, // since we don't know that at the sic time LOG_ERROR_CCONTEXT("Got block with unknown parent which was not requested - querying block hashes"); m_block_queue.remove_spans(span_connection_id, start_height); context.m_needed_objects.clear(); context.m_last_response_height = 0; goto skip; } // parent was requested, so we wait for it to be retrieved MINFO(context << " parent was requested, we'll get back to it"); break; } const boost::posix_time::ptime start = boost::posix_time::microsec_clock::universal_time(); if (starting) { starting = false; if (m_last_add_end_time) { const uint64_t tnow = tools::get_tick_count(); const uint64_t ns = tools::ticks_to_ns(tnow - m_last_add_end_time); MINFO("Restarting adding block after idle for " << ns/1e9 << " seconds"); } } std::vector pblocks; if (!m_core.prepare_handle_incoming_blocks(blocks, pblocks)) { LOG_ERROR_CCONTEXT("Failure in prepare_handle_incoming_blocks"); drop_connections(span_origin); return 1; } if (!pblocks.empty() && pblocks.size() != blocks.size()) { m_core.cleanup_handle_incoming_blocks(); LOG_ERROR_CCONTEXT("Internal error: blocks.size() != block_entry.txs.size()"); return 1; } uint64_t block_process_time_full = 0, transactions_process_time_full = 0; size_t num_txs = 0, blockidx = 0; for(const block_complete_entry& block_entry: blocks) { if (m_stopping) { m_core.cleanup_handle_incoming_blocks(); return 1; } // process transactions TIME_MEASURE_START(transactions_process_time); num_txs += block_entry.txs.size(); std::vector tvc; m_core.handle_incoming_txs(block_entry.txs, tvc, relay_method::block, true); if (tvc.size() != block_entry.txs.size()) { LOG_ERROR_CCONTEXT("Internal error: tvc.size() != block_entry.txs.size()"); if (!m_core.cleanup_handle_incoming_blocks()) { LOG_PRINT_CCONTEXT_L0("Failure in cleanup_handle_incoming_blocks"); return 1; } return 1; } std::vector::const_iterator it = block_entry.txs.begin(); for (size_t i = 0; i < tvc.size(); ++i, ++it) { if(tvc[i].m_verifivation_failed) { drop_connections(span_origin); if (!m_p2p->for_connection(span_connection_id, [&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t f)->bool{ cryptonote::transaction tx; crypto::hash txid; if (it->prunable_hash == crypto::null_hash) { parse_and_validate_tx_from_blob(it->blob, tx, txid); // must succeed if we got here } else { parse_and_validate_tx_base_from_blob(it->blob, tx); // must succeed if we got here txid = get_pruned_transaction_hash(tx, it->prunable_hash); } LOG_ERROR_CCONTEXT("transaction verification failed on NOTIFY_RESPONSE_GET_OBJECTS, tx_id = " << epee::string_tools::pod_to_hex(txid) << ", dropping connection"); drop_connection(context, false, true); return 1; })) LOG_ERROR_CCONTEXT("span connection id not found"); if (!m_core.cleanup_handle_incoming_blocks()) { LOG_PRINT_CCONTEXT_L0("Failure in cleanup_handle_incoming_blocks"); return 1; } // in case the peer had dropped beforehand, remove the span anyway so other threads can wake up and get it m_block_queue.remove_spans(span_connection_id, start_height); return 1; } } TIME_MEASURE_FINISH(transactions_process_time); transactions_process_time_full += transactions_process_time; // process block TIME_MEASURE_START(block_process_time); block_verification_context bvc = {}; m_core.handle_incoming_block(block_entry.block, pblocks.empty() ? NULL : &pblocks[blockidx], bvc, false); // <--- process block if(bvc.m_verifivation_failed) { drop_connections(span_origin); if (!m_p2p->for_connection(span_connection_id, [&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t f)->bool{ LOG_PRINT_CCONTEXT_L1("Block verification failed, dropping connection"); drop_connection_with_score(context, bvc.m_bad_pow ? P2P_IP_FAILS_BEFORE_BLOCK : 1, true); return 1; })) LOG_ERROR_CCONTEXT("span connection id not found"); if (!m_core.cleanup_handle_incoming_blocks()) { LOG_PRINT_CCONTEXT_L0("Failure in cleanup_handle_incoming_blocks"); return 1; } // in case the peer had dropped beforehand, remove the span anyway so other threads can wake up and get it m_block_queue.remove_spans(span_connection_id, start_height); return 1; } if(bvc.m_marked_as_orphaned) { drop_connections(span_origin); if (!m_p2p->for_connection(span_connection_id, [&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t f)->bool{ LOG_PRINT_CCONTEXT_L1("Block received at sync phase was marked as orphaned, dropping connection"); drop_connection(context, true, true); return 1; })) LOG_ERROR_CCONTEXT("span connection id not found"); if (!m_core.cleanup_handle_incoming_blocks()) { LOG_PRINT_CCONTEXT_L0("Failure in cleanup_handle_incoming_blocks"); return 1; } // in case the peer had dropped beforehand, remove the span anyway so other threads can wake up and get it m_block_queue.remove_spans(span_connection_id, start_height); return 1; } TIME_MEASURE_FINISH(block_process_time); block_process_time_full += block_process_time; ++blockidx; } // each download block MDEBUG(context << "Block process time (" << blocks.size() << " blocks, " << num_txs << " txs): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms"); if (!m_core.cleanup_handle_incoming_blocks()) { LOG_PRINT_CCONTEXT_L0("Failure in cleanup_handle_incoming_blocks"); return 1; } m_block_queue.remove_spans(span_connection_id, start_height); const uint64_t current_blockchain_height = m_core.get_current_blockchain_height(); if (current_blockchain_height > previous_height) { const uint64_t target_blockchain_height = m_core.get_target_blockchain_height(); const boost::posix_time::time_duration dt = boost::posix_time::microsec_clock::universal_time() - start; std::string progress_message = ""; if (current_blockchain_height < target_blockchain_height) { uint64_t completion_percent = (current_blockchain_height * 100 / target_blockchain_height); if (completion_percent == 100) // never show 100% if not actually up to date completion_percent = 99; progress_message = " (" + std::to_string(completion_percent) + "%, " + std::to_string(target_blockchain_height - current_blockchain_height) + " left"; std::string time_message = get_periodic_sync_estimate(current_blockchain_height, target_blockchain_height); if (!time_message.empty()) { uint64_t total_blocks_to_sync = target_blockchain_height - m_sync_start_height; uint64_t total_blocks_synced = current_blockchain_height - m_sync_start_height; progress_message += ", " + std::to_string(total_blocks_synced * 100 / total_blocks_to_sync) + "% of total synced"; progress_message += ", estimated " + time_message + " left"; } progress_message += ")"; } const uint32_t previous_stripe = tools::get_pruning_stripe(previous_height, target_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); const uint32_t current_stripe = tools::get_pruning_stripe(current_blockchain_height, target_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); std::string timing_message = ""; if (ELPP->vRegistry()->allowed(el::Level::Info, "sync-info")) timing_message = std::string(" (") + std::to_string(dt.total_microseconds()/1e6) + " sec, " + std::to_string((current_blockchain_height - previous_height) * 1e6 / dt.total_microseconds()) + " blocks/sec), " + std::to_string(m_block_queue.get_data_size() / 1048576.f) + " MB queued in " + std::to_string(m_block_queue.get_num_filled_spans()) + " spans, stripe " + std::to_string(previous_stripe) + " -> " + std::to_string(current_stripe); if (ELPP->vRegistry()->allowed(el::Level::Debug, "sync-info")) timing_message += std::string(": ") + m_block_queue.get_overview(current_blockchain_height); MGINFO_YELLOW("Synced " << current_blockchain_height << "/" << target_blockchain_height << progress_message << timing_message); if (previous_stripe != current_stripe) notify_new_stripe(context, current_stripe); } } } MLOG_PEER_STATE("stopping adding blocks"); if (should_download_next_span(context, false)) { force_next_span = true; } else if (should_drop_connection(context, get_next_needed_pruning_stripe().first)) { if (!context.m_is_income) { m_p2p->add_used_stripe_peer(context); drop_connection(context, false, false); } return 1; } } skip: if (!request_missing_objects(context, true, force_next_span)) { LOG_ERROR_CCONTEXT("Failed to request missing objects, dropping connection"); drop_connection(context, false, false); return 1; } return 1; } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::notify_new_stripe(cryptonote_connection_context& cntxt, uint32_t stripe) { m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool { if (cntxt.m_connection_id == context.m_connection_id) return true; if (context.m_state == cryptonote_connection_context::state_normal) { const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); if (stripe && peer_stripe && peer_stripe != stripe) return true; context.m_new_stripe_notification = true; LOG_PRINT_CCONTEXT_L2("requesting callback"); ++context.m_callback_request_count; m_p2p->request_callback(context); MLOG_PEER_STATE("requesting callback"); } return true; }); } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::on_idle() { m_idle_peer_kicker.do_call(boost::bind(&t_cryptonote_protocol_handler::kick_idle_peers, this)); m_standby_checker.do_call(boost::bind(&t_cryptonote_protocol_handler::check_standby_peers, this)); m_sync_search_checker.do_call(boost::bind(&t_cryptonote_protocol_handler::update_sync_search, this)); return m_core.on_idle(); } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::kick_idle_peers() { MTRACE("Checking for idle peers..."); m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool { if (context.m_state == cryptonote_connection_context::state_synchronizing && context.m_last_request_time != boost::date_time::not_a_date_time) { const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); const boost::posix_time::time_duration dt = now - context.m_last_request_time; const auto ms = dt.total_microseconds(); if (ms > IDLE_PEER_KICK_TIME || (context.m_expect_response && ms > NON_RESPONSIVE_PEER_KICK_TIME)) { context.m_idle_peer_notification = true; LOG_PRINT_CCONTEXT_L2("requesting callback"); ++context.m_callback_request_count; m_p2p->request_callback(context); MLOG_PEER_STATE("requesting callback"); } } return true; }); return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::update_sync_search() { const uint64_t target = m_core.get_target_blockchain_height(); const uint64_t height = m_core.get_current_blockchain_height(); if (target > height) // if we're not synced yet, don't do it return true; MTRACE("Checking for outgoing syncing peers..."); std::unordered_map n_syncing, n_synced; std::unordered_map last_synced_peer_id; std::vector zones; m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool { if (!peer_id || context.m_is_income) // only consider connected outgoing peers return true; const epee::net_utils::zone zone = context.m_remote_address.get_zone(); if (n_syncing.find(zone) == n_syncing.end()) { n_syncing[zone] = 0; n_synced[zone] = 0; last_synced_peer_id[zone] = boost::uuids::nil_uuid(); zones.push_back(zone); } if (context.m_state == cryptonote_connection_context::state_synchronizing) ++n_syncing[zone]; if (context.m_state == cryptonote_connection_context::state_normal) { ++n_synced[zone]; if (!context.m_anchor) last_synced_peer_id[zone] = context.m_connection_id; } return true; }); for (const auto& zone : zones) { const unsigned int max_out_peers = get_max_out_peers(zone); MTRACE("[" << epee::net_utils::zone_to_string(zone) << "] " << n_syncing[zone] << " syncing, " << n_synced[zone] << " synced, " << max_out_peers << " max out peers"); // if we're at max out peers, and not enough are syncing, drop the last sync'd non-anchor if (n_synced[zone] + n_syncing[zone] >= max_out_peers && n_syncing[zone] < P2P_DEFAULT_SYNC_SEARCH_CONNECTIONS_COUNT && last_synced_peer_id[zone] != boost::uuids::nil_uuid()) { if (!m_p2p->for_connection(last_synced_peer_id[zone], [&](cryptonote_connection_context& ctx, nodetool::peerid_type peer_id, uint32_t f)->bool{ MINFO(ctx << "dropping synced peer, " << n_syncing[zone] << " syncing, " << n_synced[zone] << " synced, " << max_out_peers << " max out peers"); drop_connection(ctx, false, false); return true; })) MDEBUG("Failed to find peer we wanted to drop"); } } return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::check_standby_peers() { m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool { if (context.m_state == cryptonote_connection_context::state_standby) { LOG_PRINT_CCONTEXT_L2("requesting callback"); ++context.m_callback_request_count; m_p2p->request_callback(context); } return true; }); return true; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_request_chain(int command, NOTIFY_REQUEST_CHAIN::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_REQUEST_CHAIN (" << arg.block_ids.size() << " blocks"); if (context.m_state == cryptonote_connection_context::state_before_handshake) { LOG_ERROR_CCONTEXT("Requested chain before handshake, dropping connection"); drop_connection(context, false, false); return 1; } NOTIFY_RESPONSE_CHAIN_ENTRY::request r; if(!m_core.find_blockchain_supplement(arg.block_ids, !arg.prune, r)) { LOG_ERROR_CCONTEXT("Failed to handle NOTIFY_REQUEST_CHAIN."); return 1; } if (r.m_block_ids.size() >= 2) { cryptonote::block b; if (!m_core.get_block_by_hash(r.m_block_ids[1], b)) { LOG_ERROR_CCONTEXT("Failed to handle NOTIFY_REQUEST_CHAIN: first block not found"); return 1; } r.first_block = cryptonote::block_to_blob(b); } MLOG_P2P_MESSAGE("-->>NOTIFY_RESPONSE_CHAIN_ENTRY: m_start_height=" << r.start_height << ", m_total_height=" << r.total_height << ", m_block_ids.size()=" << r.m_block_ids.size()); post_notify(r, context); return 1; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::should_download_next_span(cryptonote_connection_context& context, bool standby) { std::vector hashes; boost::posix_time::ptime request_time; boost::uuids::uuid connection_id; bool filled; const uint64_t blockchain_height = m_core.get_current_blockchain_height(); if (context.m_remote_blockchain_height <= blockchain_height) return false; const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); const bool has_next_block = tools::has_unpruned_block(blockchain_height, context.m_remote_blockchain_height, context.m_pruning_seed); if (has_next_block) { if (!m_block_queue.has_next_span(blockchain_height, filled, request_time, connection_id)) { MDEBUG(context << " we should download it as no peer reserved it"); return true; } if (!filled) { const long dt = (now - request_time).total_microseconds(); if (dt >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD) { MDEBUG(context << " we should download it as it's not been received yet after " << dt/1e6); return true; } // in standby, be ready to double download early since we're idling anyway // let the fastest peer trigger first const double dl_speed = context.m_max_speed_down; if (standby && dt >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY && dl_speed > 0) { bool download = false; if (m_p2p->for_connection(connection_id, [&](cryptonote_connection_context& ctx, nodetool::peerid_type peer_id, uint32_t f)->bool{ const time_t nowt = time(NULL); const time_t time_since_last_recv = nowt - ctx.m_last_recv; const float last_activity = std::min((float)time_since_last_recv, dt/1e6f); const bool stalled = last_activity > LAST_ACTIVITY_STALL_THRESHOLD; if (stalled) { MDEBUG(context << " we should download it as the downloading peer is stalling for " << nowt - ctx.m_last_recv << " seconds"); download = true; return true; } // estimate the standby peer can give us 80% of its max speed // and let it download if that speed is > N times as fast as the current one // N starts at 10 after REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY, // decreases to 1.25 at REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD, // so that at times goes without the download being done, a retry becomes easier const float max_multiplier = 10.f; const float min_multiplier = 1.25f; float multiplier = max_multiplier; if (dt >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) { multiplier = max_multiplier - (dt-REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) * (max_multiplier - min_multiplier) / (REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD - REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY); multiplier = std::min(max_multiplier, std::max(min_multiplier, multiplier)); } if (dl_speed * .8f > ctx.m_current_speed_down * multiplier) { MDEBUG(context << " we should download it as we are substantially faster (" << dl_speed << " vs " << ctx.m_current_speed_down << ", multiplier " << multiplier << " after " << dt/1e6 << " seconds)"); download = true; return true; } return true; })) { if (download) return true; } else { MWARNING(context << " we should download it as the downloading peer is unexpectedly not known to us"); return true; } } } } return false; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::should_drop_connection(cryptonote_connection_context& context, uint32_t next_stripe) { if (context.m_anchor) { MDEBUG(context << "This is an anchor peer, not dropping"); return false; } if (context.m_pruning_seed == 0) { MDEBUG(context << "This peer is not striped, not dropping"); return false; } const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); if (next_stripe == peer_stripe) { MDEBUG(context << "This peer has needed stripe " << peer_stripe << ", not dropping"); return false; } const uint32_t local_stripe = tools::get_pruning_stripe(m_core.get_blockchain_pruning_seed()); if (m_sync_pruned_blocks && local_stripe && next_stripe != local_stripe) { MDEBUG(context << "We can sync pruned blocks off this peer, not dropping"); return false; } if (!context.m_needed_objects.empty()) { const uint64_t next_available_block_height = context.m_last_response_height - context.m_needed_objects.size() + 1; if (tools::has_unpruned_block(next_available_block_height, context.m_remote_blockchain_height, context.m_pruning_seed)) { MDEBUG(context << "This peer has unpruned next block at height " << next_available_block_height << ", not dropping"); return false; } } if (next_stripe > 0) { unsigned int n_out_peers = 0, n_peers_on_next_stripe = 0; m_p2p->for_each_connection([&](cryptonote_connection_context& ctx, nodetool::peerid_type peer_id, uint32_t support_flags)->bool{ if (!ctx.m_is_income) ++n_out_peers; if (ctx.m_state >= cryptonote_connection_context::state_synchronizing && tools::get_pruning_stripe(ctx.m_pruning_seed) == next_stripe) ++n_peers_on_next_stripe; return true; }); // TODO: investigate tallying by zone and comparing to max out peers by zone const unsigned int max_out_peers = get_max_out_peers(epee::net_utils::zone::public_); const uint32_t distance = (peer_stripe + (1<= max_out_peers && n_peers_on_next_stripe == 0) || (distance > 1 && n_peers_on_next_stripe <= 2) || distance > 2) { MDEBUG(context << "we want seed " << next_stripe << ", and either " << n_out_peers << " is at max out peers (" << max_out_peers << ") or distance " << distance << " from " << next_stripe << " to " << peer_stripe << " is too large and we have only " << n_peers_on_next_stripe << " peers on next seed, dropping connection to make space"); return true; } } MDEBUG(context << "End of checks, not dropping"); return false; } //------------------------------------------------------------------------------------------------------------------------ template size_t t_cryptonote_protocol_handler::skip_unneeded_hashes(cryptonote_connection_context& context, bool check_block_queue) const { // take out blocks we already have size_t skip = 0; while (skip < context.m_needed_objects.size() && (m_core.have_block(context.m_needed_objects[skip].first) || (check_block_queue && m_block_queue.have(context.m_needed_objects[skip].first)))) { // if we're popping the last hash, record it so we can ask again from that hash, // this prevents never being able to progress on peers we get old hash lists from if (skip + 1 == context.m_needed_objects.size()) context.m_last_known_hash = context.m_needed_objects[skip].first; ++skip; } if (skip > 0) { MDEBUG(context << "skipping " << skip << "/" << context.m_needed_objects.size() << " blocks"); context.m_needed_objects = std::vector>(context.m_needed_objects.begin() + skip, context.m_needed_objects.end()); } return skip; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::should_ask_for_pruned_data(cryptonote_connection_context& context, uint64_t first_block_height, uint64_t nblocks, bool check_block_weights) const { if (!m_sync_pruned_blocks) return false; if (!m_core.is_within_compiled_block_hash_area(first_block_height + nblocks - 1)) return false; const uint32_t local_stripe = tools::get_pruning_stripe(m_core.get_blockchain_pruning_seed()); if (local_stripe == 0) return false; // don't request pre-bulletprooof pruned blocks, we can't reconstruct their weight (yet) static const uint64_t bp_fork_height = m_core.get_earliest_ideal_height_for_version(HF_VERSION_SMALLER_BP + 1); if (first_block_height < bp_fork_height) return false; // assumes the span size is less or equal to the stripe size bool full_data_needed = tools::get_pruning_stripe(first_block_height, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES) == local_stripe || tools::get_pruning_stripe(first_block_height + nblocks - 1, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES) == local_stripe; if (full_data_needed) return false; if (check_block_weights && !m_core.has_block_weights(first_block_height, nblocks)) return false; return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::request_missing_objects(cryptonote_connection_context& context, bool check_having_blocks, bool force_next_span) { // flush stale spans std::set live_connections; m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool{ live_connections.insert(context.m_connection_id); return true; }); m_block_queue.flush_stale_spans(live_connections); // if we don't need to get next span, and the block queue is full enough, wait a bit bool start_from_current_chain = false; if (!force_next_span) { do { size_t nspans = m_block_queue.get_num_filled_spans(); size_t size = m_block_queue.get_data_size(); const uint64_t bc_height = m_core.get_current_blockchain_height(); const auto next_needed_pruning_stripe = get_next_needed_pruning_stripe(); const uint32_t add_stripe = tools::get_pruning_stripe(bc_height, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); const uint32_t local_stripe = tools::get_pruning_stripe(m_core.get_blockchain_pruning_seed()); const size_t block_queue_size_threshold = m_block_download_max_size ? m_block_download_max_size : BLOCK_QUEUE_SIZE_THRESHOLD; bool queue_proceed = nspans < BLOCK_QUEUE_NSPANS_THRESHOLD || size < block_queue_size_threshold; // get rid of blocks we already requested, or already have if (skip_unneeded_hashes(context, true) && context.m_needed_objects.empty() && context.m_num_requested == 0) { if (context.m_remote_blockchain_height > m_block_queue.get_next_needed_height(bc_height)) { MERROR(context << "Nothing we can request from this peer, and we did not request anything previously"); return false; } MDEBUG(context << "Nothing to get from this peer, and it's not ahead of us, all done"); context.m_state = cryptonote_connection_context::state_normal; if (m_core.get_current_blockchain_height() >= m_core.get_target_blockchain_height()) on_connection_synchronized(); return true; } uint64_t next_needed_height = m_block_queue.get_next_needed_height(bc_height); uint64_t next_block_height; if (context.m_needed_objects.empty()) next_block_height = next_needed_height; else next_block_height = context.m_last_response_height - context.m_needed_objects.size() + 1; bool stripe_proceed_main = ((m_sync_pruned_blocks && local_stripe && add_stripe != local_stripe) || add_stripe == 0 || peer_stripe == 0 || add_stripe == peer_stripe) && (next_block_height < bc_height + BLOCK_QUEUE_FORCE_DOWNLOAD_NEAR_BLOCKS || next_needed_height < bc_height + BLOCK_QUEUE_FORCE_DOWNLOAD_NEAR_BLOCKS); bool stripe_proceed_secondary = tools::has_unpruned_block(next_block_height, context.m_remote_blockchain_height, context.m_pruning_seed); bool proceed = stripe_proceed_main || (queue_proceed && stripe_proceed_secondary); if (!stripe_proceed_main && !stripe_proceed_secondary && should_drop_connection(context, tools::get_pruning_stripe(next_block_height, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES))) { if (!context.m_is_income) m_p2p->add_used_stripe_peer(context); return false; // drop outgoing connections } MDEBUG(context << "proceed " << proceed << " (queue " << queue_proceed << ", stripe " << stripe_proceed_main << "/" << stripe_proceed_secondary << "), " << next_needed_pruning_stripe.first << "-" << next_needed_pruning_stripe.second << " needed, bc add stripe " << add_stripe << ", we have " << peer_stripe << "), bc_height " << bc_height); MDEBUG(context << " - next_block_height " << next_block_height << ", seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << ", next_needed_height "<< next_needed_height); MDEBUG(context << " - last_response_height " << context.m_last_response_height << ", m_needed_objects size " << context.m_needed_objects.size()); // if we're waiting for next span, try to get it before unblocking threads below, // or a runaway downloading of future spans might happen if (stripe_proceed_main && should_download_next_span(context, true)) { MDEBUG(context << " we should try for that next span too, we think we could get it faster, resuming"); force_next_span = true; MLOG_PEER_STATE("resuming"); break; } if (proceed) { if (context.m_state != cryptonote_connection_context::state_standby) { LOG_DEBUG_CC(context, "Block queue is " << nspans << " and " << size << ", resuming"); MLOG_PEER_STATE("resuming"); } break; } // this one triggers if all threads are in standby, which should not happen, // but happened at least once, so we unblock at least one thread if so boost::unique_lock sync{m_sync_lock, boost::try_to_lock}; if (sync.owns_lock()) { bool filled = false; boost::posix_time::ptime time; boost::uuids::uuid connection_id; if (m_block_queue.has_next_span(m_core.get_current_blockchain_height(), filled, time, connection_id) && filled) { LOG_DEBUG_CC(context, "No other thread is adding blocks, and next span needed is ready, resuming"); MLOG_PEER_STATE("resuming"); context.m_state = cryptonote_connection_context::state_standby; ++context.m_callback_request_count; m_p2p->request_callback(context); return true; } else { sync.unlock(); // if this has gone on for too long, drop incoming connection to guard against some wedge state if (!context.m_is_income) { const uint64_t now = tools::get_tick_count(); const uint64_t dt = now - m_last_add_end_time; if (m_last_add_end_time && tools::ticks_to_ns(dt) >= DROP_ON_SYNC_WEDGE_THRESHOLD) { MDEBUG(context << "ns " << tools::ticks_to_ns(dt) << " from " << m_last_add_end_time << " and " << now); MDEBUG(context << "Block addition seems to have wedged, dropping connection"); return false; } } } } if (context.m_state != cryptonote_connection_context::state_standby) { if (!queue_proceed) LOG_DEBUG_CC(context, "Block queue is " << nspans << " and " << size << ", pausing"); else if (!stripe_proceed_main && !stripe_proceed_secondary) LOG_DEBUG_CC(context, "We do not have the stripe required to download another block, pausing"); context.m_state = cryptonote_connection_context::state_standby; MLOG_PEER_STATE("pausing"); } return true; } while(0); context.m_state = cryptonote_connection_context::state_synchronizing; } MDEBUG(context << " request_missing_objects: check " << check_having_blocks << ", force_next_span " << force_next_span << ", m_needed_objects " << context.m_needed_objects.size() << " lrh " << context.m_last_response_height << ", chain " << m_core.get_current_blockchain_height() << ", pruning seed " << epee::string_tools::to_string_hex(context.m_pruning_seed)); if(context.m_needed_objects.size() || force_next_span) { //we know objects that we need, request this objects NOTIFY_REQUEST_GET_OBJECTS::request req; bool is_next = false; size_t count = 0; const size_t count_limit = m_core.get_block_sync_size(m_core.get_current_blockchain_height()); std::pair span = std::make_pair(0, 0); if (force_next_span) { if (span.second == 0) { std::vector hashes; boost::uuids::uuid span_connection_id; boost::posix_time::ptime time; span = m_block_queue.get_next_span_if_scheduled(hashes, span_connection_id, time); if (span.second > 0) { is_next = true; req.blocks.reserve(hashes.size()); for (const auto &hash: hashes) { req.blocks.push_back(hash); context.m_requested_objects.insert(hash); } m_block_queue.reset_next_span_time(); } } } if (span.second == 0) { MDEBUG(context << " span size is 0"); if (context.m_last_response_height + 1 < context.m_needed_objects.size()) { MERROR(context << " ERROR: inconsistent context: lrh " << context.m_last_response_height << ", nos " << context.m_needed_objects.size()); context.m_needed_objects.clear(); context.m_last_response_height = 0; goto skip; } if (skip_unneeded_hashes(context, false) && context.m_needed_objects.empty() && context.m_num_requested == 0) { if (context.m_remote_blockchain_height > m_block_queue.get_next_needed_height(m_core.get_current_blockchain_height())) { MERROR(context << "Nothing we can request from this peer, and we did not request anything previously"); return false; } MDEBUG(context << "Nothing to get from this peer, and it's not ahead of us, all done"); context.m_state = cryptonote_connection_context::state_normal; if (m_core.get_current_blockchain_height() >= m_core.get_target_blockchain_height()) on_connection_synchronized(); return true; } const uint64_t first_block_height = context.m_last_response_height - context.m_needed_objects.size() + 1; static const uint64_t bp_fork_height = m_core.get_earliest_ideal_height_for_version(8); bool sync_pruned_blocks = m_sync_pruned_blocks && first_block_height >= bp_fork_height && m_core.get_blockchain_pruning_seed(); span = m_block_queue.reserve_span(first_block_height, context.m_last_response_height, count_limit, context.m_connection_id, context.m_remote_address, sync_pruned_blocks, m_core.get_blockchain_pruning_seed(), context.m_pruning_seed, context.m_remote_blockchain_height, context.m_needed_objects); MDEBUG(context << " span from " << first_block_height << ": " << span.first << "/" << span.second); if (span.second > 0) { const uint32_t stripe = tools::get_pruning_stripe(span.first, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); if (context.m_pruning_seed && stripe != tools::get_pruning_stripe(context.m_pruning_seed)) { MDEBUG(context << " starting early on next seed (" << span.first << " with stripe " << stripe << ", context seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << ")"); } } } if (span.second == 0 && !force_next_span) { MDEBUG(context << " still no span reserved, we may be in the corner case of next span scheduled and everything else scheduled/filled"); std::vector hashes; boost::uuids::uuid span_connection_id; boost::posix_time::ptime time; span = m_block_queue.get_next_span_if_scheduled(hashes, span_connection_id, time); if (span.second > 0 && !tools::has_unpruned_block(span.first, context.m_remote_blockchain_height, context.m_pruning_seed)) span = std::make_pair(0, 0); if (span.second > 0) { is_next = true; req.blocks.reserve(hashes.size()); for (const auto &hash: hashes) { req.blocks.push_back(hash); ++count; context.m_requested_objects.insert(hash); // that's atrocious O(n) wise, but this is rare auto i = std::find_if(context.m_needed_objects.begin(), context.m_needed_objects.end(), [&hash](const std::pair &o) { return o.first == hash; }); if (i != context.m_needed_objects.end()) context.m_needed_objects.erase(i); } } } MDEBUG(context << " span: " << span.first << "/" << span.second << " (" << span.first << " - " << (span.first + span.second - 1) << ")"); if (span.second > 0) { if (!is_next) { const uint64_t first_context_block_height = context.m_last_response_height - context.m_needed_objects.size() + 1; uint64_t skip = span.first - first_context_block_height; if (skip > context.m_needed_objects.size()) { MERROR("ERROR: skip " << skip << ", m_needed_objects " << context.m_needed_objects.size() << ", first_context_block_height" << first_context_block_height); return false; } if (skip > 0) context.m_needed_objects = std::vector>(context.m_needed_objects.begin() + skip, context.m_needed_objects.end()); if (context.m_needed_objects.size() < span.second) { MERROR("ERROR: span " << span.first << "/" << span.second << ", m_needed_objects " << context.m_needed_objects.size()); return false; } req.blocks.reserve(req.blocks.size() + span.second); for (size_t n = 0; n < span.second; ++n) { req.blocks.push_back(context.m_needed_objects[n].first); ++count; context.m_requested_objects.insert(context.m_needed_objects[n].first); } context.m_needed_objects = std::vector>(context.m_needed_objects.begin() + span.second, context.m_needed_objects.end()); } req.prune = should_ask_for_pruned_data(context, span.first, span.second, true); // if we need to ask for full data and that peer does not have the right stripe, we can't ask it if (!req.prune && context.m_pruning_seed) { const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); const uint32_t first_stripe = tools::get_pruning_stripe(span.first, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); const uint32_t last_stripe = tools::get_pruning_stripe(span.first + span.second - 1, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); if (((first_stripe && peer_stripe != first_stripe) || (last_stripe && peer_stripe != last_stripe)) && !m_sync_pruned_blocks) { MDEBUG(context << "We need full data, but the peer does not have it, dropping peer"); return false; } } context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); context.m_expect_height = span.first; context.m_expect_response = NOTIFY_RESPONSE_GET_OBJECTS::ID; MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_GET_OBJECTS: blocks.size()=" << req.blocks.size() << "requested blocks count=" << count << " / " << count_limit << " from " << span.first << ", first hash " << req.blocks.front()); //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); MDEBUG("Asking for " << (req.prune ? "pruned" : "full") << " data, start/end " << tools::get_pruning_stripe(span.first, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES) << "/" << tools::get_pruning_stripe(span.first + span.second - 1, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES) << ", ours " << tools::get_pruning_stripe(m_core.get_blockchain_pruning_seed()) << ", peer stripe " << tools::get_pruning_stripe(context.m_pruning_seed)); context.m_num_requested += req.blocks.size(); post_notify(req, context); MLOG_PEER_STATE("requesting objects"); return true; } // we can do nothing, so drop this peer to make room for others unless we think we've downloaded all we need const uint64_t blockchain_height = m_core.get_current_blockchain_height(); if (std::max(blockchain_height, m_block_queue.get_next_needed_height(blockchain_height)) >= m_core.get_target_blockchain_height()) { context.m_state = cryptonote_connection_context::state_normal; MLOG_PEER_STATE("Nothing to do for now, switching to normal state"); return true; } MLOG_PEER_STATE("We can download nothing from this peer, dropping"); return false; } skip: context.m_needed_objects.clear(); // we might have been called from the "received chain entry" handler, and end up // here because we can't use any of those blocks (maybe because all of them are // actually already requested). In this case, if we can add blocks instead, do so if (m_core.get_current_blockchain_height() < m_core.get_target_blockchain_height()) { const boost::unique_lock sync{m_sync_lock, boost::try_to_lock}; if (sync.owns_lock()) { uint64_t start_height; std::vector blocks; boost::uuids::uuid span_connection_id; epee::net_utils::network_address span_origin; if (m_block_queue.get_next_span(start_height, blocks, span_connection_id, span_origin, true)) { LOG_DEBUG_CC(context, "No other thread is adding blocks, resuming"); MLOG_PEER_STATE("will try to add blocks next"); context.m_state = cryptonote_connection_context::state_standby; ++context.m_callback_request_count; m_p2p->request_callback(context); return true; } } } if(context.m_last_response_height < context.m_remote_blockchain_height-1) {//we have to fetch more objects ids, request blockchain entry NOTIFY_REQUEST_CHAIN::request r = {}; context.m_expect_height = m_core.get_current_blockchain_height(); m_core.get_short_chain_history(r.block_ids); CHECK_AND_ASSERT_MES(!r.block_ids.empty(), false, "Short chain history is empty"); if (!start_from_current_chain) { // we'll want to start off from where we are on that peer, which may not be added yet if (context.m_last_known_hash != crypto::null_hash && r.block_ids.front() != context.m_last_known_hash) { context.m_expect_height = std::numeric_limits::max(); r.block_ids.push_front(context.m_last_known_hash); } } handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) r.prune = m_sync_pruned_blocks; //std::string blob; // for calculate size of request //epee::serialization::store_t_to_binary(r, blob); //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); //LOG_PRINT_CCONTEXT_L1("r = " << 200); context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); context.m_expect_response = NOTIFY_RESPONSE_CHAIN_ENTRY::ID; MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() << ", start_from_current_chain " << start_from_current_chain); post_notify(r, context); MLOG_PEER_STATE("requesting chain"); }else { CHECK_AND_ASSERT_MES(context.m_last_response_height == context.m_remote_blockchain_height-1 && !context.m_needed_objects.size() && !context.m_requested_objects.size(), false, "request_missing_blocks final condition failed!" << "\r\nm_last_response_height=" << context.m_last_response_height << "\r\nm_remote_blockchain_height=" << context.m_remote_blockchain_height << "\r\nm_needed_objects.size()=" << context.m_needed_objects.size() << "\r\nm_requested_objects.size()=" << context.m_requested_objects.size() << "\r\non connection [" << epee::net_utils::print_connection_context_short(context)<< "]"); context.m_state = cryptonote_connection_context::state_normal; if (context.m_remote_blockchain_height >= m_core.get_target_blockchain_height()) { if (m_core.get_current_blockchain_height() >= m_core.get_target_blockchain_height()) on_connection_synchronized(); } else { MINFO(context << " we've reached this peer's blockchain height (theirs " << context.m_remote_blockchain_height << ", our target " << m_core.get_target_blockchain_height()); } } return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::on_connection_synchronized() { bool val_expected = false; uint64_t current_blockchain_height = m_core.get_current_blockchain_height(); if(!m_core.is_within_compiled_block_hash_area(current_blockchain_height) && m_synchronized.compare_exchange_strong(val_expected, true)) { if ((current_blockchain_height > m_sync_start_height) && (m_sync_spans_downloaded > 0)) { uint64_t synced_blocks = current_blockchain_height - m_sync_start_height; // Report only after syncing an "interesting" number of blocks: if (synced_blocks > 20) { const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); uint64_t synced_seconds = (now - m_sync_start_time).total_seconds(); if (synced_seconds == 0) { synced_seconds = 1; } float blocks_per_second = (1000 * synced_blocks / synced_seconds) / 1000.0f; MGINFO_YELLOW("Synced " << synced_blocks << " blocks in " << tools::get_human_readable_timespan(synced_seconds) << " (" << blocks_per_second << " blocks per second)"); } } MGINFO_YELLOW(ENDL << "**********************************************************************" << ENDL << "You are now synchronized with the network. You may now start monero-wallet-cli." << ENDL << ENDL << "Use the \"help\" command to see the list of available commands." << ENDL << "**********************************************************************"); m_sync_timer.pause(); if (ELPP->vRegistry()->allowed(el::Level::Info, "sync-info")) { const uint64_t sync_time = m_sync_timer.value(); const uint64_t add_time = m_add_timer.value(); if (sync_time && add_time) { MCLOG_YELLOW(el::Level::Info, "sync-info", "Sync time: " << sync_time/1e9/60 << " min, idle time " << (100.f * (1.0f - add_time / (float)sync_time)) << "%" << ", " << (10 * m_sync_download_objects_size / 1024 / 1024) / 10.f << " + " << (10 * m_sync_download_chain_size / 1024 / 1024) / 10.f << " MB downloaded, " << 100.0f * m_sync_old_spans_downloaded / m_sync_spans_downloaded << "% old spans, " << 100.0f * m_sync_bad_spans_downloaded / m_sync_spans_downloaded << "% bad spans"); } } m_core.on_synchronized(); } m_core.safesyncmode(true); m_p2p->clear_used_stripe_peers(); // ask for txpool complement from any suitable node if we did not yet val_expected = true; if (m_ask_for_txpool_complement.compare_exchange_strong(val_expected, false)) { m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool { if(context.m_state < cryptonote_connection_context::state_synchronizing) { MDEBUG(context << "not ready, ignoring"); return true; } if (!request_txpool_complement(context)) { MERROR(context << "Failed to request txpool complement"); return true; } return false; }); } return true; } //------------------------------------------------------------------------------------------------------------------------ template size_t t_cryptonote_protocol_handler::get_synchronizing_connections_count() { size_t count = 0; m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool{ if(context.m_state == cryptonote_connection_context::state_synchronizing) ++count; return true; }); return count; } //------------------------------------------------------------------------------------------------------------------------ template int t_cryptonote_protocol_handler::handle_response_chain_entry(int command, NOTIFY_RESPONSE_CHAIN_ENTRY::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_RESPONSE_CHAIN_ENTRY: m_block_ids.size()=" << arg.m_block_ids.size() << ", m_start_height=" << arg.start_height << ", m_total_height=" << arg.total_height); MLOG_PEER_STATE("received chain"); if (context.m_expect_response != NOTIFY_RESPONSE_CHAIN_ENTRY::ID) { LOG_ERROR_CCONTEXT("Got NOTIFY_RESPONSE_CHAIN_ENTRY out of the blue, dropping connection"); drop_connection(context, true, false); return 1; } context.m_expect_response = 0; if (arg.start_height + 1 > context.m_expect_height) // we expect an overlapping block { LOG_ERROR_CCONTEXT("Got NOTIFY_RESPONSE_CHAIN_ENTRY past expected height, dropping connection"); drop_connection(context, true, false); return 1; } context.m_last_request_time = boost::date_time::not_a_date_time; m_sync_download_chain_size += arg.m_block_ids.size() * sizeof(crypto::hash); if(!arg.m_block_ids.size()) { LOG_ERROR_CCONTEXT("sent empty m_block_ids, dropping connection"); drop_connection(context, true, false); return 1; } if (arg.total_height < arg.m_block_ids.size() || arg.start_height > arg.total_height - arg.m_block_ids.size()) { LOG_ERROR_CCONTEXT("sent invalid start/nblocks/height, dropping connection"); drop_connection(context, true, false); return 1; } if (!arg.m_block_weights.empty() && arg.m_block_weights.size() != arg.m_block_ids.size()) { LOG_ERROR_CCONTEXT("sent invalid block weight array, dropping connection"); drop_connection(context, true, false); return 1; } MDEBUG(context << "first block hash " << arg.m_block_ids.front() << ", last " << arg.m_block_ids.back()); if (arg.total_height >= CRYPTONOTE_MAX_BLOCK_NUMBER || arg.m_block_ids.size() > BLOCKS_IDS_SYNCHRONIZING_MAX_COUNT) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_CHAIN_ENTRY, with total_height=" << arg.total_height << " and block_ids=" << arg.m_block_ids.size()); drop_connection(context, false, false); return 1; } if (arg.total_height < context.m_remote_blockchain_height) { MINFO(context << "Claims " << arg.total_height << ", claimed " << context.m_remote_blockchain_height << " before"); hit_score(context, 1); } context.m_remote_blockchain_height = arg.total_height; context.m_last_response_height = arg.start_height + arg.m_block_ids.size()-1; if(context.m_last_response_height > context.m_remote_blockchain_height) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_CHAIN_ENTRY, with m_total_height=" << arg.total_height << ", m_start_height=" << arg.start_height << ", m_block_ids.size()=" << arg.m_block_ids.size()); drop_connection(context, false, false); return 1; } uint64_t n_use_blocks = m_core.prevalidate_block_hashes(arg.start_height, arg.m_block_ids, arg.m_block_weights); if (n_use_blocks == 0 || n_use_blocks + HASH_OF_HASHES_STEP <= arg.m_block_ids.size()) { LOG_ERROR_CCONTEXT("Most blocks are invalid, dropping connection"); drop_connection(context, true, false); return 1; } context.m_needed_objects.clear(); context.m_needed_objects.reserve(arg.m_block_ids.size()); uint64_t added = 0; std::unordered_set blocks_found; bool first = true; bool expect_unknown = false; for (size_t i = 0; i < arg.m_block_ids.size(); ++i) { if (!blocks_found.insert(arg.m_block_ids[i]).second) { LOG_ERROR_CCONTEXT("Duplicate blocks in chain entry response, dropping connection"); drop_connection_with_score(context, 5, false); return 1; } int where; const bool have_block = m_core.have_block_unlocked(arg.m_block_ids[i], &where); if (first) { if (!have_block && !m_block_queue.requested(arg.m_block_ids[i]) && !m_block_queue.have(arg.m_block_ids[i])) { LOG_ERROR_CCONTEXT("First block hash is unknown, dropping connection"); drop_connection_with_score(context, 5, false); return 1; } if (!have_block) expect_unknown = true; } if (!first) { // after the first, blocks may be known or unknown, but if they are known, // they should be at the same height if on the main chain if (have_block) { switch (where) { default: case HAVE_BLOCK_INVALID: LOG_ERROR_CCONTEXT("Block is invalid or known without known type, dropping connection"); drop_connection(context, true, false); return 1; case HAVE_BLOCK_MAIN_CHAIN: if (expect_unknown) { LOG_ERROR_CCONTEXT("Block is on the main chain, but we did not expect a known block, dropping connection"); drop_connection_with_score(context, 5, false); return 1; } if (m_core.get_block_id_by_height(arg.start_height + i) != arg.m_block_ids[i]) { LOG_ERROR_CCONTEXT("Block is on the main chain, but not at the expected height, dropping connection"); drop_connection_with_score(context, 5, false); return 1; } break; case HAVE_BLOCK_ALT_CHAIN: if (expect_unknown) { LOG_ERROR_CCONTEXT("Block is on the main chain, but we did not expect a known block, dropping connection"); drop_connection_with_score(context, 5, false); return 1; } break; } } else expect_unknown = true; } const uint64_t block_weight = arg.m_block_weights.empty() ? 0 : arg.m_block_weights[i]; context.m_needed_objects.push_back(std::make_pair(arg.m_block_ids[i], block_weight)); if (++added == n_use_blocks) break; first = false; } context.m_last_response_height -= arg.m_block_ids.size() - n_use_blocks; if (!request_missing_objects(context, false)) { LOG_ERROR_CCONTEXT("Failed to request missing objects, dropping connection"); drop_connection(context, false, false); return 1; } if (arg.total_height > m_core.get_target_blockchain_height()) m_core.set_target_blockchain_height(arg.total_height); context.m_num_requested = 0; return 1; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::relay_block(NOTIFY_NEW_BLOCK::request& arg, cryptonote_connection_context& exclude_context) { NOTIFY_NEW_FLUFFY_BLOCK::request fluffy_arg = AUTO_VAL_INIT(fluffy_arg); fluffy_arg.current_blockchain_height = arg.current_blockchain_height; std::vector fluffy_txs; fluffy_arg.b = arg.b; fluffy_arg.b.txs = fluffy_txs; // sort peers between fluffy ones and others std::vector> fullConnections, fluffyConnections; m_p2p->for_each_connection([this, &exclude_context, &fullConnections, &fluffyConnections](connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags) { // peer_id also filters out connections before handshake if (peer_id && exclude_context.m_connection_id != context.m_connection_id && context.m_remote_address.get_zone() == epee::net_utils::zone::public_) { if(m_core.fluffy_blocks_enabled() && (support_flags & P2P_SUPPORT_FLAG_FLUFFY_BLOCKS)) { LOG_DEBUG_CC(context, "PEER SUPPORTS FLUFFY BLOCKS - RELAYING THIN/COMPACT WHATEVER BLOCK"); fluffyConnections.push_back({context.m_remote_address.get_zone(), context.m_connection_id}); } else { LOG_DEBUG_CC(context, "PEER DOESN'T SUPPORT FLUFFY BLOCKS - RELAYING FULL BLOCK"); fullConnections.push_back({context.m_remote_address.get_zone(), context.m_connection_id}); } } return true; }); // send fluffy ones first, we want to encourage people to run that if (!fluffyConnections.empty()) { epee::levin::message_writer fluffyBlob{32 * 1024}; epee::serialization::store_t_to_binary(fluffy_arg, fluffyBlob.buffer); m_p2p->relay_notify_to_list(NOTIFY_NEW_FLUFFY_BLOCK::ID, std::move(fluffyBlob), std::move(fluffyConnections)); } if (!fullConnections.empty()) { epee::levin::message_writer fullBlob{128 * 1024}; epee::serialization::store_t_to_binary(arg, fullBlob.buffer); m_p2p->relay_notify_to_list(NOTIFY_NEW_BLOCK::ID, std::move(fullBlob), std::move(fullConnections)); } return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::relay_transactions(NOTIFY_NEW_TRANSACTIONS::request& arg, const boost::uuids::uuid& source, epee::net_utils::zone zone, relay_method tx_relay) { /* Push all outgoing transactions to this function. The behavior needs to identify how the transaction is going to be relayed, and then update the local mempool before doing the relay. The code was already updating the DB twice on received transactions - it is difficult to workaround this due to the internal design. */ return m_p2p->send_txs(std::move(arg.txs), zone, source, tx_relay) != epee::net_utils::zone::invalid; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::request_txpool_complement(cryptonote_connection_context &context) { NOTIFY_GET_TXPOOL_COMPLEMENT::request r = {}; if (!m_core.get_pool_transaction_hashes(r.hashes, false)) { MERROR("Failed to get txpool hashes"); return false; } MLOG_P2P_MESSAGE("-->>NOTIFY_GET_TXPOOL_COMPLEMENT: hashes.size()=" << r.hashes.size() ); post_notify(r, context); MLOG_PEER_STATE("requesting txpool complement"); return true; } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::hit_score(cryptonote_connection_context &context, int32_t score) { if (score <= 0) { MERROR("Negative score hit"); return; } context.m_score -= score; if (context.m_score <= DROP_PEERS_ON_SCORE) drop_connection_with_score(context, 5, false); } //------------------------------------------------------------------------------------------------------------------------ template std::string t_cryptonote_protocol_handler::get_peers_overview() const { std::stringstream ss; const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); m_p2p->for_each_connection([&](const connection_context &ctx, nodetool::peerid_type peer_id, uint32_t support_flags) { const uint32_t stripe = tools::get_pruning_stripe(ctx.m_pruning_seed); char state_char = cryptonote::get_protocol_state_char(ctx.m_state); ss << stripe + state_char; if (ctx.m_last_request_time != boost::date_time::not_a_date_time) ss << (((now - ctx.m_last_request_time).total_microseconds() > IDLE_PEER_KICK_TIME) ? "!" : "?"); ss << + " "; return true; }); return ss.str(); } //------------------------------------------------------------------------------------------------------------------------ template std::pair t_cryptonote_protocol_handler::get_next_needed_pruning_stripe() const { const uint64_t want_height_from_blockchain = m_core.get_current_blockchain_height(); const uint64_t want_height_from_block_queue = m_block_queue.get_next_needed_height(want_height_from_blockchain); const uint64_t want_height = std::max(want_height_from_blockchain, want_height_from_block_queue); uint64_t blockchain_height = m_core.get_target_blockchain_height(); // if we don't know the remote chain size yet, assume infinitely large so we get the right stripe if we're not near the tip if (blockchain_height == 0) blockchain_height = CRYPTONOTE_MAX_BLOCK_NUMBER; const uint32_t next_pruning_stripe = tools::get_pruning_stripe(want_height, blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); if (next_pruning_stripe == 0) return std::make_pair(0, 0); // if we already have a few peers on this stripe, but none on next one, try next one unsigned int n_next = 0, n_subsequent = 0, n_others = 0; const uint32_t subsequent_pruning_stripe = 1 + next_pruning_stripe % (1<for_each_connection([&](const connection_context &context, nodetool::peerid_type peer_id, uint32_t support_flags) { if (context.m_state >= cryptonote_connection_context::state_synchronizing) { if (context.m_pruning_seed == 0 || tools::get_pruning_stripe(context.m_pruning_seed) == next_pruning_stripe) ++n_next; else if (tools::get_pruning_stripe(context.m_pruning_seed) == subsequent_pruning_stripe) ++n_subsequent; else ++n_others; } return true; }); // TODO: investigate tallying by zone and comparing to max out peers by zone const unsigned int max_out_peers = get_max_out_peers(epee::net_utils::zone::public_); const bool use_next = (n_next > max_out_peers / 2 && n_subsequent <= 1) || (n_next > 2 && n_subsequent == 0); const uint32_t ret_stripe = use_next ? subsequent_pruning_stripe: next_pruning_stripe; MIDEBUG(const std::string po = get_peers_overview(), "get_next_needed_pruning_stripe: want height " << want_height << " (" << want_height_from_blockchain << " from blockchain, " << want_height_from_block_queue << " from block queue), stripe " << next_pruning_stripe << " (" << n_next << "/" << max_out_peers << " on it and " << n_subsequent << " on " << subsequent_pruning_stripe << ", " << n_others << " others) -> " << ret_stripe << " (+" << (ret_stripe - next_pruning_stripe + (1 << CRYPTONOTE_PRUNING_LOG_STRIPES)) % (1 << CRYPTONOTE_PRUNING_LOG_STRIPES) << "), current peers " << po); return std::make_pair(next_pruning_stripe, ret_stripe); } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::needs_new_sync_connections(epee::net_utils::zone zone) const { const uint64_t target = m_core.get_target_blockchain_height(); const uint64_t height = m_core.get_current_blockchain_height(); if (target && target <= height) return false; size_t n_out_peers = 0; m_p2p->for_each_connection([&](cryptonote_connection_context& ctx, nodetool::peerid_type peer_id, uint32_t support_flags)->bool{ if (!ctx.m_is_income && ctx.m_remote_address.get_zone() == zone) ++n_out_peers; return true; }); if (n_out_peers >= get_max_out_peers(zone)) return false; return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_cryptonote_protocol_handler::is_busy_syncing() { const boost::unique_lock sync{m_sync_lock, boost::try_to_lock}; return !sync.owns_lock(); } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::drop_connection_with_score(cryptonote_connection_context &context, unsigned score, bool flush_all_spans) { LOG_DEBUG_CC(context, "dropping connection id " << context.m_connection_id << " (pruning seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << "), score " << score << ", flush_all_spans " << flush_all_spans); if (score > 0) m_p2p->add_host_fail(context.m_remote_address, score); m_block_queue.flush_spans(context.m_connection_id, flush_all_spans); m_p2p->drop_connection(context); } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::drop_connection(cryptonote_connection_context &context, bool add_fail, bool flush_all_spans) { return drop_connection_with_score(context, add_fail ? 1 : 0, flush_all_spans); } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::drop_connections(const epee::net_utils::network_address address) { MWARNING("dropping connections to " << address.str()); m_p2p->add_host_fail(address, 5); std::vector drop; m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id, uint32_t support_flags) { if (address.is_same_host(cntxt.m_remote_address)) drop.push_back(cntxt.m_connection_id); return true; }); for (const boost::uuids::uuid &id: drop) { m_block_queue.flush_spans(id, true); m_p2p->for_connection(id, [&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t f)->bool{ drop_connection(context, true, false); return true; }); } } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::on_connection_close(cryptonote_connection_context &context) { uint64_t target = 0; m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id, uint32_t support_flags) { if (cntxt.m_state >= cryptonote_connection_context::state_synchronizing && cntxt.m_connection_id != context.m_connection_id) target = std::max(target, cntxt.m_remote_blockchain_height); return true; }); const uint64_t previous_target = m_core.get_target_blockchain_height(); if (target < previous_target) { MINFO("Target height decreasing from " << previous_target << " to " << target); m_core.set_target_blockchain_height(target); if (target == 0 && context.m_state > cryptonote_connection_context::state_before_handshake && !m_stopping) { MCWARNING("global", "monerod is now disconnected from the network"); m_ask_for_txpool_complement = true; } } m_block_queue.flush_spans(context.m_connection_id, false); MLOG_PEER_STATE("closed"); } //------------------------------------------------------------------------------------------------------------------------ template void t_cryptonote_protocol_handler::stop() { m_stopping = true; m_core.stop(); } } // namespace