Merge pull request #7669

679d055 Remove payload copy in all outgoing p2p messages (Lee Clagett)
pull/7681/head
luigi1111 3 years ago
commit 0a1ddc2eff
No known key found for this signature in database
GPG Key ID: F4ACA0183641E010

@ -31,6 +31,7 @@
#include <cstdint>
#include "byte_stream.h"
#include "net_utils_base.h"
#include "span.h"
@ -83,11 +84,12 @@ namespace levin
#define LEVIN_PROTOCOL_VER_0 0
#define LEVIN_PROTOCOL_VER_1 1
template<class t_connection_context = net_utils::connection_context_base>
struct levin_commands_handler
{
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, t_connection_context& context)=0;
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, t_connection_context& context)=0;
virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0;
virtual void callback(t_connection_context& context){};
@ -125,12 +127,41 @@ namespace levin
}
}
//! Provides space for levin (p2p) header, so that payload can be sent without copy
class message_writer
{
byte_slice finalize(uint32_t command, uint32_t flags, uint32_t return_code, bool expect_response);
public:
using header = bucket_head2;
explicit message_writer(std::size_t reserve = 8192);
message_writer(const message_writer&) = delete;
message_writer(message_writer&&) = default;
~message_writer() = default;
message_writer& operator=(const message_writer&) = delete;
message_writer& operator=(message_writer&&) = default;
//! \return Size of payload (excludes header size).
std::size_t payload_size() const noexcept
{
return buffer.size() < sizeof(header) ? 0 : buffer.size() - sizeof(header);
}
byte_slice finalize_invoke(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, true); }
byte_slice finalize_notify(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, false); }
byte_slice finalize_response(uint32_t command, uint32_t return_code)
{
return finalize(command, LEVIN_PACKET_RESPONSE, return_code, false);
}
//! Has space for levin header until a finalize method is used
byte_stream buffer;
};
//! \return Intialized levin header.
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept;
//! \return A levin notification message.
byte_slice make_notify(int command, epee::span<const std::uint8_t> payload);
/*! Generate a dummy levin message.
\param noise_bytes Total size of the returned `byte_slice`.
@ -140,12 +171,11 @@ namespace levin
/*! Generate 1+ levin messages that are identical to the noise message size.
\param noise Each levin message will be identical to the size of this
message. The bytes from this message will be used for padding.
\param noise_size Each levin message will be identical to this value.
\return `nullptr` if `noise.size()` is less than the levin header size.
Otherwise, a levin notification message OR 2+ levin fragment messages.
Each message is `noise.size()` in length. */
byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload);
byte_slice make_fragmented_notify(const std::size_t noise_size, int command, message_writer message);
}
}

