add --sync-notify option

Triggered when starting/stopping syncing

%s: 1 if starting syncing, 0 if stopping
%h: current chain height
%t: target chain height
pull/8000/head
moneromooo-monero 3 years ago
parent b58a9fb12e
commit cc51354271
No known key found for this signature in database
GPG Key ID: 686F07454D6CEFC3

@ -193,6 +193,12 @@ namespace cryptonote
, "Run a program for each new block, '%s' will be replaced by the block hash"
, ""
};
static const command_line::arg_descriptor<std::string> arg_sync_notify = {
"sync-notify"
, "Run a program when we start or stop syncing, '%h' will be replaced by the "
"current height, '%t' by the target height, '%s' by 1 if synced, 0 if not."
, ""
};
static const command_line::arg_descriptor<bool> arg_prune_blockchain = {
"prune-blockchain"
, "Prune blockchain"
@ -345,6 +351,7 @@ namespace cryptonote
command_line::add_arg(desc, arg_sync_pruned_blocks);
command_line::add_arg(desc, arg_max_txpool_weight);
command_line::add_arg(desc, arg_block_notify);
command_line::add_arg(desc, arg_sync_notify);
command_line::add_arg(desc, arg_prune_blockchain);
command_line::add_arg(desc, arg_reorg_notify);
command_line::add_arg(desc, arg_block_rate_notify);
@ -647,6 +654,28 @@ namespace cryptonote
MERROR("Failed to parse block notify spec: " << e.what());
}
try
{
if (!command_line::is_arg_defaulted(vm, arg_sync_notify))
{
struct sync_notify
{
tools::Notify cmdline;
void operator()(bool syncing, std::uint64_t height, std::uint64_t target) const
{
cmdline.notify("%s", syncing ? "1" : " 0", "%h", std::to_string(height).c_str(), "%t", std::to_string(target).c_str(), NULL);
}
};
add_sync_notify(sync_notify{{command_line::get_arg(vm, arg_sync_notify).c_str()}});
}
}
catch (const std::exception &e)
{
MERROR("Failed to parse sync notify spec: " << e.what());
}
try
{
if (!command_line::is_arg_defaulted(vm, arg_reorg_notify))
@ -1451,6 +1480,32 @@ namespace cryptonote
m_miner.resume();
}
//-----------------------------------------------------------------------------------------------
void core::add_sync_notify(boost::function<void(bool, std::uint64_t, std::uint64_t)>&& notify)
{
if (notify)
{
m_sync_notifiers.push_back(std::move(notify));
}
}
//-----------------------------------------------------------------------------------------------
void core::on_start_syncing(uint64_t target)
{
MINFO("Starting syncing");
const uint64_t current_blockchain_height = get_current_blockchain_height();
if (target >= current_blockchain_height + 5) // don't switch to unsafe mode just for a few blocks
safesyncmode(false);
for (const auto& notifier : m_sync_notifiers)
notifier(true, current_blockchain_height, target);
}
//-----------------------------------------------------------------------------------------------
void core::on_stop_syncing()
{
MINFO("Stopping syncing");
safesyncmode(true);
for (const auto& notifier : m_sync_notifiers)
notifier(false, get_current_blockchain_height(), 0);
}
//-----------------------------------------------------------------------------------------------
block_complete_entry get_block_complete_entry(block& b, tx_memory_pool &pool)
{
block_complete_entry bce;
@ -1537,7 +1592,12 @@ namespace cryptonote
//-----------------------------------------------------------------------------------------------
bool core::add_new_block(const block& b, block_verification_context& bvc)
{
return m_blockchain_storage.add_new_block(b, bvc);
const bool syncing = get_current_blockchain_height() < get_target_blockchain_height();
if (!m_blockchain_storage.add_new_block(b, bvc))
return false;
if (syncing && get_current_blockchain_height() >= get_target_blockchain_height())
on_stop_syncing();
return true;
}
//-----------------------------------------------------------------------------------------------
@ -2025,6 +2085,11 @@ namespace cryptonote
//-----------------------------------------------------------------------------------------------
void core::set_target_blockchain_height(uint64_t target_blockchain_height)
{
const uint64_t height = get_current_blockchain_height();
if (m_target_blockchain_height > height && target_blockchain_height <= height)
on_stop_syncing();
else if (target_blockchain_height > height)
on_start_syncing(target_blockchain_height);
m_target_blockchain_height = target_blockchain_height;
}
//-----------------------------------------------------------------------------------------------

@ -893,6 +893,16 @@ namespace cryptonote
*/
bool get_txpool_complement(const std::vector<crypto::hash> &hashes, std::vector<cryptonote::blobdata> &txes);
/**
* @brief sets a sync notify object to call when we start/stop syncing
*
* @param notify the notify object to call
*/
void add_sync_notify(boost::function<void(bool, uint64_t, uint64_t)> &&notify);
void on_start_syncing(uint64_t target);
void on_stop_syncing();
private:
/**
@ -1131,6 +1141,8 @@ namespace cryptonote
std::shared_ptr<tools::Notify> m_block_rate_notify;
boost::function<void(std::vector<txpool_event>)> m_zmq_pub;
std::vector<boost::function<void(bool, std::uint64_t, std::uint64_t)>> m_sync_notifiers;
};
}

@ -111,6 +111,7 @@ namespace cryptonote
void log_connections();
std::list<connection_info> get_connections();
const block_queue &get_block_queue() const { return m_block_queue; }
bool has_more_blocks_queued() const { return m_block_queue.get_data_size() > 0; }
void stop();
void on_connection_close(cryptonote_connection_context &context);
void set_max_out_peers(unsigned int max) { m_max_out_peers = max; }

@ -407,10 +407,6 @@ namespace cryptonote
<< " [Your node is " << abs_diff << " blocks (" << tools::get_human_readable_timespan((abs_diff - diff_v2) * DIFFICULTY_TARGET_V1 + diff_v2 * DIFFICULTY_TARGET_V2) << ") "
<< (0 <= diff ? std::string("behind") : std::string("ahead"))
<< "] " << ENDL << "SYNCHRONIZATION started");
if (hshd.current_height >= m_core.get_current_blockchain_height() + 5) // don't switch to unsafe mode just for a few blocks
{
m_core.safesyncmode(false);
}
if (m_core.get_target_blockchain_height() == 0) // only when sync starts
{
m_sync_timer.resume();
@ -2479,7 +2475,6 @@ skip:
}
m_core.on_synchronized();
}
m_core.safesyncmode(true);
m_p2p->clear_used_stripe_peers();
// ask for txpool complement from any suitable node if we did not yet

@ -121,6 +121,7 @@ public:
{
core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared});
core.get().get_blockchain_storage().add_miner_notify(cryptonote::listener::zmq_pub::miner_data{shared});
core.get().add_sync_notify(cryptonote::listener::zmq_pub::sync{shared});
core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared});
}
}

@ -57,9 +57,11 @@
namespace
{
constexpr const char txpool_signal[] = "tx_signal";
constexpr const char sync_signal[] = "sync_signal";
using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span<const cryptonote::block>);
using miner_writer = void(epee::byte_stream&, uint8_t, uint64_t, const crypto::hash&, const crypto::hash&, cryptonote::difficulty_type, uint64_t, uint64_t, const std::vector<cryptonote::tx_block_template_backlog_entry>&);
using sync_writer = void(epee::byte_stream&, bool, std::uint64_t, std::uint64_t);
using txpool_writer = void(epee::byte_stream&, epee::span<const cryptonote::txpool_event>);
template<typename F>
@ -132,6 +134,14 @@ namespace
const std::vector<cryptonote::tx_block_template_backlog_entry>& tx_backlog;
};
//! Object for sync notification serialization
struct minimal_sync
{
const bool syncing;
const std::uint64_t height;
const std::uint64_t target;
};
//! Object for "minimal" tx serialization
struct minimal_txpool
{
@ -187,6 +197,17 @@ namespace
dest.EndObject();
}
void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_sync self)
{
namespace adapt = boost::adaptors;
dest.StartObject();
INSERT_INTO_JSON_OBJECT(dest, syncing, self.syncing);
INSERT_INTO_JSON_OBJECT(dest, height, self.height);
INSERT_INTO_JSON_OBJECT(dest, target, self.target);
dest.EndObject();
}
void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
{
json_pub(buf, blocks);
@ -202,6 +223,11 @@ namespace
json_pub(buf, miner_data{major_version, height, prev_id, seed_hash, diff, median_weight, already_generated_coins, tx_backlog});
}
void json_minimal_sync(epee::byte_stream& buf, bool syncing, const std::uint64_t height, const std::uint64_t target)
{
json_pub(buf, minimal_sync{syncing, height, target});
}
// boost::adaptors are in place "views" - no copy/move takes place
// moving transactions (via sort, etc.), is expensive!
@ -236,6 +262,12 @@ namespace
{u8"json-full-miner_data", json_miner_data},
}};
constexpr const std::array<context<sync_writer>, 2> sync_contexts =
{{
{u8"json-full-sync", json_minimal_sync},
{u8"json-minimal-sync", json_minimal_sync},
}};
constexpr const std::array<context<txpool_writer>, 2> txpool_contexts =
{{
{u8"json-full-txpool_add", json_full_txpool},
@ -336,7 +368,7 @@ namespace
zmq_msg_size(std::addressof(msg))
};
if (payload == txpool_signal)
if (payload == txpool_signal || payload == sync_signal)
{
zmq_msg_close(std::addressof(msg));
return false;
@ -360,6 +392,7 @@ zmq_pub::zmq_pub(void* context)
: relay_(),
chain_subs_{{0}},
miner_subs_{{0}},
sync_subs_({0}),
txpool_subs_{{0}},
sync_()
{
@ -368,6 +401,7 @@ zmq_pub::zmq_pub(void* context)
verify_sorted(chain_contexts, "chain_contexts");
verify_sorted(miner_contexts, "miner_contexts");
verify_sorted(sync_contexts, "sync_contexts");
verify_sorted(txpool_contexts, "txpool_contexts");
relay_.reset(zmq_socket(context, ZMQ_PAIR));
@ -389,12 +423,13 @@ bool zmq_pub::sub_request(boost::string_ref message)
const auto chain_range = get_range(chain_contexts, message);
const auto miner_range = get_range(miner_contexts, message);
const auto sync_range = get_range(sync_contexts, message);
const auto txpool_range = get_range(txpool_contexts, message);
if (!chain_range.empty() || !miner_range.empty() || !txpool_range.empty())
if (!chain_range.empty() || !miner_range.empty() || !sync_range.empty() || !txpool_range.empty())
{
MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " <<
chain_range.size() << " chain topic(s), " << miner_range.size() << " miner topic(s) and " << txpool_range.size() << " txpool topic(s)");
chain_range.size() << " chain topic(s), " << miner_range.size() << " miner topic(s), " << sync_range.size() << " sync topic(s) and " << txpool_range.size() << " txpool topic(s)");
const boost::lock_guard<boost::mutex> lock{sync_};
switch (tag)
@ -402,11 +437,13 @@ bool zmq_pub::sub_request(boost::string_ref message)
case 0:
remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
remove_subscriptions(miner_subs_, miner_range, miner_contexts.begin());
remove_subscriptions(sync_subs_, sync_range, sync_contexts.begin());
remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
return true;
case 1:
add_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
add_subscriptions(miner_subs_, miner_range, miner_contexts.begin());
add_subscriptions(sync_subs_, sync_range, sync_contexts.begin());
add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
return true;
default:
@ -498,6 +535,20 @@ std::size_t zmq_pub::send_miner_data(uint8_t major_version, uint64_t height, con
return 0;
}
std::size_t zmq_pub::send_sync(bool syncing, std::uint64_t height, std::uint64_t target)
{
const boost::lock_guard<boost::mutex> lock{sync_};
for (const std::size_t sub : sync_subs_)
{
if (sub)
{
auto messages = make_pubs(sync_subs_, sync_contexts, syncing, height, target);
return send_messages(relay_.get(), messages);
}
}
return 0;
}
std::size_t zmq_pub::send_txpool_add(std::vector<txpool_event> txes)
{
if (txes.empty())
@ -537,6 +588,15 @@ void zmq_pub::miner_data::operator()(uint8_t major_version, uint64_t height, con
MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
}
void zmq_pub::sync::operator()(bool syncing, std::uint64_t height, std::uint64_t target) const
{
const std::shared_ptr<zmq_pub> self = self_.lock();
if (self)
self->send_sync(syncing, height, target);
else
MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
}
void zmq_pub::txpool_add::operator()(std::vector<cryptonote::txpool_event> txes) const
{
const std::shared_ptr<zmq_pub> self = self_.lock();

@ -61,6 +61,7 @@ class zmq_pub
std::deque<std::vector<txpool_event>> txes_;
std::array<std::size_t, 2> chain_subs_;
std::array<std::size_t, 1> miner_subs_;
std::array<std::size_t, 2> sync_subs_;
std::array<std::size_t, 2> txpool_subs_;
boost::mutex sync_; //!< Synchronizes counts in `*_subs_` arrays.
@ -95,6 +96,11 @@ class zmq_pub
\return Number of ZMQ messages sent to relay. */
std::size_t send_miner_data(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector<tx_block_template_backlog_entry>& tx_backlog);
/*! Send a `ZMQ_PUB` notification when starting/stopping syncing.
Thread-safe.
\return Number of ZMQ messages sent to relay. */
std::size_t send_sync(bool syncing, std::uint64_t height, std::uint64_t target);
/*! Send a `ZMQ_PUB` notification for new tx(es) being added to the local
pool. Thread-safe.
\return Number of ZMQ messages sent to relay. */
@ -114,6 +120,13 @@ class zmq_pub
void operator()(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector<tx_block_template_backlog_entry>& tx_backlog) const;
};
//! Callable for `send_sync` with weak ownership to `zmq_pub` object.
struct sync
{
std::weak_ptr<zmq_pub> self_;
void operator()(bool, std::uint64_t height, std::uint64_t target) const;
};
//! Callable for `send_txpool_add` with weak ownership to `zmq_pub` object.
struct txpool_add
{

Loading…
Cancel
Save