diff --git a/tests/unit_tests/node_server.cpp b/tests/unit_tests/node_server.cpp index cab600b3d..aa4f3ac0c 100644 --- a/tests/unit_tests/node_server.cpp +++ b/tests/unit_tests/node_server.cpp @@ -35,6 +35,7 @@ #include "cryptonote_core/i_core_events.h" #include "cryptonote_protocol/cryptonote_protocol_handler.h" #include "cryptonote_protocol/cryptonote_protocol_handler.inl" +#include #define MAKE_IPV4_ADDRESS(a,b,c,d) epee::net_utils::ipv4_network_address{MAKE_IP(a,b,c,d),0} #define MAKE_IPV4_ADDRESS_PORT(a,b,c,d,e) epee::net_utils::ipv4_network_address{MAKE_IP(a,b,c,d),e} @@ -906,5 +907,264 @@ TEST(cryptonote_protocol_handler, race_condition) remove_tree(dir); } +TEST(node_server, race_condition) +{ + struct contexts { + using cryptonote = cryptonote::cryptonote_connection_context; + using p2p = nodetool::p2p_connection_context_t; + }; + using context_t = contexts::cryptonote; + using options_t = boost::program_options::variables_map; + using options_description_t = boost::program_options::options_description; + using worker_t = std::thread; + struct protocol_t { + private: + using p2p_endpoint_t = nodetool::i_p2p_endpoint; + using lock_t = std::mutex; + using condition_t = std::condition_variable_any; + using unique_lock_t = std::unique_lock; + p2p_endpoint_t *p2p_endpoint; + lock_t lock; + condition_t condition; + bool started{}; + size_t counter{}; + public: + using payload_t = cryptonote::CORE_SYNC_DATA; + using blob_t = cryptonote::blobdata; + using connection_context = context_t; + using payload_type = payload_t; + using relay_t = cryptonote::relay_method; + using string_t = std::string; + using span_t = epee::span; + using blobs_t = epee::span; + using connections_t = std::list; + using block_queue_t = cryptonote::block_queue; + using stripes_t = std::pair; + using byte_stream_t = epee::byte_stream; + struct core_events_t: cryptonote::i_core_events { + uint64_t get_current_blockchain_height() const override { return {}; } + bool is_synchronized() const override { return {}; } + void on_transactions_relayed(blobs_t blobs, relay_t relay) override {} + }; + int handle_invoke_map(bool is_notify, int command, const span_t in, byte_stream_t &out, context_t &context, bool &handled) { + return {}; + } + bool on_idle() { + if (not p2p_endpoint) + return {}; + { + unique_lock_t guard(lock); + if (not started) + started = true; + else + return {}; + } + std::vector txs(128 / 64 * 1024 * 1024, blob_t(1, 'x')); + worker_t worker([this]{ + p2p_endpoint->for_each_connection( + [this](context_t &, uint64_t, uint32_t){ + { + unique_lock_t guard(lock); + ++counter; + condition.notify_all(); + condition.wait(guard, [this]{ return counter >= 3; }); + } + std::this_thread::sleep_for(std::chrono::milliseconds(8)); + return false; + } + ); + }); + { + unique_lock_t guard(lock); + ++counter; + condition.notify_all(); + condition.wait(guard, [this]{ return counter >= 3; }); + ++counter; + condition.notify_all(); + condition.wait(guard, [this]{ return counter >= 5; }); + } + p2p_endpoint->send_txs( + std::move(txs), + epee::net_utils::zone::public_, + {}, + relay_t::fluff + ); + worker.join(); + return {}; + } + bool init(const options_t &options) { return {}; } + bool deinit() { return {}; } + void set_p2p_endpoint(p2p_endpoint_t *p2p_endpoint) { + this->p2p_endpoint = p2p_endpoint; + } + bool process_payload_sync_data(const payload_t &payload, contexts::p2p &context, bool is_inital) { + context.m_state = context_t::state_normal; + context.m_needed_objects.resize(512 * 1024); + { + unique_lock_t guard(lock); + ++counter; + condition.notify_all(); + condition.wait(guard, [this]{ return counter >= 3; }); + ++counter; + condition.notify_all(); + condition.wait(guard, [this]{ return counter >= 5; }); + } + return true; + } + bool get_payload_sync_data(blob_t &blob) { return {}; } + bool get_payload_sync_data(payload_t &payload) { return {}; } + bool on_callback(context_t &context) { return {}; } + core_events_t &get_core(){ static core_events_t core_events; return core_events;} + void log_connections() {} + connections_t get_connections() { return {}; } + const block_queue_t &get_block_queue() const { + static block_queue_t block_queue; + return block_queue; + } + void stop() {} + void on_connection_close(context_t &context) {} + void set_max_out_peers(unsigned int max) {} + bool no_sync() const { return {}; } + void set_no_sync(bool value) {} + string_t get_peers_overview() const { return {}; } + stripes_t get_next_needed_pruning_stripe() const { return {}; } + bool needs_new_sync_connections() const { return {}; } + bool is_busy_syncing() { return {}; } + }; + using node_server_t = nodetool::node_server; + auto conduct_test = [](protocol_t &protocol){ + struct messages { + struct core { + using sync = cryptonote::CORE_SYNC_DATA; + }; + using handshake = nodetool::COMMAND_HANDSHAKE_T; + }; + using handler_t = epee::levin::async_protocol_handler; + using connection_t = epee::net_utils::connection; + using connection_ptr = boost::shared_ptr; + using shared_state_t = typename connection_t::shared_state; + using shared_state_ptr = std::shared_ptr; + using io_context_t = boost::asio::io_service; + using work_t = boost::asio::io_service::work; + using work_ptr = std::shared_ptr; + using workers_t = std::vector; + using endpoint_t = boost::asio::ip::tcp::endpoint; + using event_t = epee::simple_event; + struct command_handler_t: epee::levin::levin_commands_handler { + using span_t = epee::span; + using byte_stream_t = epee::byte_stream; + int invoke(int, const span_t, byte_stream_t &, context_t &) override { return {}; } + int notify(int, const span_t, context_t &) override { return {}; } + void callback(context_t &) override {} + void on_connection_new(context_t &) override {} + void on_connection_close(context_t &) override {} + ~command_handler_t() override {} + static void destroy(epee::levin::levin_commands_handler* ptr) { delete ptr; } + }; + io_context_t io_context; + work_ptr work = std::make_shared(io_context); + workers_t workers; + while (workers.size() < 4) { + workers.emplace_back([&io_context]{ + io_context.run(); + }); + } + io_context.post([&]{ + protocol.on_idle(); + }); + io_context.post([&]{ + protocol.on_idle(); + }); + shared_state_ptr shared_state = std::make_shared(); + shared_state->set_handler(new command_handler_t, &command_handler_t::destroy); + connection_ptr conn{new connection_t(io_context, shared_state, {}, {})}; + endpoint_t endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 48080); + conn->socket().connect(endpoint); + conn->socket().set_option(boost::asio::ip::tcp::socket::reuse_address(true)); + conn->start({}, {}); + context_t context; + conn->get_context(context); + event_t handshaked; + typename messages::handshake::request_t msg{{ + ::config::NETWORK_ID, + 58080, + }}; + epee::net_utils::async_invoke_remote_command2( + context, + messages::handshake::ID, + msg, + *shared_state, + [conn, &handshaked](int code, const typename messages::handshake::response &msg, context_t &context){ + EXPECT_TRUE(code >= 0); + handshaked.raise(); + }, + P2P_DEFAULT_HANDSHAKE_INVOKE_TIMEOUT + ); + handshaked.wait(); + conn->strand_.post([conn]{ + conn->cancel(); + }); + conn.reset(); + work.reset(); + for (auto& w: workers) { + w.join(); + } + }; + using path_t = boost::filesystem::path; + using ec_t = boost::system::error_code; + auto create_dir = []{ + ec_t ec; + path_t path = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path("daemon-%%%%%%%%%%%%%%%%", ec); + if (ec) + return path_t{}; + auto success = boost::filesystem::create_directory(path, ec); + if (not ec && success) + return path; + return path_t{}; + }; + auto remove_tree = [](const path_t &path){ + ec_t ec; + boost::filesystem::remove_all(path, ec); + }; + const auto dir = create_dir(); + ASSERT_TRUE(not dir.empty()); + protocol_t protocol{}; + node_server_t node_server(protocol); + protocol.set_p2p_endpoint(&node_server); + node_server.init( + [&dir]{ + options_t options; + boost::program_options::store( + boost::program_options::command_line_parser({ + "--p2p-bind-ip=127.0.0.1", + "--p2p-bind-port=48080", + "--out-peers=0", + "--data-dir", + dir.string(), + "--no-igd", + "--add-exclusive-node=127.0.0.1:48080", + "--check-updates=disabled", + "--disable-dns-checkpoints", + }).options([]{ + options_description_t options_description{}; + cryptonote::core::init_options(options_description); + node_server_t::init_options(options_description); + return options_description; + }()).run(), + options + ); + return options; + }() + ); + worker_t worker([&]{ + node_server.run(); + }); + conduct_test(protocol); + node_server.send_stop_signal(); + worker.join(); + node_server.deinit(); + remove_tree(dir); +} + namespace nodetool { template class node_server>; } namespace cryptonote { template class t_cryptonote_protocol_handler; }