@ -51,6 +51,21 @@
#define MIN_BYTES_WANTED 512
#endif
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category)
{
MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
<< " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
}
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
{
char buf[32];
snprintf(buf, sizeof(buf), "command-%u", command);
on_levin_traffic(context, initiator, sent, error, bytes, buf);
}
namespace epee
{
namespace levin
@ -88,11 +103,10 @@ public:
uint64_t m_max_packet_size;
uint64_t m_invoke_timeout;
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id);
template<class callback_t>
int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
bool close(boost::uuids::uuid connection_id);
bool update_connection_context(const t_connection_context& contxt);
@ -122,12 +136,17 @@ class async_protocol_handler
{
std::string m_fragment_buffer;
bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response)
bool send_message(byte_slice message)
{
const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response);
if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
if (message.size() < sizeof(message_writer::header))
return false;
message_writer::header head;
std::memcpy(std::addressof(head), message.data(), sizeof(head));
if(!m_pservice_endpoint->do_send(std::move(message)))
return false;
on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command);
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
@ -523,26 +542,17 @@ public:
{
if(m_current_head.m_have_to_return_data)
{
byte_slice return_buff;
levin::message_writer return_message{32 * 1024};
const uint32_t return_code = m_config.m_pcommands_handler->invoke(
m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context
m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context
);
// peer_id remains unset if dropped
if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete())
m_max_packet_size = m_config.m_max_packet_size;
bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
head.m_return_code = SWAP32LE(return_code);
if(!m_pservice_endpoint->do_send(byte_slice{{epee::as_byte_span(head), epee::to_span(return_buff)}}))
if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code)))
return false;
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
<<", cmd = " << head.m_command
<< ", ver=" << head.m_protocol_version);
}
else
m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
@ -619,7 +629,7 @@ public:
}
template<class callback_t>
bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -638,7 +648,7 @@ public:
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
if(!send_message(in_msg.finalize_invoke(command)))
{
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
err_code = LEVIN_ERROR_CONNECTION;
@ -664,7 +674,7 @@ public:
return true;
}
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out)
int invoke(int command, message_writer in_msg, std::string& buff_out)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -676,7 +686,7 @@ public:
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
if (!send_message(in_msg.finalize_invoke(command)))
{
LOG_ERROR_CC(m_connection_context, "Failed to send request");
return LEVIN_ERROR_CONNECTION;
@ -713,25 +723,9 @@ public:
return m_invoke_result_code;
}
int notify(int command, const epee::span<const uint8_t> in_buff)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
CRITICAL_REGION_LOCAL(m_call_lock);
if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false))
{
LOG_ERROR_CC(m_connection_context, "Failed to send notify message");
return -1;
}
return 1;
}
/*! Sends `message` without adding a levin header. The message must have
been created with `make_notify`, `make_noise_notify` or
`make_fragmented_notify`. See additional instructions for
/*! Sends `message` without adding a levin header. The message must have been
created with `make_noise_notify`, `make_fragmented_notify`, or
`message_writer::finalize_notify`. See additional instructions for
`make_fragmented_notify`.
\return 1 on success */
@ -741,14 +735,11 @@ public:
boost::bind(&async_protocol_handler::finish_outer_call, this)
);
const std::size_t length = message.size();
if (!m_pservice_endpoint->do_send(std::move(message)))
if (!send_message(std::move(message)))
{
LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
return -1;
}
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]");
return 1;
}
//------------------------------------------------------------------------------------------
@ -838,19 +829,19 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id)
int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r;
return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r;
return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
@ -929,14 +920,6 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
{
async_protocol_handler<t_connection_context>* aph;

@ -37,21 +37,14 @@
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net"
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category);
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command);
namespace
{
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category)
{
MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
<< " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
}
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
{
char buf[32];
snprintf(buf, sizeof(buf), "command-%u", command);
return on_levin_traffic(context, initiator, sent, error, bytes, buf);
}
static const constexpr epee::serialization::portable_storage::limits_t default_levin_limits = {
8192, // objects
16384, // fields
@ -117,12 +110,11 @@ namespace epee
const boost::uuids::uuid &conn_id = context.m_connection_id;
typename serialization::portable_storage stg;
out_struct.store(stg);
byte_slice buff_to_send;
levin::message_writer to_send{16 * 1024};
std::string buff_to_recv;
stg.store_to_binary(buff_to_send, 16 * 1024);
stg.store_to_binary(to_send.buffer);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command);
int res = transport.invoke(command, boost::string_ref{reinterpret_cast<const char*>(buff_to_send.data()), buff_to_send.size()}, buff_to_recv, conn_id);
int res = transport.invoke(command, std::move(to_send), buff_to_recv, conn_id);
if( res <=0 )
{
LOG_PRINT_L1("Failed to invoke command " << command << " return code " << res);
@ -145,10 +137,9 @@ namespace epee
const boost::uuids::uuid &conn_id = context.m_connection_id;
typename serialization::portable_storage stg;
const_cast<t_arg&>(out_struct).store(stg);//TODO: add true const support to searilzation
byte_slice buff_to_send;
stg.store_to_binary(buff_to_send, 16 * 1024);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command);
int res = transport.invoke_async(command, epee::to_span(buff_to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool
levin::message_writer to_send{16 * 1024};
stg.store_to_binary(to_send.buffer);
int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool
{
t_result result_struct = AUTO_VAL_INIT(result_struct);
if( code <=0 )
@ -192,11 +183,10 @@ namespace epee
const boost::uuids::uuid &conn_id = context.m_connection_id;
serialization::portable_storage stg;
out_struct.store(stg);
byte_slice buff_to_send;
stg.store_to_binary(buff_to_send);
levin::message_writer to_send;
stg.store_to_binary(to_send.buffer);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command);
int res = transport.notify(command, epee::to_span(buff_to_send), conn_id);
int res = transport.send(to_send.finalize_notify(command), conn_id);
if(res <=0 )
{
MERROR("Failed to notify command " << command << " return code " << res);
@ -207,7 +197,7 @@ namespace epee
//----------------------------------------------------------------------------------------------------
//----------------------------------------------------------------------------------------------------
template<class t_owner, class t_in_type, class t_out_type, class t_context, class callback_t>
int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, callback_t cb, t_context& context )
int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, callback_t cb, t_context& context )
{
serialization::portable_storage strg;
if(!strg.load_from_binary(in_buff, &default_levin_limits))
@ -230,12 +220,11 @@ namespace epee
serialization::portable_storage strg_out;
static_cast<t_out_type&>(out_struct).store(strg_out);
if(!strg_out.store_to_binary(buff_out, 32 * 1024))
if(!strg_out.store_to_binary(buff_out))
{
LOG_ERROR("Failed to store_to_binary in command" << command);
return -1;
}
on_levin_traffic(context, false, true, false, buff_out.size(), command);
return res;
}
@ -262,7 +251,7 @@ namespace epee
}
#define CHAIN_LEVIN_INVOKE_MAP2(context_type) \
int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, context_type& context) \
int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, context_type& context) \
{ \
bool handled = false; \
return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \
@ -271,13 +260,13 @@ namespace epee
#define CHAIN_LEVIN_NOTIFY_MAP2(context_type) \
int notify(int command, const epee::span<const uint8_t> in_buff, context_type& context) \
{ \
bool handled = false; epee::byte_slice fake_str; \
return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \
bool handled = false; epee::byte_stream fake_str; \
return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \
}
#define CHAIN_LEVIN_INVOKE_MAP() \
int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, epee::net_utils::connection_context_base& context) \
int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, epee::net_utils::connection_context_base& context) \
{ \
bool handled = false; \
return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \
@ -297,7 +286,7 @@ namespace epee
}
#define BEGIN_INVOKE_MAP2(owner_type) \
template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, t_context& context, bool& handled) \
template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, t_context& context, bool& handled) \
{ \
try { \
typedef owner_type internal_owner_type_name;

@ -34,6 +34,7 @@
namespace epee
{
class byte_slice;
class byte_stream;
namespace serialization
{
/************************************************************************/
@ -83,8 +84,13 @@ namespace epee
//-------------------------------------------------------------------------------
bool store_to_binary(byte_slice& target, std::size_t initial_buffer_size = 8192);
bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = NULL);
bool load_from_binary(const std::string& target, const limits_t *limits = NULL);
bool store_to_binary(byte_stream& ss);
bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = nullptr);
bool load_from_binary(const std::string& target, const limits_t *limits = nullptr)
{
return load_from_binary(epee::strspan<uint8_t>(target), limits);
}
template<class trace_policy>
bool dump_as_xml(std::string& targetObj, const std::string& root_name = "");
bool dump_as_json(std::string& targetObj, size_t indent = 0, bool insert_newlines = true);

@ -36,6 +36,8 @@
namespace epee
{
class byte_stream;
namespace serialization
{
//-----------------------------------------------------------------------------------------------------------
@ -127,5 +129,14 @@ namespace epee
store_t_to_binary(str_in, binary_buff, initial_buffer_size);
return binary_buff;
}
//-----------------------------------------------------------------------------------------------------------
template<class t_struct>
bool store_t_to_binary(t_struct& str_in, byte_stream& binary_buff)
{
portable_storage ps;
str_in.store(ps);
return ps.store_to_binary(binary_buff);
}
}
}

@ -34,6 +34,25 @@ namespace epee
{
namespace levin
{
message_writer::message_writer(const std::size_t reserve)
: buffer()
{
buffer.reserve(reserve);
buffer.put_n(0, sizeof(header));
}
byte_slice message_writer::finalize(const uint32_t command, const uint32_t flags, const uint32_t return_code, const bool expect_response)
{
if (buffer.size() < sizeof(header))
throw std::runtime_error{"levin_writer::finalize already called"};
header head = make_header(command, payload_size(), flags, expect_response);
head.m_return_code = SWAP32LE(return_code);
std::memcpy(buffer.tellp() - buffer.size(), std::addressof(head), sizeof(head));
return byte_slice{std::move(buffer)};
}
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept
{
bucket_head2 head = {0};
@ -47,12 +66,6 @@ namespace levin
return head;
}
byte_slice make_notify(int command, epee::span<const std::uint8_t> payload)
{
const bucket_head2 head = make_header(command, payload.size(), LEVIN_PACKET_REQUEST, false);
return byte_slice{epee::as_byte_span(head), payload};
}
byte_slice make_noise_notify(const std::size_t noise_bytes)
{
static constexpr const std::uint32_t flags =
@ -68,46 +81,40 @@ namespace levin
return byte_slice{std::move(buffer)};
}
byte_slice make_fragmented_notify(const byte_slice& noise_message, int command, epee::span<const std::uint8_t> payload)
byte_slice make_fragmented_notify(const std::size_t noise_size, const int command, message_writer message)
{
const size_t noise_size = noise_message.size();
if (noise_size < sizeof(bucket_head2) * 2)
return nullptr;
if (payload.size() <= noise_size - sizeof(bucket_head2))
if (message.buffer.size() <= noise_size)
{
/* The entire message can be sent at once, and the levin binary parser
will ignore extra bytes. So just pad with zeroes and otherwise send
a "normal", not fragmented message. */
const size_t padding = noise_size - sizeof(bucket_head2) - payload.size();
const span<const uint8_t> padding_bytes{noise_message.end() - padding, padding};
const bucket_head2 head = make_header(command, noise_size - sizeof(bucket_head2), LEVIN_PACKET_REQUEST, false);
return byte_slice{as_byte_span(head), payload, padding_bytes};
message.buffer.put_n(0, noise_size - message.buffer.size());
return message.finalize_notify(command);
}
// fragment message
const byte_slice payload_bytes = message.finalize_notify(command);
span<const std::uint8_t> payload = to_span(payload_bytes);
const size_t payload_space = noise_size - sizeof(bucket_head2);
const size_t expected_fragments = ((payload.size() - 2) / payload_space) + 1;
std::string buffer{};
buffer.reserve((expected_fragments + 1) * noise_size); // +1 here overselects for internal bucket_head2 value
byte_stream buffer{};
buffer.reserve(expected_fragments * noise_size);
bucket_head2 head = make_header(0, noise_size - sizeof(bucket_head2), LEVIN_PACKET_BEGIN, false);
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head));
bucket_head2 head = make_header(0, payload_space, LEVIN_PACKET_BEGIN, false);
buffer.write(as_byte_span(head));
head.m_command = command;
head.m_flags = LEVIN_PACKET_REQUEST;
head.m_cb = payload.size();
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head));
// internal levin header is in payload already
size_t copy_size = payload.remove_prefix(payload_space - sizeof(bucket_head2));
buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size);
size_t copy_size = payload.remove_prefix(payload_space);
buffer.write(payload.data() - copy_size, copy_size);
head.m_command = 0;
head.m_flags = 0;
head.m_cb = noise_size - sizeof(bucket_head2);
while (!payload.empty())
{
copy_size = payload.remove_prefix(payload_space);
@ -115,12 +122,12 @@ namespace levin
if (payload.empty())
head.m_flags = LEVIN_PACKET_END;
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head));
buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size);
buffer.write(as_byte_span(head));
buffer.write(payload.data() - copy_size, copy_size);
}
const size_t padding = noise_size - copy_size - sizeof(bucket_head2);
buffer.append(reinterpret_cast<const char*>(noise_message.end()) - padding, padding);
buffer.put_n(0, padding);
return byte_slice{std::move(buffer)};
}

