From e5214a2ca22cecf123bcff1ab441ed0415d08a6f Mon Sep 17 00:00:00 2001 From: Lee Clagett Date: Mon, 16 Mar 2020 23:59:26 +0000 Subject: [PATCH] Adding ZMQ/Pub support for txpool_add and chain_main events --- ZMQ.md | 61 ++ src/common/notify.cpp | 2 +- src/common/notify.h | 7 +- src/cryptonote_basic/events.h | 46 ++ src/cryptonote_basic/fwd.h | 36 ++ src/cryptonote_core/blockchain.cpp | 31 +- src/cryptonote_core/blockchain.h | 9 +- src/cryptonote_core/cryptonote_core.cpp | 38 +- src/cryptonote_core/cryptonote_core.h | 15 + src/daemon/command_line_args.h | 4 + src/daemon/daemon.cpp | 71 ++- src/daemon/daemon.h | 3 - src/daemon/main.cpp | 1 + src/net/zmq.cpp | 14 - src/net/zmq.h | 22 + src/rpc/CMakeLists.txt | 22 +- src/rpc/fwd.h | 37 ++ src/rpc/zmq_pub.cpp | 478 ++++++++++++++++ src/rpc/zmq_pub.h | 110 ++++ src/rpc/zmq_server.cpp | 171 ++++-- src/rpc/zmq_server.h | 23 +- src/serialization/json_object.h | 2 +- tests/unit_tests/CMakeLists.txt | 1 + tests/unit_tests/json_serialization.cpp | 19 +- tests/unit_tests/json_serialization.h | 42 ++ tests/unit_tests/zmq_rpc.cpp | 722 ++++++++++++++++++++++++ 26 files changed, 1860 insertions(+), 127 deletions(-) create mode 100644 ZMQ.md create mode 100644 src/cryptonote_basic/events.h create mode 100644 src/cryptonote_basic/fwd.h create mode 100644 src/rpc/fwd.h create mode 100644 src/rpc/zmq_pub.cpp create mode 100644 src/rpc/zmq_pub.h create mode 100644 tests/unit_tests/json_serialization.h diff --git a/ZMQ.md b/ZMQ.md new file mode 100644 index 000000000..9128ff2ad --- /dev/null +++ b/ZMQ.md @@ -0,0 +1,61 @@ +# The Current/Future Status of ZMQ in Monero + +## ZMQ Pub/Sub +Client `ZMQ_SUB` sockets must "subscribe" to topics before it receives any data. +This allows filtering on the server side, so network traffic is reduced. Monero +allows for filtering on: (1) format, (2) context, and (3) event. + + * **format** refers to the _wire_ format (i.e. JSON) used to send event + information. + * **context** allows for a reduction in fields for the event, so the + daemon doesn't waste cycles serializing fields that get ignored. + * **event** refers to status changes occurring within the daemon (i.e. new + block to main chain). + + * Formats: + * `json` + * Contexts: + * `full` - the entire block or transaction is transmitted (the hash can be + computed remotely). + * `minimal` - the bare minimum for a remote client to react to an event is + sent. + * Events: + * `chain_main` - changes to the primary/main blockchain. + * `txpool_add` - new _publicly visible_ transactions in the mempool. + Includes previously unseen transactions in a block but _not_ the + `miner_tx`. Does not "re-publish" after a reorg. Includes `do_not_relay` + transactions. + +The subscription topics are formatted as `format-context-event`, with prefix +matching supported by both Monero and ZMQ. The `format`, `context` and `event` +will _never_ have hyphens or colons in their name. For example, subscribing to +`json-minimal-chain_main` will send minimal information in JSON when changes +to the main/primary blockchain occur. Whereas, subscribing to `json-minimal` +will send minimal information in JSON on all available events supported by the +daemon. + +The Monero daemon will ensure that events prefixed by `chain` will be sent in +"chain-order" - the `prev_id` (hash) field will _always_ refer to a previous +block. On rollbacks/reorgs, the event will reference an earlier block in the +chain instead of the last block. The Monero daemon also ensures that +`txpool_add` events are sent before `chain_*` events - the `chain_*` messages +will only serialize miner transactions since the other transactions were +previously published via `txpool_add`. This prevents transactions from being +serialized twice, even when the transaction was first observed in a block. + +ZMQ Pub/Sub will drop messages if the network is congested, so the above rules +for send order are used for detecting lost messages. A missing gap in `height` +or `prev_id` for `chain_*` events indicates a lost pub message. Missing +`txpool_add` messages can only be detected at the next `chain_` message. + +Since blockchain events can be dropped, clients will likely want to have a +timeout against `chain_main` events. The `GetLastBlockHeader` RPC is useful +for checking the current chain state. Dropped messages should be rare in most +conditions. + +The Monero daemon will send a `txpool_add` pub exactly once for each +transaction, even after a reorg or restarts. Clients should use the +`GetTransactionPool` after a reorg to get all transactions that have been put +back into the tx pool or been invalidated due to a double-spend. + + diff --git a/src/common/notify.cpp b/src/common/notify.cpp index e2df5096d..f31100214 100644 --- a/src/common/notify.cpp +++ b/src/common/notify.cpp @@ -62,7 +62,7 @@ static void replace(std::vector &v, const char *tag, const char *s) boost::replace_all(str, tag, s); } -int Notify::notify(const char *tag, const char *s, ...) +int Notify::notify(const char *tag, const char *s, ...) const { std::vector margs = args; diff --git a/src/common/notify.h b/src/common/notify.h index f813e8def..65d4e1072 100644 --- a/src/common/notify.h +++ b/src/common/notify.h @@ -38,8 +38,12 @@ class Notify { public: Notify(const char *spec); + Notify(const Notify&) = default; + Notify(Notify&&) = default; + Notify& operator=(const Notify&) = default; + Notify& operator=(Notify&&) = default; - int notify(const char *tag, const char *s, ...); + int notify(const char *tag, const char *s, ...) const; private: std::string filename; @@ -47,3 +51,4 @@ private: }; } + diff --git a/src/cryptonote_basic/events.h b/src/cryptonote_basic/events.h new file mode 100644 index 000000000..6c6742215 --- /dev/null +++ b/src/cryptonote_basic/events.h @@ -0,0 +1,46 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include "crypto/hash.h" +#include "cryptonote_basic/cryptonote_basic.h" + +namespace cryptonote +{ + /*! Transactions are expensive to move or copy (lots of 32-byte internal + buffers). This allows `cryptonote::core` to do a single notification for + a vector of transactions, without having to move/copy duplicate or invalid + transactions. */ + struct txpool_event + { + cryptonote::transaction tx; + crypto::hash hash; + bool res; //!< Listeners must ignore `tx` when this is false. + }; +} diff --git a/src/cryptonote_basic/fwd.h b/src/cryptonote_basic/fwd.h new file mode 100644 index 000000000..d54223461 --- /dev/null +++ b/src/cryptonote_basic/fwd.h @@ -0,0 +1,36 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +namespace cryptonote +{ + struct block; + class transaction; + struct txpool_event; +} diff --git a/src/cryptonote_core/blockchain.cpp b/src/cryptonote_core/blockchain.cpp index 7851b0f6a..b0726981c 100644 --- a/src/cryptonote_core/blockchain.cpp +++ b/src/cryptonote_core/blockchain.cpp @@ -1234,10 +1234,15 @@ bool Blockchain::switch_to_alternative_blockchain(std::list reorg_notify->notify("%s", std::to_string(split_height).c_str(), "%h", std::to_string(m_db->height()).c_str(), "%n", std::to_string(m_db->height() - split_height).c_str(), "%d", std::to_string(discarded_blocks).c_str(), NULL); - std::shared_ptr block_notify = m_block_notify; - if (block_notify) - for (const auto &bei: alt_chain) - block_notify->notify("%s", epee::string_tools::pod_to_hex(get_block_hash(bei.bl)).c_str(), NULL); + for (const auto& notifier : m_block_notifiers) + { + std::size_t notify_height = split_height; + for (const auto& bei: alt_chain) + { + notifier(notify_height, {std::addressof(bei.bl), 1}); + ++notify_height; + } + } MGINFO_GREEN("REORGANIZE SUCCESS! on height: " << split_height << ", new blockchain size: " << m_db->height()); return true; @@ -4236,12 +4241,9 @@ leave: get_difficulty_for_next_block(); // just to cache it invalidate_block_template_cache(); - if (notify) - { - std::shared_ptr block_notify = m_block_notify; - if (block_notify) - block_notify->notify("%s", epee::string_tools::pod_to_hex(id).c_str(), NULL); - } + + for (const auto& notifier: m_block_notifiers) + notifier(new_height - 1, {std::addressof(bl), 1}); return true; } @@ -5132,6 +5134,15 @@ void Blockchain::set_user_options(uint64_t maxthreads, bool sync_on_blocks, uint m_max_prepare_blocks_threads = maxthreads; } +void Blockchain::add_block_notify(boost::function)>&& notify) +{ + if (notify) + { + CRITICAL_REGION_LOCAL(m_blockchain_lock); + m_block_notifiers.push_back(std::move(notify)); + } +} + void Blockchain::safesyncmode(const bool onoff) { /* all of this is no-op'd if the user set a specific diff --git a/src/cryptonote_core/blockchain.h b/src/cryptonote_core/blockchain.h index fb7e5c4f8..703dd6400 100644 --- a/src/cryptonote_core/blockchain.h +++ b/src/cryptonote_core/blockchain.h @@ -30,6 +30,7 @@ #pragma once #include +#include #include #include #include @@ -764,7 +765,7 @@ namespace cryptonote * * @param notify the notify object to call at every new block */ - void set_block_notify(const std::shared_ptr ¬ify) { m_block_notify = notify; } + void add_block_notify(boost::function)> &¬ify); /** * @brief sets a reorg notify object to call for every reorg @@ -1125,7 +1126,11 @@ namespace cryptonote bool m_batch_success; - std::shared_ptr m_block_notify; + /* `boost::function` is used because the implementation never allocates if + the callable object has a single `std::shared_ptr` or `std::weap_ptr` + internally. Whereas, the libstdc++ `std::function` will allocate. */ + + std::vector)>> m_block_notifiers; std::shared_ptr m_reorg_notify; // for prepare_handle_incoming_blocks diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index 141e54459..ce9c8878d 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -41,6 +41,7 @@ using namespace epee; #include "common/download.h" #include "common/threadpool.h" #include "common/command_line.h" +#include "cryptonote_basic/events.h" #include "warnings.h" #include "crypto/crypto.h" #include "cryptonote_config.h" @@ -51,6 +52,7 @@ using namespace epee; #include "ringct/rctTypes.h" #include "blockchain_db/blockchain_db.h" #include "ringct/rctSigs.h" +#include "rpc/zmq_pub.h" #include "common/notify.h" #include "hardforks/hardforks.h" #include "version.h" @@ -262,6 +264,13 @@ namespace cryptonote { m_blockchain_storage.set_enforce_dns_checkpoints(enforce_dns); } + //----------------------------------------------------------------------------------- + void core::set_txpool_listener(boost::function)> zmq_pub) + { + CRITICAL_REGION_LOCAL(m_incoming_tx_lock); + m_zmq_pub = std::move(zmq_pub); + } + //----------------------------------------------------------------------------------------------- bool core::update_checkpoints(const bool skip_dns /* = false */) { @@ -614,7 +623,20 @@ namespace cryptonote try { if (!command_line::is_arg_defaulted(vm, arg_block_notify)) - m_blockchain_storage.set_block_notify(std::shared_ptr(new tools::Notify(command_line::get_arg(vm, arg_block_notify).c_str()))); + { + struct hash_notify + { + tools::Notify cmdline; + + void operator()(std::uint64_t, epee::span blocks) const + { + for (const block bl : blocks) + cmdline.notify("%s", epee::string_tools::pod_to_hex(get_block_hash(bl)).c_str(), NULL); + } + }; + + m_blockchain_storage.add_block_notify(hash_notify{{command_line::get_arg(vm, arg_block_notify).c_str()}}); + } } catch (const std::exception &e) { @@ -957,8 +979,7 @@ namespace cryptonote return false; } - struct result { bool res; cryptonote::transaction tx; crypto::hash hash; }; - std::vector results(tx_blobs.size()); + std::vector results(tx_blobs.size()); CRITICAL_REGION_LOCAL(m_incoming_tx_lock); @@ -1023,6 +1044,7 @@ namespace cryptonote if (!tx_info.empty()) handle_incoming_tx_accumulated_batch(tx_info, tx_relay == relay_method::block); + bool valid_events = false; bool ok = true; it = tx_blobs.begin(); for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { @@ -1045,10 +1067,18 @@ namespace cryptonote {MERROR_VER("Transaction verification impossible: " << results[i].hash);} if(tvc[i].m_added_to_pool) + { MDEBUG("tx added: " << results[i].hash); + valid_events = true; + } + else + results[i].res = false; } - return ok; + if (valid_events && m_zmq_pub && matches_category(tx_relay, relay_category::legacy)) + m_zmq_pub(std::move(results)); + + return ok; CATCH_ENTRY_L0("core::handle_incoming_txs()", false); } //----------------------------------------------------------------------------------------------- diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index 6a9ffda92..a53596c2c 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -32,9 +32,11 @@ #include +#include #include #include +#include "cryptonote_basic/fwd.h" #include "cryptonote_core/i_core_events.h" #include "cryptonote_protocol/cryptonote_protocol_handler_common.h" #include "cryptonote_protocol/enums.h" @@ -48,6 +50,7 @@ #include "warnings.h" #include "crypto/hash.h" #include "span.h" +#include "rpc/fwd.h" PUSH_WARNINGS DISABLE_VS_WARNINGS(4355) @@ -445,6 +448,13 @@ namespace cryptonote */ void set_enforce_dns_checkpoints(bool enforce_dns); + /** + * @brief set a listener for txes being added to the txpool + * + * @param callable to notify, or empty function to disable. + */ + void set_txpool_listener(boost::function)> zmq_pub); + /** * @brief set whether or not to enable or disable DNS checkpoints * @@ -1098,7 +1108,12 @@ namespace cryptonote bool m_fluffy_blocks_enabled; bool m_offline; + /* `boost::function` is used because the implementation never allocates if + the callable object has a single `std::shared_ptr` or `std::weap_ptr` + internally. Whereas, the libstdc++ `std::function` will allocate. */ + std::shared_ptr m_block_rate_notify; + boost::function)> m_zmq_pub; }; } diff --git a/src/daemon/command_line_args.h b/src/daemon/command_line_args.h index 0ce987bcc..6c3e163e6 100644 --- a/src/daemon/command_line_args.h +++ b/src/daemon/command_line_args.h @@ -121,6 +121,10 @@ namespace daemon_args return val; } }; + const command_line::arg_descriptor> arg_zmq_pub = { + "zmq-pub" + , "Address for ZMQ pub - tcp://ip:port or ipc://path" + }; const command_line::arg_descriptor arg_zmq_rpc_disabled = { "no-zmq" diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 96db8712b..99430b2b0 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -34,10 +34,12 @@ #include "misc_log_ex.h" #include "daemon/daemon.h" #include "rpc/daemon_handler.h" +#include "rpc/zmq_pub.h" #include "rpc/zmq_server.h" #include "common/password.h" #include "common/util.h" +#include "cryptonote_basic/events.h" #include "daemon/core.h" #include "daemon/p2p.h" #include "daemon/protocol.h" @@ -56,6 +58,17 @@ using namespace epee; namespace daemonize { +struct zmq_internals +{ + explicit zmq_internals(t_core& core, t_p2p& p2p) + : rpc_handler{core.get(), p2p.get()} + , server{rpc_handler} + {} + + cryptonote::rpc::DaemonHandler rpc_handler; + cryptonote::rpc::ZmqServer server; +}; + struct t_internals { private: t_protocol protocol; @@ -63,6 +76,7 @@ public: t_core core; t_p2p p2p; std::vector> rpcs; + std::unique_ptr zmq; t_internals( boost::program_options::variables_map const & vm @@ -70,6 +84,7 @@ public: : core{vm} , protocol{vm, core, command_line::get_arg(vm, cryptonote::arg_offline)} , p2p{vm, protocol} + , zmq{nullptr} { // Handle circular dependencies protocol.set_p2p_endpoint(p2p.get()); @@ -86,6 +101,28 @@ public: auto restricted_rpc_port = command_line::get_arg(vm, restricted_rpc_port_arg); rpcs.emplace_back(new t_rpc{vm, core, p2p, true, restricted_rpc_port, "restricted", true}); } + + if (!command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled)) + { + zmq.reset(new zmq_internals{core, p2p}); + + const std::string zmq_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port); + const std::string zmq_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip); + + if (!zmq->server.init_rpc(zmq_address, zmq_port)) + throw std::runtime_error{"Failed to add TCP socket(" + zmq_address + ":" + zmq_port + ") to ZMQ RPC Server"}; + + std::shared_ptr shared; + const std::vector zmq_pub = command_line::get_arg(vm, daemon_args::arg_zmq_pub); + if (!zmq_pub.empty() && !(shared = zmq->server.init_pub(epee::to_span(zmq_pub)))) + throw std::runtime_error{"Failed to initialize zmq_pub"}; + + if (shared) + { + core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared}); + core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared}); + } + } } }; @@ -103,9 +140,6 @@ t_daemon::t_daemon( : mp_internals{new t_internals{vm}}, public_rpc_port(public_rpc_port) { - zmq_rpc_bind_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port); - zmq_rpc_bind_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip); - zmq_rpc_disabled = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled); } t_daemon::~t_daemon() = default; @@ -169,31 +203,8 @@ bool t_daemon::run(bool interactive) rpc_commands->start_handling(std::bind(&daemonize::t_daemon::stop_p2p, this)); } - cryptonote::rpc::DaemonHandler rpc_daemon_handler(mp_internals->core.get(), mp_internals->p2p.get()); - cryptonote::rpc::ZmqServer zmq_server(rpc_daemon_handler); - - if (!zmq_rpc_disabled) - { - if (!zmq_server.addTCPSocket(zmq_rpc_bind_address, zmq_rpc_bind_port)) - { - LOG_ERROR(std::string("Failed to add TCP Socket (") + zmq_rpc_bind_address - + ":" + zmq_rpc_bind_port + ") to ZMQ RPC Server"); - - if (rpc_commands) - rpc_commands->stop_handling(); - - for(auto& rpc : mp_internals->rpcs) - rpc->stop(); - - return false; - } - - MINFO("Starting ZMQ server..."); - zmq_server.run(); - - MINFO(std::string("ZMQ server started at ") + zmq_rpc_bind_address - + ":" + zmq_rpc_bind_port + "."); - } + if (mp_internals->zmq) + mp_internals->zmq->server.run(); else MINFO("ZMQ server disabled"); @@ -208,8 +219,8 @@ bool t_daemon::run(bool interactive) if (rpc_commands) rpc_commands->stop_handling(); - if (!zmq_rpc_disabled) - zmq_server.stop(); + if (mp_internals->zmq) + mp_internals->zmq->server.stop(); for(auto& rpc : mp_internals->rpcs) rpc->stop(); diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index bb7fdfebd..2eb2019ce 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -44,9 +44,6 @@ private: private: std::unique_ptr mp_internals; uint16_t public_rpc_port; - std::string zmq_rpc_bind_address; - std::string zmq_rpc_bind_port; - bool zmq_rpc_disabled; public: t_daemon( boost::program_options::variables_map const & vm, diff --git a/src/daemon/main.cpp b/src/daemon/main.cpp index dfc35470e..f2ae6dcc3 100644 --- a/src/daemon/main.cpp +++ b/src/daemon/main.cpp @@ -154,6 +154,7 @@ int main(int argc, char const * argv[]) command_line::add_arg(core_settings, daemon_args::arg_public_node); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_ip); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_port); + command_line::add_arg(core_settings, daemon_args::arg_zmq_pub); command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_disabled); daemonizer::init_options(hidden_options, visible_options); diff --git a/src/net/zmq.cpp b/src/net/zmq.cpp index 1a0edb4b9..15560ca7e 100644 --- a/src/net/zmq.cpp +++ b/src/net/zmq.cpp @@ -158,20 +158,6 @@ namespace zmq return unsigned(max_out) < added ? max_out : int(added); } }; - - template - expect retry_op(F op, T&&... args) noexcept(noexcept(op(args...))) - { - for (;;) - { - if (0 <= op(args...)) - return success(); - - const int error = zmq_errno(); - if (error != EINTR) - return make_error_code(error); - } - } } // anonymous expect receive(void* const socket, const int flags) diff --git a/src/net/zmq.h b/src/net/zmq.h index 8c587ed7c..fa4ef2fc9 100644 --- a/src/net/zmq.h +++ b/src/net/zmq.h @@ -26,6 +26,8 @@ // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + #include #include #include @@ -105,6 +107,26 @@ namespace zmq //! Unique ZMQ socket handle, calls `zmq_close` on destruction. using socket = std::unique_ptr; + /*! Retry a ZMQ function on `EINTR` errors. `F` must return an int with + values less than 0 on error. + + \param op The ZMQ function to execute + retry + \param args Forwarded to `op`. Must be resuable in case of retry. + \return All errors except for `EINTR`. */ + template + expect retry_op(F op, T&&... args) noexcept(noexcept(op(args...))) + { + for (;;) + { + if (0 <= op(args...)) + return success(); + + const int error = zmq_errno(); + if (error != EINTR) + return make_error_code(error); + } + } + /*! Read all parts of the next message on `socket`. Blocks until the entire next message (all parts) are read, or until `zmq_term` is called on the `zmq_context` associated with `socket`. If the context is terminated, diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 35195bd98..19298c969 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -45,8 +45,11 @@ set(daemon_messages_sources message.cpp daemon_messages.cpp) +set(rpc_pub_sources zmq_pub.cpp) + set(daemon_rpc_server_sources daemon_handler.cpp + zmq_pub.cpp zmq_server.cpp) @@ -59,8 +62,9 @@ set(rpc_headers rpc_version_str.h rpc_handler.h) -set(daemon_rpc_server_headers) +set(rpc_pub_headers zmq_pub.h) +set(daemon_rpc_server_headers) set(rpc_daemon_private_headers bootstrap_daemon.h @@ -83,6 +87,8 @@ set(daemon_rpc_server_private_headers monero_private_headers(rpc ${rpc_private_headers}) +set(rpc_pub_private_headers) + monero_private_headers(daemon_rpc_server ${daemon_rpc_server_private_headers}) @@ -97,6 +103,11 @@ monero_add_library(rpc ${rpc_headers} ${rpc_private_headers}) +monero_add_library(rpc_pub + ${rpc_pub_sources} + ${rpc_pub_headers} + ${rpc_pub_private_headers}) + monero_add_library(daemon_messages ${daemon_messages_sources} ${daemon_messages_headers} @@ -131,6 +142,14 @@ target_link_libraries(rpc PRIVATE ${EXTRA_LIBRARIES}) +target_link_libraries(rpc_pub + PUBLIC + epee + net + cryptonote_basic + serialization + ${Boost_THREAD_LIBRARY}) + target_link_libraries(daemon_messages LINK_PRIVATE cryptonote_core @@ -142,6 +161,7 @@ target_link_libraries(daemon_messages target_link_libraries(daemon_rpc_server LINK_PRIVATE rpc + rpc_pub cryptonote_core cryptonote_protocol version diff --git a/src/rpc/fwd.h b/src/rpc/fwd.h new file mode 100644 index 000000000..72537f5a5 --- /dev/null +++ b/src/rpc/fwd.h @@ -0,0 +1,37 @@ +// Copyright (c) 2019-2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +namespace cryptonote +{ + namespace listener + { + class zmq_pub; + } +} diff --git a/src/rpc/zmq_pub.cpp b/src/rpc/zmq_pub.cpp new file mode 100644 index 000000000..0dffffac6 --- /dev/null +++ b/src/rpc/zmq_pub.cpp @@ -0,0 +1,478 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "zmq_pub.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/expect.h" +#include "crypto/crypto.h" +#include "cryptonote_basic/cryptonote_format_utils.h" +#include "cryptonote_basic/events.h" +#include "misc_log_ex.h" +#include "serialization/json_object.h" + +#undef MONERO_DEFAULT_LOG_CATEGORY +#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq" + +namespace +{ + constexpr const char txpool_signal[] = "tx_signal"; + + using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span); + using txpool_writer = void(epee::byte_stream&, epee::span); + + template + struct context + { + char const* const name; + F* generate_pub; + }; + + template + bool operator<(const context& lhs, const context& rhs) noexcept + { + return std::strcmp(lhs.name, rhs.name) < 0; + } + + template + bool operator<(const context& lhs, const boost::string_ref rhs) noexcept + { + return lhs.name < rhs; + } + + struct is_valid + { + bool operator()(const cryptonote::txpool_event& event) const noexcept + { + return event.res; + } + }; + + template + void verify_sorted(const std::array, N>& elems, const char* name) + { + auto unsorted = std::is_sorted_until(elems.begin(), elems.end()); + if (unsorted != elems.end()) + throw std::logic_error{name + std::string{" array is not properly sorted, see: "} + unsorted->name}; + } + + void write_header(epee::byte_stream& buf, const boost::string_ref name) + { + buf.write(name.data(), name.size()); + buf.put(':'); + } + + //! \return `name:...` where `...` is JSON and `name` is directly copied (no quotes - not JSON). + template + void json_pub(epee::byte_stream& buf, const T value) + { + rapidjson::Writer dest{buf}; + using cryptonote::json::toJsonValue; + toJsonValue(dest, value); + } + + //! Object for "minimal" block serialization + struct minimal_chain + { + const std::uint64_t height; + const epee::span blocks; + }; + + //! Object for "minimal" tx serialization + struct minimal_txpool + { + const cryptonote::transaction& tx; + }; + + void toJsonValue(rapidjson::Writer& dest, const minimal_chain self) + { + namespace adapt = boost::adaptors; + + const auto to_block_id = [](const cryptonote::block& bl) + { + crypto::hash id; + if (!get_block_hash(bl, id)) + MERROR("ZMQ/Pub failure: get_block_hash"); + return id; + }; + + assert(!self.blocks.empty()); // checked in zmq_pub::send_chain_main + + dest.StartObject(); + INSERT_INTO_JSON_OBJECT(dest, first_height, self.height); + INSERT_INTO_JSON_OBJECT(dest, first_prev_id, self.blocks[0].prev_id); + INSERT_INTO_JSON_OBJECT(dest, ids, (self.blocks | adapt::transformed(to_block_id))); + dest.EndObject(); + } + + void toJsonValue(rapidjson::Writer& dest, const minimal_txpool self) + { + crypto::hash id{}; + std::size_t blob_size = 0; + if (!get_transaction_hash(self.tx, id, blob_size)) + { + MERROR("ZMQ/Pub failure: get_transaction_hash"); + return; + } + + dest.StartObject(); + INSERT_INTO_JSON_OBJECT(dest, id, id); + INSERT_INTO_JSON_OBJECT(dest, blob_size, blob_size); + dest.EndObject(); + } + + void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span blocks) + { + json_pub(buf, blocks); + } + + void json_minimal_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span blocks) + { + json_pub(buf, minimal_chain{height, blocks}); + } + + // boost::adaptors are in place "views" - no copy/move takes place + // moving transactions (via sort, etc.), is expensive! + + void json_full_txpool(epee::byte_stream& buf, epee::span txes) + { + namespace adapt = boost::adaptors; + const auto to_full_tx = [](const cryptonote::txpool_event& event) + { + return event.tx; + }; + json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_full_tx))); + } + + void json_minimal_txpool(epee::byte_stream& buf, epee::span txes) + { + namespace adapt = boost::adaptors; + const auto to_minimal_tx = [](const cryptonote::txpool_event& event) + { + return minimal_txpool{event.tx}; + }; + json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_minimal_tx))); + } + + constexpr const std::array, 2> chain_contexts = + {{ + {u8"json-full-chain_main", json_full_chain}, + {u8"json-minimal-chain_main", json_minimal_chain} + }}; + + constexpr const std::array, 2> txpool_contexts = + {{ + {u8"json-full-txpool_add", json_full_txpool}, + {u8"json-minimal-txpool_add", json_minimal_txpool} + }}; + + template + epee::span> get_range(const std::array, N>& contexts, const boost::string_ref value) + { + const auto not_prefix = [](const boost::string_ref lhs, const context& rhs) + { + return !(boost::string_ref{rhs.name}.starts_with(lhs)); + }; + + const auto lower = std::lower_bound(contexts.begin(), contexts.end(), value); + const auto upper = std::upper_bound(lower, contexts.end(), value, not_prefix); + return {lower, std::size_t(upper - lower)}; + } + + template + void add_subscriptions(std::array& subs, const epee::span> range, context const* const first) + { + assert(range.size() <= N); + assert(range.begin() - first <= N - range.size()); + + for (const auto& ctx : range) + { + const std::size_t i = std::addressof(ctx) - first; + subs[i] = std::min(std::numeric_limits::max() - 1, subs[i]) + 1; + } + } + + template + void remove_subscriptions(std::array& subs, const epee::span> range, context const* const first) + { + assert(range.size() <= N); + assert(range.begin() - first <= N - range.size()); + + for (const auto& ctx : range) + { + const std::size_t i = std::addressof(ctx) - first; + subs[i] = std::max(std::size_t(1), subs[i]) - 1; + } + } + + template + std::array make_pubs(const std::array& subs, const std::array, N>& contexts, U&&... args) + { + epee::byte_stream buf{}; + + std::size_t last_offset = 0; + std::array offsets{{}}; + for (std::size_t i = 0; i < N; ++i) + { + if (subs[i]) + { + write_header(buf, contexts[i].name); + contexts[i].generate_pub(buf, std::forward(args)...); + offsets[i] = buf.size() - last_offset; + last_offset = buf.size(); + } + } + + epee::byte_slice bytes{std::move(buf)}; + std::array out; + for (std::size_t i = 0; i < N; ++i) + out[i] = bytes.take_slice(offsets[i]); + + return out; + } + + template + std::size_t send_messages(void* const socket, std::array& messages) + { + std::size_t count = 0; + for (epee::byte_slice& message : messages) + { + if (!message.empty()) + { + const expect sent = net::zmq::send(std::move(message), socket, ZMQ_DONTWAIT); + if (!sent) + MERROR("Failed to send ZMQ/Pub message: " << sent.error().message()); + else + ++count; + } + } + return count; + } + + expect relay_block_pub(void* const relay, void* const pub) noexcept + { + zmq_msg_t msg; + zmq_msg_init(std::addressof(msg)); + MONERO_CHECK(net::zmq::retry_op(zmq_msg_recv, std::addressof(msg), relay, ZMQ_DONTWAIT)); + + const boost::string_ref payload{ + reinterpret_cast(zmq_msg_data(std::addressof(msg))), + zmq_msg_size(std::addressof(msg)) + }; + + if (payload == txpool_signal) + { + zmq_msg_close(std::addressof(msg)); + return false; + } + + // forward block messages (serialized on P2P thread for now) + const expect sent = net::zmq::retry_op(zmq_msg_send, std::addressof(msg), pub, ZMQ_DONTWAIT); + if (!sent) + { + zmq_msg_close(std::addressof(msg)); + return sent.error(); + } + return true; + } +} // anonymous + +namespace cryptonote { namespace listener +{ + +zmq_pub::zmq_pub(void* context) + : relay_(), + chain_subs_{{0}}, + txpool_subs_{{0}}, + sync_() +{ + if (!context) + throw std::logic_error{"ZMQ context cannot be NULL"}; + + verify_sorted(chain_contexts, "chain_contexts"); + verify_sorted(txpool_contexts, "txpool_contexts"); + + relay_.reset(zmq_socket(context, ZMQ_PAIR)); + if (!relay_) + MONERO_ZMQ_THROW("Failed to create relay socket"); + if (zmq_connect(relay_.get(), relay_endpoint()) != 0) + MONERO_ZMQ_THROW("Failed to connect relay socket"); +} + +zmq_pub::~zmq_pub() +{} + +bool zmq_pub::sub_request(boost::string_ref message) +{ + if (!message.empty()) + { + const char tag = message[0]; + message.remove_prefix(1); + + const auto chain_range = get_range(chain_contexts, message); + const auto txpool_range = get_range(txpool_contexts, message); + + if (!chain_range.empty() || !txpool_range.empty()) + { + MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " << + chain_range.size() << " chain topic(s) and " << txpool_range.size() << " txpool topic(s)"); + + const boost::lock_guard lock{sync_}; + switch (tag) + { + case 0: + remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); + remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); + return true; + case 1: + add_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); + add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); + return true; + default: + break; + } + } + } + MERROR("Invalid ZMQ/Sub message"); + return false; +} + +bool zmq_pub::relay_to_pub(void* const relay, void* const pub) +{ + const expect relayed = relay_block_pub(relay, pub); + if (!relayed) + { + MERROR("Error relaying ZMQ/Pub: " << relayed.error().message()); + return false; + } + + if (!*relayed) + { + std::array subs; + std::vector events; + { + const boost::lock_guard lock{sync_}; + if (txes_.empty()) + return false; + + subs = txpool_subs_; + events = std::move(txes_.front()); + txes_.pop_front(); + } + auto messages = make_pubs(subs, txpool_contexts, epee::to_span(events)); + send_messages(pub, messages); + MDEBUG("Sent txpool ZMQ/Pub"); + } + else + MDEBUG("Sent chain_main ZMQ/Pub"); + + return true; +} + +std::size_t zmq_pub::send_chain_main(const std::uint64_t height, const epee::span blocks) +{ + if (blocks.empty()) + return 0; + + /* Block format only sends one block at a time - multiple block notifications + are less common and only occur on rollbacks. */ + + boost::unique_lock guard{sync_}; + + const auto subs_copy = chain_subs_; + guard.unlock(); + + for (const std::size_t sub : subs_copy) + { + if (sub) + { + /* cryptonote_core/blockchain.cpp cannot "give" us the block like core + does for txpool events. Since copying the block is expensive anyway, + serialization is done right here on the p2p thread (for now). */ + + auto messages = make_pubs(subs_copy, chain_contexts, height, blocks); + guard.lock(); + return send_messages(relay_.get(), messages); + } + } + return 0; +} + +std::size_t zmq_pub::send_txpool_add(std::vector txes) +{ + if (txes.empty()) + return 0; + + const boost::lock_guard lock{sync_}; + for (const std::size_t sub : txpool_subs_) + { + if (sub) + { + const expect sent = net::zmq::retry_op(zmq_send_const, relay_.get(), txpool_signal, sizeof(txpool_signal) - 1, ZMQ_DONTWAIT); + if (sent) + txes_.emplace_back(std::move(txes)); + else + MERROR("ZMQ/Pub failure, relay queue error: " << sent.error().message()); + return bool(sent); + } + } + return 0; +} + +void zmq_pub::chain_main::operator()(const std::uint64_t height, epee::span blocks) const +{ + const std::shared_ptr self = self_.lock(); + if (self) + self->send_chain_main(height, blocks); + else + MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); +} + +void zmq_pub::txpool_add::operator()(std::vector txes) const +{ + const std::shared_ptr self = self_.lock(); + if (self) + self->send_txpool_add(std::move(txes)); + else + MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); +} + +}} diff --git a/src/rpc/zmq_pub.h b/src/rpc/zmq_pub.h new file mode 100644 index 000000000..02e6b8103 --- /dev/null +++ b/src/rpc/zmq_pub.h @@ -0,0 +1,110 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. + // +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "cryptonote_basic/fwd.h" +#include "net/zmq.h" +#include "span.h" + +namespace cryptonote { namespace listener +{ +/*! \brief Sends ZMQ PUB messages on cryptonote events + + Clients must ensure that all transaction(s) are notified before any blocks + they are contained in, and must ensure that each block is notified in chain + order. An external lock **must** be held by clients during the entire + txpool check and notification sequence and (a possibly second) lock is held + during the entire block check and notification sequence. Otherwise, events + could be sent in a different order than processed. */ +class zmq_pub +{ + /* Each socket has its own internal queue. So we can only use one socket, else + the messages being published are not guaranteed to be in the same order + pushed. */ + + net::zmq::socket relay_; + std::deque> txes_; + std::array chain_subs_; + std::array txpool_subs_; + boost::mutex sync_; //!< Synchronizes counts in `*_subs_` arrays. + + public: + //! \return Name of ZMQ_PAIR endpoint for pub notifications + static constexpr const char* relay_endpoint() noexcept { return "inproc://pub_relay"; } + + explicit zmq_pub(void* context); + + zmq_pub(const zmq_pub&) = delete; + zmq_pub(zmq_pub&&) = delete; + + ~zmq_pub(); + + zmq_pub& operator=(const zmq_pub&) = delete; + zmq_pub& operator=(zmq_pub&&) = delete; + + //! Process a client subscription request (from XPUB sockets). Thread-safe. + bool sub_request(const boost::string_ref message); + + /*! Forward ZMQ messages sent to `relay` via `send_chain_main` or + `send_txpool_add` to `pub`. Used by `ZmqServer`. */ + bool relay_to_pub(void* relay, void* pub); + + /*! Send a `ZMQ_PUB` notification for a change to the main chain. + Thread-safe. + \return Number of ZMQ messages sent to relay. */ + std::size_t send_chain_main(std::uint64_t height, epee::span blocks); + + /*! Send a `ZMQ_PUB` notification for new tx(es) being added to the local + pool. Thread-safe. + \return Number of ZMQ messages sent to relay. */ + std::size_t send_txpool_add(std::vector txes); + + //! Callable for `send_chain_main` with weak ownership to `zmq_pub` object. + struct chain_main + { + std::weak_ptr self_; + void operator()(std::uint64_t height, epee::span blocks) const; + }; + + //! Callable for `send_txpool_add` with weak ownership to `zmq_pub` object. + struct txpool_add + { + std::weak_ptr self_; + void operator()(std::vector txes) const; + }; + }; +}} diff --git a/src/rpc/zmq_server.cpp b/src/rpc/zmq_server.cpp index 1a9f49c01..6105b7f3a 100644 --- a/src/rpc/zmq_server.cpp +++ b/src/rpc/zmq_server.cpp @@ -29,10 +29,16 @@ #include "zmq_server.h" #include -#include +#include +#include +#include #include #include "byte_slice.h" +#include "rpc/zmq_pub.h" + +#undef MONERO_DEFAULT_LOG_CATEGORY +#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq" namespace cryptonote { @@ -42,14 +48,57 @@ namespace constexpr const int num_zmq_threads = 1; constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages -} + + net::zmq::socket init_socket(void* context, int type, epee::span addresses) + { + if (context == nullptr) + throw std::logic_error{"NULL context provided"}; + + net::zmq::socket out{}; + out.reset(zmq_socket(context, type)); + if (!out) + { + MONERO_LOG_ZMQ_ERROR("Failed to create ZMQ socket"); + return nullptr; + } + + if (zmq_setsockopt(out.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size"); + return nullptr; + } + + static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count(); + if (zmq_setsockopt(out.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0) + { + MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout"); + return nullptr; + } + + for (const std::string& address : addresses) + { + if (zmq_bind(out.get(), address.c_str()) < 0) + { + MONERO_LOG_ZMQ_ERROR("ZMQ bind failed"); + return nullptr; + } + MINFO("ZMQ now listening at " << address); + } + + return out; + } +} // anonymous namespace rpc { ZmqServer::ZmqServer(RpcHandler& h) : handler(h), - context(zmq_init(num_zmq_threads)) + context(zmq_init(num_zmq_threads)), + rep_socket(nullptr), + pub_socket(nullptr), + relay_socket(nullptr), + shared_state(nullptr) { if (!context) MONERO_ZMQ_THROW("Unable to create ZMQ context"); @@ -64,22 +113,59 @@ void ZmqServer::serve() try { // socket must close before `zmq_term` will exit. - const net::zmq::socket socket = std::move(rep_socket); - if (!socket) + const net::zmq::socket rep = std::move(rep_socket); + const net::zmq::socket pub = std::move(pub_socket); + const net::zmq::socket relay = std::move(relay_socket); + const std::shared_ptr state = std::move(shared_state); + + const unsigned init_count = unsigned(bool(pub)) + bool(relay) + bool(state); + if (!rep || (init_count && init_count != 3)) { - MERROR("ZMQ RPC server reply socket is null"); + MERROR("ZMQ RPC server socket is null"); return; } + MINFO("ZMQ Server started"); + + const int read_flags = pub ? ZMQ_DONTWAIT : 0; + std::array sockets = + {{ + {relay.get(), 0, ZMQ_POLLIN, 0}, + {pub.get(), 0, ZMQ_POLLIN, 0}, + {rep.get(), 0, ZMQ_POLLIN, 0} + }}; + + /* This uses XPUB to watch for subscribers, to reduce CPU cycles for + serialization when the data will be dropped. This is important for block + serialization, which is done on the p2p threads currently (see + zmq_pub.cpp). + + XPUB sockets are not thread-safe, so the p2p thread cannot write into + the socket while we read here for subscribers. A ZMQ_PAIR socket is + used for inproc notification. No data is every copied to kernel, it is + all userspace messaging. */ + while (1) { - const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get())); - MDEBUG("Received RPC request: \"" << message << "\""); - epee::byte_slice response = handler.handle(message); + if (pub) + MONERO_UNWRAP(net::zmq::retry_op(zmq_poll, sockets.data(), sockets.size(), -1)); - const boost::string_ref response_view{reinterpret_cast(response.data()), response.size()}; - MDEBUG("Sending RPC reply: \"" << response_view << "\""); - MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get())); + if (sockets[0].revents) + state->relay_to_pub(relay.get(), pub.get()); + + if (sockets[1].revents) + state->sub_request(MONERO_UNWRAP(net::zmq::receive(pub.get(), ZMQ_DONTWAIT))); + + if (!pub || sockets[2].revents) + { + const std::string message = MONERO_UNWRAP(net::zmq::receive(rep.get(), read_flags)); + MDEBUG("Received RPC request: \"" << message << "\""); + epee::byte_slice response = handler.handle(message); + + const boost::string_ref response_view{reinterpret_cast(response.data()), response.size()}; + MDEBUG("Sending RPC reply: \"" << response_view << "\""); + MONERO_UNWRAP(net::zmq::send(std::move(response), rep.get())); + } } } catch (const std::system_error& e) @@ -97,38 +183,12 @@ void ZmqServer::serve() } } -bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port) -{ - MERROR("ZmqServer::addIPCSocket not yet implemented!"); - return false; -} - -bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port) +void* ZmqServer::init_rpc(boost::string_ref address, boost::string_ref port) { if (!context) { MERROR("ZMQ RPC Server already shutdown"); - return false; - } - - rep_socket.reset(zmq_socket(context.get(), ZMQ_REP)); - if (!rep_socket) - { - MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed"); - return false; - } - - if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0) - { - MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size"); - return false; - } - - static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count(); - if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0) - { - MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout"); - return false; + return nullptr; } if (address.empty()) @@ -141,12 +201,34 @@ bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port) bind_address += ":"; bind_address.append(port.data(), port.size()); - if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0) + rep_socket = init_socket(context.get(), ZMQ_REP, {std::addressof(bind_address), 1}); + return bool(rep_socket) ? context.get() : nullptr; +} + +std::shared_ptr ZmqServer::init_pub(epee::span addresses) +{ + try + { + shared_state = std::make_shared(context.get()); + pub_socket = init_socket(context.get(), ZMQ_XPUB, addresses); + if (!pub_socket) + throw std::runtime_error{"Unable to initialize ZMQ_XPUB socket"}; + + const std::string relay_address[] = {listener::zmq_pub::relay_endpoint()}; + relay_socket = init_socket(context.get(), ZMQ_PAIR, relay_address); + if (!relay_socket) + throw std::runtime_error{"Unable to initialize ZMQ_PAIR relay"}; + } + catch (const std::runtime_error& e) { - MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed"); - return false; + shared_state = nullptr; + pub_socket = nullptr; + relay_socket = nullptr; + MERROR("Failed to create ZMQ/Pub listener: " << e.what()); + return nullptr; } - return true; + + return shared_state; } void ZmqServer::run() @@ -163,7 +245,6 @@ void ZmqServer::stop() run_thread.join(); } - } // namespace cryptonote } // namespace rpc diff --git a/src/rpc/zmq_server.h b/src/rpc/zmq_server.h index 1143db839..ddf44b411 100644 --- a/src/rpc/zmq_server.h +++ b/src/rpc/zmq_server.h @@ -30,10 +30,16 @@ #include #include +#include +#include +#include #include "common/command_line.h" +#include "cryptonote_basic/fwd.h" #include "net/zmq.h" -#include "rpc_handler.h" +#include "rpc/fwd.h" +#include "rpc/rpc_handler.h" +#include "span.h" namespace cryptonote { @@ -41,7 +47,7 @@ namespace cryptonote namespace rpc { -class ZmqServer +class ZmqServer final { public: @@ -49,12 +55,13 @@ class ZmqServer ~ZmqServer(); - static void init_options(boost::program_options::options_description& desc); - void serve(); - bool addIPCSocket(boost::string_ref address, boost::string_ref port); - bool addTCPSocket(boost::string_ref address, boost::string_ref port); + //! \return ZMQ context on success, `nullptr` on failure + void* init_rpc(boost::string_ref address, boost::string_ref port); + + //! \return `nullptr` on errors. + std::shared_ptr init_pub(epee::span addresses); void run(); void stop(); @@ -67,9 +74,11 @@ class ZmqServer boost::thread run_thread; net::zmq::socket rep_socket; + net::zmq::socket pub_socket; + net::zmq::socket relay_socket; + std::shared_ptr shared_state; }; - } // namespace cryptonote } // namespace rpc diff --git a/src/serialization/json_object.h b/src/serialization/json_object.h index 2a9b63b08..e016bef41 100644 --- a/src/serialization/json_object.h +++ b/src/serialization/json_object.h @@ -356,7 +356,7 @@ inline typename std::enable_if::value, void>::type t dest.StartArray(); for (const auto& t : vec) toJsonValue(dest, t); - dest.EndArray(vec.size()); + dest.EndArray(); } template diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index ef0477888..4f1b0c22a 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -109,6 +109,7 @@ target_link_libraries(unit_tests cryptonote_protocol cryptonote_core daemon_messages + daemon_rpc_server blockchain_db lmdb_lib rpc diff --git a/tests/unit_tests/json_serialization.cpp b/tests/unit_tests/json_serialization.cpp index 5873d0ab6..1db923f7b 100644 --- a/tests/unit_tests/json_serialization.cpp +++ b/tests/unit_tests/json_serialization.cpp @@ -15,7 +15,7 @@ #include "serialization/json_object.h" -namespace +namespace test { cryptonote::transaction make_miner_transaction(cryptonote::account_public_address const& to) @@ -82,7 +82,10 @@ namespace return tx; } +} +namespace +{ template T test_json(const T& value) { @@ -109,7 +112,7 @@ TEST(JsonSerialization, MinerTransaction) { cryptonote::account_base acct; acct.generate(); - const auto miner_tx = make_miner_transaction(acct.get_keys().m_account_address); + const auto miner_tx = test::make_miner_transaction(acct.get_keys().m_account_address); crypto::hash tx_hash{}; ASSERT_TRUE(cryptonote::get_transaction_hash(miner_tx, tx_hash)); @@ -137,8 +140,8 @@ TEST(JsonSerialization, RegularTransaction) cryptonote::account_base acct2; acct2.generate(); - const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address); - const auto tx = make_transaction( + const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address); + const auto tx = test::make_transaction( acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, false, false ); @@ -168,8 +171,8 @@ TEST(JsonSerialization, RingctTransaction) cryptonote::account_base acct2; acct2.generate(); - const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address); - const auto tx = make_transaction( + const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address); + const auto tx = test::make_transaction( acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, true, false ); @@ -199,8 +202,8 @@ TEST(JsonSerialization, BulletproofTransaction) cryptonote::account_base acct2; acct2.generate(); - const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address); - const auto tx = make_transaction( + const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address); + const auto tx = test::make_transaction( acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, true, true ); diff --git a/tests/unit_tests/json_serialization.h b/tests/unit_tests/json_serialization.h new file mode 100644 index 000000000..2d8267261 --- /dev/null +++ b/tests/unit_tests/json_serialization.h @@ -0,0 +1,42 @@ +// Copyright (c) 2020, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +namespace test +{ + cryptonote::transaction make_miner_transaction(cryptonote::account_public_address const& to); + + cryptonote::transaction + make_transaction( + cryptonote::account_keys const& from, + std::vector const& sources, + std::vector const& destinations, + bool rct, + bool bulletproof); +} diff --git a/tests/unit_tests/zmq_rpc.cpp b/tests/unit_tests/zmq_rpc.cpp index af1f1608b..1d065fc45 100644 --- a/tests/unit_tests/zmq_rpc.cpp +++ b/tests/unit_tests/zmq_rpc.cpp @@ -26,11 +26,25 @@ // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include #include +#include +#include "cryptonote_basic/account.h" +#include "cryptonote_basic/cryptonote_basic.h" +#include "cryptonote_basic/events.h" +#include "cryptonote_basic/cryptonote_format_utils.h" +#include "json_serialization.h" +#include "net/zmq.h" #include "rpc/message.h" +#include "rpc/zmq_pub.h" +#include "rpc/zmq_server.h" #include "serialization/json_object.h" +#define MASSERT(...) \ + if (!(__VA_ARGS__)) \ + return testing::AssertionFailure() << BOOST_PP_STRINGIZE(__VA_ARGS__) + TEST(ZmqFullMessage, InvalidRequest) { EXPECT_THROW( @@ -53,3 +67,711 @@ TEST(ZmqFullMessage, Request) cryptonote::rpc::FullMessage parsed{request, true}; EXPECT_STREQ("foo", parsed.getRequestType().c_str()); } + +namespace +{ + using published_json = std::pair; + + constexpr const char inproc_pub[] = "inproc://dummy_pub"; + + net::zmq::socket create_socket(void* ctx, const char* address) + { + net::zmq::socket sock{zmq_socket(ctx, ZMQ_PAIR)}; + if (!sock) + MONERO_ZMQ_THROW("failed to create socket"); + if (zmq_bind(sock.get(), address) != 0) + MONERO_ZMQ_THROW("socket bind failure"); + return sock; + } + + std::vector get_messages(void* socket, int count = -1) + { + std::vector out; + for ( ; count || count < 0; --count) + { + expect next = net::zmq::receive(socket, (count < 0 ? ZMQ_DONTWAIT : 0)); + if (next == net::zmq::make_error_code(EAGAIN)) + return out; + out.push_back(std::move(*next)); + } + return out; + } + + std::vector get_published(void* socket, int count = -1) + { + std::vector out; + + const auto messages = get_messages(socket, count); + out.reserve(messages.size()); + + for (const std::string& message : messages) + { + const char* split = std::strchr(message.c_str(), ':'); + if (!split) + throw std::runtime_error{"Invalid ZMQ/Pub message"}; + + out.emplace_back(); + out.back().first = {message.c_str(), split}; + if (out.back().second.Parse(split + 1).HasParseError()) + throw std::runtime_error{"Failed to parse ZMQ/Pub message"}; + } + + return out; + } + + testing::AssertionResult compare_full_txpool(epee::span events, const published_json& pub) + { + MASSERT(pub.first == "json-full-txpool_add"); + MASSERT(pub.second.IsArray()); + MASSERT(pub.second.Size() <= events.size()); + + std::size_t i = 0; + for (const cryptonote::txpool_event& event : events) + { + MASSERT(i <= pub.second.Size()); + if (!event.res) + continue; + + cryptonote::transaction tx{}; + cryptonote::json::fromJsonValue(pub.second[i], tx); + + crypto::hash id{}; + MASSERT(cryptonote::get_transaction_hash(event.tx, id)); + MASSERT(cryptonote::get_transaction_hash(tx, id)); + MASSERT(event.tx.hash == tx.hash); + ++i; + } + return testing::AssertionSuccess(); + } + + testing::AssertionResult compare_minimal_txpool(epee::span events, const published_json& pub) + { + MASSERT(pub.first == "json-minimal-txpool_add"); + MASSERT(pub.second.IsArray()); + MASSERT(pub.second.Size() <= events.size()); + + std::size_t i = 0; + for (const cryptonote::txpool_event& event : events) + { + MASSERT(i <= pub.second.Size()); + if (!event.res) + continue; + + std::size_t actual_size = 0; + crypto::hash actual_id{}; + + MASSERT(pub.second[i].IsObject()); + GET_FROM_JSON_OBJECT(pub.second[i], actual_id, id); + GET_FROM_JSON_OBJECT(pub.second[i], actual_size, blob_size); + + std::size_t expected_size = 0; + crypto::hash expected_id{}; + MASSERT(cryptonote::get_transaction_hash(event.tx, expected_id, expected_size)); + MASSERT(expected_size == actual_size); + MASSERT(expected_id == actual_id); + ++i; + } + return testing::AssertionSuccess(); + } + + testing::AssertionResult compare_full_block(const epee::span expected, const published_json& pub) + { + MASSERT(pub.first == "json-full-chain_main"); + MASSERT(pub.second.IsArray()); + + std::vector actual; + cryptonote::json::fromJsonValue(pub.second, actual); + + MASSERT(expected.size() == actual.size()); + + for (std::size_t i = 0; i < expected.size(); ++i) + { + crypto::hash id; + MASSERT(cryptonote::get_block_hash(expected[i], id)); + MASSERT(cryptonote::get_block_hash(actual[i], id)); + MASSERT(expected[i].hash == actual[i].hash); + } + + return testing::AssertionSuccess(); + } + + testing::AssertionResult compare_minimal_block(std::size_t height, const epee::span expected, const published_json& pub) + { + MASSERT(pub.first == "json-minimal-chain_main"); + MASSERT(pub.second.IsObject()); + MASSERT(!expected.empty()); + + std::size_t actual_height = 0; + crypto::hash actual_id{}; + crypto::hash actual_prev_id{}; + std::vector actual_ids{}; + GET_FROM_JSON_OBJECT(pub.second, actual_height, first_height); + GET_FROM_JSON_OBJECT(pub.second, actual_prev_id, first_prev_id); + GET_FROM_JSON_OBJECT(pub.second, actual_ids, ids); + + MASSERT(height == actual_height); + MASSERT(expected[0].prev_id == actual_prev_id); + MASSERT(expected.size() == actual_ids.size()); + + for (std::size_t i = 0; i < expected.size(); ++i) + { + crypto::hash id; + MASSERT(cryptonote::get_block_hash(expected[i], id)); + MASSERT(id == actual_ids[i]); + } + + return testing::AssertionSuccess(); + } + + struct zmq_base : public testing::Test + { + cryptonote::account_base acct; + + zmq_base() + : testing::Test(), acct() + { + acct.generate(); + } + + cryptonote::transaction make_miner_transaction() + { + return test::make_miner_transaction(acct.get_keys().m_account_address); + } + + cryptonote::transaction make_transaction(const std::vector& destinations) + { + return test::make_transaction(acct.get_keys(), {make_miner_transaction()}, destinations, true, true); + } + + cryptonote::transaction make_transaction() + { + cryptonote::account_base temp_account; + temp_account.generate(); + return make_transaction({temp_account.get_keys().m_account_address}); + } + + cryptonote::block make_block() + { + cryptonote::block block{}; + block.major_version = 1; + block.minor_version = 3; + block.timestamp = 100; + block.prev_id = crypto::rand(); + block.nonce = 100; + block.miner_tx = make_miner_transaction(); + return block; + } + }; + + struct zmq_pub : public zmq_base + { + net::zmq::context ctx; + net::zmq::socket relay; + net::zmq::socket dummy_pub; + net::zmq::socket dummy_client; + std::shared_ptr pub; + + zmq_pub() + : zmq_base(), + ctx(zmq_init(1)), + relay(create_socket(ctx.get(), cryptonote::listener::zmq_pub::relay_endpoint())), + dummy_pub(create_socket(ctx.get(), inproc_pub)), + dummy_client(zmq_socket(ctx.get(), ZMQ_PAIR)), + pub(std::make_shared(ctx.get())) + { + if (!dummy_client) + MONERO_ZMQ_THROW("failed to create socket"); + if (zmq_connect(dummy_client.get(), inproc_pub) != 0) + MONERO_ZMQ_THROW("failed to connect to dummy pub"); + } + + virtual void TearDown() override final + { + EXPECT_EQ(0u, get_messages(relay.get()).size()); + EXPECT_EQ(0u, get_messages(dummy_client.get()).size()); + } + + template + bool sub_request(const char (&topic)[N]) + { + return pub->sub_request({topic, N - 1}); + } + }; + + struct dummy_handler final : cryptonote::rpc::RpcHandler + { + dummy_handler() + : cryptonote::rpc::RpcHandler() + {} + + virtual epee::byte_slice handle(const std::string& request) override final + { + throw std::logic_error{"not implemented"}; + } + }; + + struct zmq_server : public zmq_base + { + dummy_handler handler; + cryptonote::rpc::ZmqServer server; + std::shared_ptr pub; + net::zmq::socket sub; + + zmq_server() + : zmq_base(), + handler(), + server(handler), + pub(), + sub() + { + void* ctx = server.init_rpc({}, {}); + if (!ctx) + throw std::runtime_error{"init_rpc failure"}; + + const std::string endpoint = inproc_pub; + pub = server.init_pub({std::addressof(endpoint), 1}); + if (!pub) + throw std::runtime_error{"failed to initiaze zmq/pub"}; + + sub.reset(zmq_socket(ctx, ZMQ_SUB)); + if (!sub) + MONERO_ZMQ_THROW("failed to create socket"); + if (zmq_connect(sub.get(), inproc_pub) != 0) + MONERO_ZMQ_THROW("failed to connect to dummy pub"); + + server.run(); + } + + virtual void TearDown() override final + { + EXPECT_EQ(0u, get_messages(sub.get()).size()); + sub.reset(); + pub.reset(); + server.stop(); + } + + template + void subscribe(const char (&topic)[N]) + { + if (zmq_setsockopt(sub.get(), ZMQ_SUBSCRIBE, topic, N - 1) != 0) + MONERO_ZMQ_THROW("failed to subscribe"); + } + }; +} + +TEST_F(zmq_pub, InvalidContext) +{ + EXPECT_THROW(cryptonote::listener::zmq_pub{nullptr}, std::logic_error); +} + +TEST_F(zmq_pub, NoBlocking) +{ + EXPECT_FALSE(pub->relay_to_pub(relay.get(), dummy_pub.get())); +} + +TEST_F(zmq_pub, DefaultDrop) +{ + EXPECT_EQ(0u, pub->send_txpool_add({{make_transaction(), {}, true}})); + + const cryptonote::block bl = make_block(); + EXPECT_EQ(0u,pub->send_chain_main(5, {std::addressof(bl), 1})); + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(5, {std::addressof(bl), 1})); +} + +TEST_F(zmq_pub, JsonFullTxpool) +{ + static constexpr const char topic[] = "\1json-full-txpool_add"; + + ASSERT_TRUE(sub_request(topic)); + + std::vector events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); +} + +TEST_F(zmq_pub, JsonMinimalTxpool) +{ + static constexpr const char topic[] = "\1json-minimal-txpool_add"; + + ASSERT_TRUE(sub_request(topic)); + + std::vector events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); +} + +TEST_F(zmq_pub, JsonFullChain) +{ + static constexpr const char topic[] = "\1json-full-chain_main"; + + ASSERT_TRUE(sub_request(topic)); + + const std::array blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); +} + +TEST_F(zmq_pub, JsonMinimalChain) +{ + static constexpr const char topic[] = "\1json-minimal-chain_main"; + + ASSERT_TRUE(sub_request(topic)); + + const std::array blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front())); +} + +TEST_F(zmq_pub, JsonFullAll) +{ + static constexpr const char topic[] = "\1json-full"; + + ASSERT_TRUE(sub_request(topic)); + { + std::vector events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + } + { + const std::array blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + } +} + +TEST_F(zmq_pub, JsonMinimalAll) +{ + static constexpr const char topic[] = "\1json-minimal"; + + ASSERT_TRUE(sub_request(topic)); + + { + std::vector events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + } + { + const std::array blocks{{make_block(), make_block()}}; + + EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(1u, pubs.size()); + ASSERT_LE(1u, pubs.size()); + EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front())); + } +} + +TEST_F(zmq_pub, JsonAll) +{ + static constexpr const char topic[] = "\1json"; + + ASSERT_TRUE(sub_request(topic)); + + { + std::vector events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + + events.at(0).res = false; + EXPECT_EQ(1u, pub->send_txpool_add(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + + events.at(0).res = false; + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events)); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back())); + } + { + const std::array blocks{{make_block()}}; + + EXPECT_EQ(2u, pub->send_chain_main(100, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + auto pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.back())); + + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get())); + + pubs = get_published(dummy_client.get()); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front())); + EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.back())); + } +} + +TEST_F(zmq_pub, JsonChainWeakPtrSkip) +{ + static constexpr const char topic[] = "\1json"; + + ASSERT_TRUE(sub_request(topic)); + + const std::array blocks{{make_block()}}; + + pub.reset(); + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks))); +} + +TEST_F(zmq_pub, JsonTxpoolWeakPtrSkip) +{ + static constexpr const char topic[] = "\1json"; + + ASSERT_TRUE(sub_request(topic)); + + std::vector events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + pub.reset(); + EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(std::move(events))); +} + +TEST_F(zmq_server, pub) +{ + subscribe("json-minimal"); + + std::vector events + { + {make_transaction(), {}, true}, {make_transaction(), {}, true} + }; + + const std::array blocks{{make_block()}}; + + ASSERT_EQ(1u, pub->send_txpool_add(events)); + ASSERT_EQ(1u, pub->send_chain_main(200, epee::to_span(blocks))); + + auto pubs = get_published(sub.get(), 2); + EXPECT_EQ(2u, pubs.size()); + ASSERT_LE(2u, pubs.size()); + EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front())); + EXPECT_TRUE(compare_minimal_block(200, epee::to_span(blocks), pubs.back())); +}