@ -48,15 +48,23 @@ namespace serialization
TRY_ENTRY();
byte_stream ss;
ss.reserve(initial_buffer_size);
store_to_binary(ss);
target = epee::byte_slice{std::move(ss)};
return true;
CATCH_ENTRY("portable_storage::store_to_binary", false);
}
bool portable_storage::store_to_binary(byte_stream& ss)
{
TRY_ENTRY();
storage_block_header sbh{};
sbh.m_signature_a = SWAP32LE(PORTABLE_STORAGE_SIGNATUREA);
sbh.m_signature_b = SWAP32LE(PORTABLE_STORAGE_SIGNATUREB);
sbh.m_ver = PORTABLE_STORAGE_FORMAT_VER;
ss.write(epee::as_byte_span(sbh));
pack_entry_to_buff(ss, m_root);
target = epee::byte_slice{std::move(ss)};
return true;
CATCH_ENTRY("portable_storage::store_to_binary", false)
CATCH_ENTRY("portable_storage::store_to_binary", false);
}
bool portable_storage::dump_as_json(std::string& buff, size_t indent, bool insert_newlines)
@ -76,11 +84,6 @@ namespace serialization
CATCH_ENTRY("portable_storage::load_from_json", false)
}
bool portable_storage::load_from_binary(const std::string& target, const limits_t *limits)
{
return load_from_binary(epee::strspan<uint8_t>(target), limits);
}
bool portable_storage::load_from_binary(const epee::span<const uint8_t> source, const limits_t *limits)
{
m_root.m_entries.clear();

@ -57,6 +57,7 @@
#include "common/notify.h"
#include "common/varint.h"
#include "common/pruning.h"
#include "time_helper.h"
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "blockchain"

@ -46,6 +46,8 @@
#include "block_queue.h"
#include "common/perf_timer.h"
#include "cryptonote_basic/connection_context.h"
#include "net/levin_base.h"
#include "p2p/net_node_common.h"
#include <boost/circular_buffer.hpp>
PUSH_WARNINGS
@ -195,10 +197,11 @@ namespace cryptonote
bool post_notify(typename t_parameter::request& arg, cryptonote_connection_context& context)
{
LOG_PRINT_L2("[" << epee::net_utils::print_connection_context_short(context) << "] post " << typeid(t_parameter).name() << " -->");
epee::byte_slice blob;
epee::serialization::store_t_to_binary(arg, blob, 256 * 1024); // optimize for block responses
epee::levin::message_writer out{256 * 1024}; // optimize for block responses
epee::serialization::store_t_to_binary(arg, out.buffer);
//handler_response_blocks_now(blob.size()); // XXX
return m_p2p->invoke_notify_to_peer(t_parameter::ID, epee::to_span(blob), context);
return m_p2p->invoke_notify_to_peer(t_parameter::ID, std::move(out), context);
}
};

@ -2713,15 +2713,15 @@ skip:
// send fluffy ones first, we want to encourage people to run that
if (!fluffyConnections.empty())
{
epee::byte_slice fluffyBlob;
epee::serialization::store_t_to_binary(fluffy_arg, fluffyBlob, 32 * 1024);
m_p2p->relay_notify_to_list(NOTIFY_NEW_FLUFFY_BLOCK::ID, epee::to_span(fluffyBlob), std::move(fluffyConnections));
epee::levin::message_writer fluffyBlob{32 * 1024};
epee::serialization::store_t_to_binary(fluffy_arg, fluffyBlob.buffer);
m_p2p->relay_notify_to_list(NOTIFY_NEW_FLUFFY_BLOCK::ID, std::move(fluffyBlob), std::move(fluffyConnections));
}
if (!fullConnections.empty())
{
epee::byte_slice fullBlob;
epee::serialization::store_t_to_binary(arg, fullBlob, 128 * 1024);
m_p2p->relay_notify_to_list(NOTIFY_NEW_BLOCK::ID, epee::to_span(fullBlob), std::move(fullConnections));
epee::levin::message_writer fullBlob{128 * 1024};
epee::serialization::store_t_to_binary(arg, fullBlob.buffer);
m_p2p->relay_notify_to_list(NOTIFY_NEW_BLOCK::ID, std::move(fullBlob), std::move(fullConnections));
}
return true;

@ -30,8 +30,8 @@
#pragma once
#include "p2p/net_node_common.h"
#include "cryptonote_protocol/cryptonote_protocol_defs.h"
#include "cryptonote_protocol/enums.h"
#include "cryptonote_basic/connection_context.h"
namespace cryptonote
{

@ -159,7 +159,7 @@ namespace levin
return get_out_connections(p2p, get_blockchain_height(p2p, core));
}
epee::byte_slice make_tx_payload(std::vector<blobdata>&& txs, const bool pad, const bool fluff)
epee::levin::message_writer make_tx_message(std::vector<blobdata>&& txs, const bool pad, const bool fluff)
{
NOTIFY_NEW_TRANSACTIONS::request request{};
request.txs = std::move(txs);
@ -193,21 +193,17 @@ namespace levin
// if the size of _ moved enough, we might lose byte in size encoding, we don't care
}
epee::byte_slice fullBlob;
if (!epee::serialization::store_t_to_binary(request, fullBlob))
epee::levin::message_writer out;
if (!epee::serialization::store_t_to_binary(request, out.buffer))
throw std::runtime_error{"Failed to serialize to epee binary format"};
return fullBlob;
return out;
}
bool make_payload_send_txs(connections& p2p, std::vector<blobdata>&& txs, const boost::uuids::uuid& destination, const bool pad, const bool fluff)
{
const epee::byte_slice blob = make_tx_payload(std::move(txs), pad, fluff);
p2p.for_connection(destination, [&blob](detail::p2p_context& context) {
on_levin_traffic(context, true, true, false, blob.size(), NOTIFY_NEW_TRANSACTIONS::ID);
return true;
});
return p2p.notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::to_span(blob), destination);
epee::byte_slice blob = make_tx_message(std::move(txs), pad, fluff).finalize_notify(NOTIFY_NEW_TRANSACTIONS::ID);
return p2p.send(std::move(blob), destination);
}
/* The current design uses `asio::strand`s. The documentation isn't as clear
@ -653,10 +649,6 @@ namespace levin
else
message = zone_->noise.clone();
zone_->p2p->for_connection(channel.connection, [&](detail::p2p_context& context) {
on_levin_traffic(context, true, true, false, message.size(), "noise");
return true;
});
if (zone_->p2p->send(std::move(message), channel.connection))
{
if (!channel.queue.empty() && channel.active.empty())
@ -816,9 +808,8 @@ namespace levin
// Padding is not useful when using noise mode. Send as stem so receiver
// forwards in Dandelion++ mode.
const epee::byte_slice payload = make_tx_payload(std::move(txs), false, false);
epee::byte_slice message = epee::levin::make_fragmented_notify(
zone_->noise, NOTIFY_NEW_TRANSACTIONS::ID, epee::to_span(payload)
zone_->noise.size(), NOTIFY_NEW_TRANSACTIONS::ID, make_tx_message(std::move(txs), false, false)
);
if (CRYPTONOTE_MAX_FRAGMENTS * zone_->noise.size() < message.size())
{

@ -344,10 +344,9 @@ namespace nodetool
virtual void on_connection_close(p2p_connection_context& context);
virtual void callback(p2p_connection_context& context);
//----------------- i_p2p_endpoint -------------------------------------------------------------
virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections);
virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) final;
virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay);
virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context);
virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context);
virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context) final;
virtual bool drop_connection(const epee::net_utils::connection_context_base& context);
virtual void request_callback(const epee::net_utils::connection_context_base& context);
virtual void for_each_connection(std::function<bool(typename t_payload_net_handler::connection_context&, peerid_type, uint32_t)> f);

@ -2175,8 +2175,9 @@ namespace nodetool
}
//-----------------------------------------------------------------------------------
template<class t_payload_net_handler>
bool node_server<t_payload_net_handler>::relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)
bool node_server<t_payload_net_handler>::relay_notify_to_list(int command, epee::levin::message_writer data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)
{
epee::byte_slice message = data_buff.finalize_notify(command);
std::sort(connections.begin(), connections.end());
auto zone = m_network_zones.begin();
for(const auto& c_id: connections)
@ -2194,7 +2195,7 @@ namespace nodetool
++zone;
}
if (zone->first == c_id.first)
zone->second.m_net_server.get_config_object().notify(command, data_buff, c_id.second);
zone->second.m_net_server.get_config_object().send(message.clone(), c_id.second);
}
return true;
}
@ -2261,24 +2262,13 @@ namespace nodetool
}
//-----------------------------------------------------------------------------------
template<class t_payload_net_handler>
bool node_server<t_payload_net_handler>::invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context)
bool node_server<t_payload_net_handler>::invoke_notify_to_peer(const int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context)
{
if(is_filtered_command(context.m_remote_address, command))
return false;
network_zone& zone = m_network_zones.at(context.m_remote_address.get_zone());
int res = zone.m_net_server.get_config_object().notify(command, req_buff, context.m_connection_id);
return res > 0;
}
//-----------------------------------------------------------------------------------
template<class t_payload_net_handler>
bool node_server<t_payload_net_handler>::invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context)
{
if(is_filtered_command(context.m_remote_address, command))
return false;
network_zone& zone = m_network_zones.at(context.m_remote_address.get_zone());
int res = zone.m_net_server.get_config_object().invoke(command, req_buff, resp_buff, context.m_connection_id);
int res = zone.m_net_server.get_config_object().send(message.finalize_notify(command), context.m_connection_id);
return res > 0;
}
//-----------------------------------------------------------------------------------

@ -40,6 +40,8 @@
#include "net/net_utils_base.h"
#include "p2p_protocol_defs.h"
namespace epee { namespace levin { class message_writer; } }
namespace nodetool
{
@ -49,10 +51,9 @@ namespace nodetool
template<class t_connection_context>
struct i_p2p_endpoint
{
virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)=0;
virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)=0;
virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay)=0;
virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context)=0;
virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context)=0;
virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context)=0;
virtual bool drop_connection(const epee::net_utils::connection_context_base& context)=0;
virtual void request_callback(const epee::net_utils::connection_context_base& context)=0;
virtual uint64_t get_public_connections_count()=0;
@ -71,7 +72,7 @@ namespace nodetool
template<class t_connection_context>
struct p2p_endpoint_stub: public i_p2p_endpoint<t_connection_context>
{
virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)
virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)
{
return false;
}
@ -79,11 +80,7 @@ namespace nodetool
{
return epee::net_utils::zone::invalid;
}
virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context)
{
return false;
}
virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context)
virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context)
{
return true;
}

@ -68,13 +68,13 @@ namespace
{
}
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_levin_connection_context& context)
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_levin_connection_context& context)
{
m_invoke_counter.inc();
boost::unique_lock<boost::mutex> lock(m_mutex);
m_last_command = command;
m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size());
buff_out = m_invoke_out_buf.clone();
buff_out.write(epee::to_span(m_invoke_out_buf));
return m_return_code;
}

@ -67,7 +67,7 @@ namespace net_load_tests
{
}
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_connection_context& context)
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_connection_context& context)
{
//m_invoke_counter.inc();
//std::unique_lock<std::mutex> lock(m_mutex);

@ -153,7 +153,7 @@ TEST(test_epee_connection, test_lifetime)
delay(delay),
on_connection_close_f(on_connection_close_f)
{}
virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_slice&, context_t&) override { epee::misc_utils::sleep_no_w(delay); return {}; }
virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_stream&, context_t&) override { epee::misc_utils::sleep_no_w(delay); return {}; }
virtual int notify(int, const epee::span<const uint8_t>, context_t&) override { return {}; }
virtual void callback(context_t&) override {}
virtual void on_connection_new(context_t&) override {}
@ -282,7 +282,7 @@ TEST(test_epee_connection, test_lifetime)
for (auto i = 0; i < N; ++i) {
tag = create_connection();
ASSERT_TRUE(shared_state->get_connections_count() == 1);
success = shared_state->invoke_async(1, {}, tag, [](int, const epee::span<const uint8_t>, context_t&){}, TIMEOUT);
success = shared_state->invoke_async(1, epee::levin::message_writer{}, tag, [](int, const epee::span<const uint8_t>, context_t&){}, TIMEOUT);
ASSERT_TRUE(success);
while (shared_state->sock_count == 1) {
success = shared_state->foreach_connection([&shared_state, &tag](context_t&){

@ -59,13 +59,13 @@ namespace
{
}
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_levin_connection_context& context)
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_levin_connection_context& context)
{
m_invoke_counter.inc();
boost::unique_lock<boost::mutex> lock(m_mutex);
m_last_command = command;
m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size());
buff_out = m_invoke_out_buf.clone();
buff_out.write(epee::to_span(m_invoke_out_buf));
return m_return_code;
}
@ -434,8 +434,11 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process
const int expected_command = 4673261;
const std::string in_data(256, 'e');
epee::levin::message_writer message{};
message.buffer.write(epee::to_span(in_data));
const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data));
const epee::byte_slice notify = message.finalize_notify(expected_command);
test_connection_ptr conn = create_connection();
@ -468,11 +471,16 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process
const int expected_command = 4673261;
const int expected_fragmented_command = 46732;
const std::string in_data(256, 'e');
std::string in_fragmented_data(1024 * 4, 'c');
epee::levin::message_writer message{};
message.buffer.write(epee::to_span(in_data));
epee::levin::message_writer in_fragmented_data;
in_fragmented_data.buffer.put_n('c', 1024 * 4);
const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data));
epee::byte_slice fragmented = epee::levin::make_fragmented_notify(noise, expected_fragmented_command, epee::strspan<std::uint8_t>(in_fragmented_data));
const epee::byte_slice notify = message.finalize_notify(expected_command);
epee::byte_slice fragmented = epee::levin::make_fragmented_notify(noise.size(), expected_fragmented_command, std::move(in_fragmented_data));
EXPECT_EQ(5u, fragmented.size() / 1024);
EXPECT_EQ(0u, fragmented.size() % 1024);
@ -497,11 +505,13 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process
ASSERT_TRUE(conn->m_protocol_handler.handle_recv(next.data(), next.size()));
}
in_fragmented_data.resize(((1024 - sizeof(epee::levin::bucket_head2)) * 5) - sizeof(epee::levin::bucket_head2)); // add padding zeroes
std::string compare_buffer(1024 * 4, 'c');
compare_buffer.resize(((1024 - sizeof(epee::levin::bucket_head2)) * 5) - sizeof(epee::levin::bucket_head2)); // add padding zeroes
ASSERT_EQ(4u, m_commands_handler.notify_counter());
ASSERT_EQ(0u, m_commands_handler.invoke_counter());
ASSERT_EQ(expected_fragmented_command, m_commands_handler.last_command());
ASSERT_EQ(in_fragmented_data, m_commands_handler.last_in_buf());
ASSERT_EQ(compare_buffer, m_commands_handler.last_in_buf());
ASSERT_EQ(0u, conn->send_counter());
ASSERT_TRUE(conn->last_send_data().empty());

@ -245,9 +245,9 @@ namespace
return out;
}
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, cryptonote::levin::detail::p2p_context& context) override final
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, cryptonote::levin::detail::p2p_context& context) override final
{
buff_out = nullptr;
buff_out.clear();
invoked_.push_back(
{context.m_connection_id, command, std::string{reinterpret_cast<const char*>(in_buff.data()), in_buff.size()}}
);
@ -384,21 +384,50 @@ TEST(make_header, expect_return)
EXPECT_EQ(0u, header1.m_flags);
}
TEST(make_notify, empty_payload)
TEST(message_writer, invoke_with_empty_payload)
{
const epee::byte_slice message = epee::levin::make_notify(443, nullptr);
const epee::byte_slice message = epee::levin::message_writer{}.finalize_invoke(443);
const epee::levin::bucket_head2 header =
epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, true);
ASSERT_EQ(sizeof(header), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
}
TEST(message_writer, invoke_with_payload)
{
std::string bytes(100, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
epee::levin::message_writer writer{};
writer.buffer.write(epee::to_span(bytes));
const epee::byte_slice message = writer.finalize_invoke(443);
const epee::levin::bucket_head2 header =
epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, true);
ASSERT_EQ(sizeof(header) + bytes.size(), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0);
}
TEST(message_writer, notify_with_empty_payload)
{
const epee::byte_slice message = epee::levin::message_writer{}.finalize_notify(443);
const epee::levin::bucket_head2 header =
epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, false);
ASSERT_EQ(sizeof(header), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
}
TEST(make_notify, with_payload)
TEST(message_writer, notify_with_payload)
{
std::string bytes(100, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
const epee::byte_slice message = epee::levin::make_notify(443, epee::strspan<std::uint8_t>(bytes));
epee::levin::message_writer writer{};
writer.buffer.write(epee::to_span(bytes));
const epee::byte_slice message = writer.finalize_notify(443);
const epee::levin::bucket_head2 header =
epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, false);
@ -407,6 +436,44 @@ TEST(make_notify, with_payload)
EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0);
}
TEST(message_writer, response_with_empty_payload)
{
const epee::byte_slice message = epee::levin::message_writer{}.finalize_response(443, 1);
epee::levin::bucket_head2 header =
epee::levin::make_header(443, 0, LEVIN_PACKET_RESPONSE, false);
header.m_return_code = SWAP32LE(1);
ASSERT_EQ(sizeof(header), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
}
TEST(message_writer, response_with_payload)
{
std::string bytes(100, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
epee::levin::message_writer writer{};
writer.buffer.write(epee::to_span(bytes));
const epee::byte_slice message = writer.finalize_response(443, 6450);
epee::levin::bucket_head2 header =
epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_RESPONSE, false);
header.m_return_code = SWAP32LE(6450);
ASSERT_EQ(sizeof(header) + bytes.size(), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0);
}
TEST(message_writer, error)
{
epee::levin::message_writer writer{};
writer.buffer.clear();
EXPECT_THROW(writer.finalize_invoke(0), std::runtime_error);
EXPECT_THROW(writer.finalize_notify(0), std::runtime_error);
EXPECT_THROW(writer.finalize_response(0, 0), std::runtime_error);
}
TEST(make_noise, invalid)
{
EXPECT_TRUE(epee::levin::make_noise_notify(sizeof(epee::levin::bucket_head2) - 1).empty());
@ -428,13 +495,13 @@ TEST(make_noise, valid)
TEST(make_fragment, invalid)
{
EXPECT_TRUE(epee::levin::make_fragmented_notify(nullptr, 0, nullptr).empty());
EXPECT_TRUE(epee::levin::make_fragmented_notify(0, 0, epee::levin::message_writer{}).empty());
}
TEST(make_fragment, single)
{
const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
const epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 11, nullptr);
const epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise.size(), 11, epee::levin::message_writer{});
const epee::levin::bucket_head2 header =
epee::levin::make_header(11, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_REQUEST, false);
@ -449,8 +516,13 @@ TEST(make_fragment, multiple)
std::string bytes(1024 * 3 - 150, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
epee::levin::message_writer message;
message.buffer.write(epee::to_span(bytes));
const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 114, epee::strspan<std::uint8_t>(bytes));
epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise.size(), 114, std::move(message));
EXPECT_EQ(1024 * 3, fragment.size());
epee::levin::bucket_head2 header =
epee::levin::make_header(0, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_BEGIN, false);
@ -497,6 +569,7 @@ TEST(make_fragment, multiple)
fragment.take_slice(bytes.size());
EXPECT_EQ(18, fragment.size());
EXPECT_EQ(18, std::count(fragment.cbegin(), fragment.cend(), 0));
}
@ -2164,20 +2237,31 @@ TEST_F(levin_notify, command_max_bytes)
add_connection(true);
std::string bytes(4096, 'h');
std::string payload(4096, 'h');
epee::byte_slice bytes;
{
epee::levin::message_writer dest{};
dest.buffer.write(epee::to_span(payload));
bytes = dest.finalize_notify(ping_command);
}
EXPECT_EQ(1, get_connections().notify(ping_command, epee::strspan<std::uint8_t>(bytes), contexts_.front().get_id()));
EXPECT_EQ(1, get_connections().send(bytes.clone(), contexts_.front().get_id()));
EXPECT_EQ(1u, contexts_.front().process_send_queue(true));
EXPECT_EQ(1u, receiver_.notified_size());
const received_message msg = receiver_.get_raw_notification();
EXPECT_EQ(ping_command, msg.command);
EXPECT_EQ(contexts_.front().get_id(), msg.connection);
EXPECT_EQ(bytes, msg.payload);
EXPECT_EQ(payload, msg.payload);
bytes.push_back('e');
{
payload.push_back('h');
epee::levin::message_writer dest{};
dest.buffer.write(epee::to_span(payload));
bytes = dest.finalize_notify(ping_command);
}
EXPECT_EQ(1, get_connections().notify(ping_command, epee::strspan<std::uint8_t>(bytes), contexts_.front().get_id()));
EXPECT_EQ(1, get_connections().send(std::move(bytes), contexts_.front().get_id()));
EXPECT_EQ(1u, contexts_.front().process_send_queue(false));
EXPECT_EQ(0u, receiver_.notified_size());
}

@ -449,7 +449,6 @@ TEST(cryptonote_protocol_handler, race_condition)
};
struct net_node_t: commands_handler_t, p2p_endpoint_t {
using span_t = epee::span<const uint8_t>;
using string_t = std::string;
using zone_t = epee::net_utils::zone;
using uuid_t = boost::uuids::uuid;
using relay_t = cryptonote::relay_method;
@ -462,12 +461,9 @@ TEST(cryptonote_protocol_handler, race_condition)
using subnets = std::map<epee::net_utils::ipv4_network_subnet, time_t>;
using hosts = std::map<std::string, time_t>;
};
struct slice {
using bytes = epee::byte_slice;
};
shared_state_ptr shared_state;
core_protocol_ptr core_protocol;
virtual int invoke(int command, const span_t in, slice::bytes &out, context_t &context) override {
virtual int invoke(int command, const span_t in, epee::byte_stream &out, context_t &context) override {
if (core_protocol) {
if (command == messages::handshake::ID) {
return epee::net_utils::buff_to_t_adapter<void, typename messages::handshake::request, typename messages::handshake::response>(
@ -491,7 +487,7 @@ TEST(cryptonote_protocol_handler, race_condition)
virtual int notify(int command, const span_t in, context_t &context) override {
if (core_protocol) {
bool handled;
slice::bytes out;
epee::byte_stream out;
return core_protocol->handle_invoke_map(true, command, in, out, context, handled);
}
else
@ -527,22 +523,16 @@ TEST(cryptonote_protocol_handler, race_condition)
else
return {};
}
virtual bool invoke_command_to_peer(int command, const span_t in, string_t& out, const contexts::basic& context) override {
if (shared_state)
return shared_state->invoke(command, in, out, context.m_connection_id);
else
return {};
}
virtual bool invoke_notify_to_peer(int command, const span_t in, const contexts::basic& context) override {
virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer in, const contexts::basic& context) override {
if (shared_state)
return shared_state->notify(command, in, context.m_connection_id);
return shared_state->send(in.finalize_notify(command), context.m_connection_id);
else
return {};
}
virtual bool relay_notify_to_list(int command, const span_t in, connections_t connections) override {
virtual bool relay_notify_to_list(int command, epee::levin::message_writer in, connections_t connections) override {
if (shared_state) {
for (auto &e: connections)
shared_state->notify(command, in, e.second);
shared_state->send(in.finalize_notify(command), e.second);
}
return {};
}

Loading…
Cancel
Save