Added support for "noise" over I1P/Tor to mask Tx transmission.

release-v0.7.1.0
Lee Clagett 4 years ago
parent bdfc63ae4d
commit 3b24b1d082

@ -160,25 +160,6 @@ the system clock is noticeably off (and therefore more fingerprintable),
linking the public IPv4/IPv6 connections with the anonymity networks will be
more difficult.
### Bandwidth Usage
An ISP can passively monitor `monerod` connections from a node and observe when
a transaction is sent over a Tor/I2P connection via timing analysis + size of
data sent during that timeframe. I2P should provide better protection against
this attack - its connections are not circuit based. However, if a node is
only using I2P for broadcasting Monero transactions, the total aggregate of
I2P data would also leak information.
#### Mitigation
There is no current mitigation for the user right now. This attack is fairly
sophisticated, and likely requires support from the internet host of a Monero
user.
In the near future, "whitening" the amount of data sent over anonymity network
connections will be performed. An attempt will be made to make a transaction
broadcast indistinguishable from a peer timed sync command.
### Intermittent Monero Syncing
If a user only runs `monerod` to send a transaction then quit, this can also
@ -208,3 +189,36 @@ is a tradeoff in potential isses. Also, anyone attempting this strategy really
wants to uncover a user, it seems unlikely that this would be performed against
every Tor/I2P user.
### I2P/Tor Stream Used Twice
If a single I2P/Tor stream is used 2+ times for transmitting a transaction, the
operator of the hidden service can conclude that both transactions came from the
same source. If the subsequent transactions spend a change output from the
earlier transactions, this will also reveal the "real" spend in the ring
signature. This issue was (primarily) raised by @secparam on Twitter.
#### Mitigation
`monerod` currently selects two outgoing connections every 5 minutes for
transmitting transactions over I2P/Tor. Using outgoing connections prevents an
adversary from making many incoming connections to obtain information (this
technique was taken from Dandelion). Outgoing connections also do not have a
persistent public key identity - the creation of a new circuit will generate
a new public key identity. The lock time on a change address is ~20 minutes, so
`monerod` will have rotated its selected outgoing connections several times in
most cases. However, the number of outgoing connections is typically a small
fixed number, so there is a decent probability of re-use with the same public
key identity.
@secparam (twitter) recommended changing circuits (Tor) as an additional
precaution. This is likely not a good idea - forcibly requesting Tor to change
circuits is observable by the ISP. Instead, `monerod` should likely disconnect
from peers ocassionally. Tor will rotate circuits every ~10 minutes, so
establishing new connections will use a new public key identity and make it
more difficult for the hidden service to link information. This process will
have to be done carefully because closing/reconnecting connections can also
leak information to hidden services if done improperly.
At the current time, if users need to frequently make transactions, I2P/Tor
will improve privacy from ISPs and other common adversaries, but still have
some metadata leakages to unknown hidden service operators.

@ -0,0 +1,165 @@
# Levin Protocol
This is a document explaining the current design of the levin protocol, as
used by Monero. The protocol is largely inherited from cryptonote, but has
undergone some changes.
This document also may differ from the `struct bucket_head2` in Monero's
code slightly - the spec here is slightly more strict to allow for
extensibility.
One of the goals of this document is to clearly indicate what is being sent
"on the wire" to identify metadata that could de-anonymize users over I2P/Tor.
These issues will be addressed as they are found. See `ANONMITY_NETWORKS.md` in
the top-level folder for any outstanding issues.
> This document does not currently list all data being sent by the monero
> protocol, that portion is a work-in-progress. Please take the time to do it
> if interested in learning about Monero p2p traffic!
## Header
This header is sent for every Monero p2p message.
```
0 1 2 3
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x01 | 0x21 | 0x01 | 0x01 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x01 | 0x01 | 0x01 | 0x01 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length |
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| E. Response | Command
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Return Code
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|Q|S|B|E| Reserved
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x01 | 0x00 | 0x00 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x00 |
+-+-+-+-+-+-+-+-+
```
### Signature
The first 8 bytes are the "signature" which helps identify the protocol (in
case someone connected to the wrong port, etc). The comments indicate that byte
sequence is from "benders nightmare".
This also can be used by deep packet inspection (DPI) engines to identify
Monero when the link is not encrypted. SSL has been proposed as a means to
mitigate this issue, but BIP-151 or the Noise protocol should also be considered.
### Length
The length is an unsigned 64-bit little endian integer. The length does _not_
include the header.
The implementation currently rejects received messages that exceed 100 MB
(base 10) by default.
### Expect Response
A zero-byte if no response is expected from the peer, and a non-zero byte if a
response is expected from the peer. Peers must respond to requests with this
flag in the same order that they were received, however, other messages can be
sent between responses.
There are some commands in the
[cryptonote protocol](#cryptonote-protocol-commands) where a response is
expected from the peer, but this flag is not set. Those responses are returned
as notify messages and can be sent in any order by the peer.
### Command
An unsigned 32-bit little endian integer representing the Monero specific
command being invoked.
### Return Code
A signed 32-bit little integer integer representing the response from the peer
from the last command that was invoked. This is `0` for request messages.
### Flags
* `Q` - Bit is set if the message is a request.
* `S` - Bit is set if the message is a response.
* `B` - Bit is set if this is a the beginning of a [fragmented message](#fragmented-messages).
* `E` - Bit is set if this is the end of a [fragmented message](#fragmented-messages).
### Version
A fixed value of `1` as an unsigned 32-bit little endian integer.
## Message Flow
The protocol can be subdivided into: (1) notifications, (2) requests,
(3) responses, (4) fragmented messages, and (5) dummy messages. Response
messages must be sent in the same order that a peer issued a request message.
A peer does not have to send a response immediately following a request - any
other message type can be sent instead.
### Notifications
Notifications are one-way messages that can be sent at any time without
an expectation of a response from the peer. The `Q` bit must be set, the `S`,
`B` and `E` bits must be unset, and the `Expect Response` field must be zeroed.
Some notifications must be in response to other notifications. This is not
part of the levin messaging layer, and is described in the
[commands](#commands) section.
### Requests
Requests are the basis of the admin protocol for Monero. The `Q` bit must be
set, the `S`, `B` and `E` bits must be unset, and the `Expect Response` field
must be non-zero. The peer is expected to send a response message with the same
`command` number.
### Responses
Response message can only be sent after a peer first issues a request message.
Responses must have the `S` bit set, the `Q`, `B` and `E` bits unset, and have
a zeroed `Expect Response` field. The `Command` field must be the same value
that was sent in the request message. The `Return Code` is specific to the
`Command` being issued (see [commands])(#commands)).
### Fragmented
Fragmented messages were introduced for the "white noise" feature for i2p/tor.
A transaction can be sent in fragments to conceal when "real" data is being
sent instead of dummy messages. Only one fragmented message can be sent at a
time, and bits `B` and `E` are never set at the same time
(see [dummy messages](#dummy)). The re-constructed message must contain a
levin header for a different (non-fragment) message type.
The `Q` and `S` bits are never set and the `Expect Response` field must always
be zero. The first fragment has the `B` bit set, neither `B` nor `E` is set for
"middle" fragments, and `E` is set for the last fragment.
### Dummy
Dummy messages have the `B` and `E` bits set, the `Q` and `S` bits unset, and
the `Expect Reponse` field zeroed. When a message of this type is received, the
contents can be safely ignored.
## Commands
### P2P (Admin) Commands
#### (`1001` Request) Handshake
#### (`1001` Response) Handshake
#### (`1002` Request) Timed Sync
#### (`1002` Response) Timed Sync
#### (`1003` Request) Ping
#### (`1003` Response) Ping
#### (`1004` Request) Stat Info
#### (`1004` Response) Stat Info
#### (`1005` Request) Network State
#### (`1005` Response) Network State
#### (`1006` Request) Peer ID
#### (`1006` Reponse) Peer ID
#### (`1007` Request) Support Flags
#### (`1007` Response) Support Flags
### Cryptonote Protocol Commands
#### (`2001` Notification) New Block
#### (`2002` Notification) New Transactions
#### (`2003` Notification) Request Get Objects
#### (`2004` Notification) Response Get Objects
#### (`2006` Notification) Request Chain
#### (`2007` Notification) Response Chain Entry
#### (`2008` Notification) New Fluffy Block
#### (`2009` Notification) Request Fluffy Missing TX

@ -49,10 +49,11 @@
#include <boost/asio/ssl.hpp>
#include <boost/array.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/shared_ptr.hpp> //! \TODO Convert to std::shared_ptr
#include <boost/enable_shared_from_this.hpp>
#include <boost/interprocess/detail/atomic.hpp>
#include <boost/thread/thread.hpp>
#include <memory>
#include "byte_slice.h"
#include "net_utils_base.h"
#include "syncobj.h"
@ -91,25 +92,24 @@ namespace net_utils
public:
typedef typename t_protocol_handler::connection_context t_connection_context;
struct shared_state : connection_basic_shared_state
struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type
{
shared_state()
: connection_basic_shared_state(), pfilter(nullptr), config(), stop_signal_sent(false)
: connection_basic_shared_state(), t_protocol_handler::config_type(), pfilter(nullptr), stop_signal_sent(false)
{}
i_connection_filter* pfilter;
typename t_protocol_handler::config_type config;
bool stop_signal_sent;
};
/// Construct a connection with the given io_service.
explicit connection( boost::asio::io_service& io_service,
boost::shared_ptr<shared_state> state,
std::shared_ptr<shared_state> state,
t_connection_type connection_type,
epee::net_utils::ssl_support_t ssl_support);
explicit connection( boost::asio::ip::tcp::socket&& sock,
boost::shared_ptr<shared_state> state,
std::shared_ptr<shared_state> state,
t_connection_type connection_type,
epee::net_utils::ssl_support_t ssl_support);
@ -271,7 +271,13 @@ namespace net_utils
typename t_protocol_handler::config_type& get_config_object()
{
assert(m_state != nullptr); // always set in constructor
return m_state->config;
return *m_state;
}
std::shared_ptr<typename t_protocol_handler::config_type> get_config_shared()
{
assert(m_state != nullptr); // always set in constructor
return {m_state};
}
int get_binded_port(){return m_port;}
@ -352,7 +358,7 @@ namespace net_utils
bool is_thread_worker();
const boost::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state;
const std::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state;
/// The io_service used to perform asynchronous operations.
struct worker

@ -68,7 +68,7 @@ namespace epee
namespace net_utils
{
template<typename T>
T& check_and_get(boost::shared_ptr<T>& ptr)
T& check_and_get(std::shared_ptr<T>& ptr)
{
CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null");
return *ptr;
@ -81,7 +81,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
connection<t_protocol_handler>::connection( boost::asio::io_service& io_service,
boost::shared_ptr<shared_state> state,
std::shared_ptr<shared_state> state,
t_connection_type connection_type,
ssl_support_t ssl_support
)
@ -91,13 +91,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock,
boost::shared_ptr<shared_state> state,
std::shared_ptr<shared_state> state,
t_connection_type connection_type,
ssl_support_t ssl_support
)
:
connection_basic(std::move(sock), state, ssl_support),
m_protocol_handler(this, check_and_get(state).config, context),
m_protocol_handler(this, check_and_get(state), context),
buffer_ssl_init_fill(0),
m_connection_type( connection_type ),
m_throttle_speed_in("speed_in", "throttle_speed_in"),
@ -378,7 +378,6 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if(!recv_res)
{
//_info("[sock " << socket().native_handle() << "] protocol_want_close");
//some error in protocol, protocol handler ask to close connection
boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1);
bool do_shutdown = false;
@ -601,7 +600,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
double current_speed_up;
{
CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex);
m_throttle_speed_out.handle_trafic_exact(cb);
m_throttle_speed_out.handle_trafic_exact(chunk.size());
current_speed_up = m_throttle_speed_out.get_current_speed();
}
context.m_current_speed_up = current_speed_up;
@ -665,7 +664,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
auto size_now = m_send_que.front().size();
MDEBUG("do_send_chunk() NOW SENSD: packet="<<size_now<<" B");
if (speed_limit_is_enabled())
do_send_handler_write( chunk.data(), chunk.size() ); // (((H)))
do_send_handler_write( m_send_que.back().data(), m_send_que.back().size() ); // (((H)))
CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size");
reset_timer(get_default_timeout(), false);
@ -893,7 +892,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) :
m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()),
m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
m_io_service_local_instance(new worker()),
io_service_(m_io_service_local_instance->io_service),
acceptor_(io_service_),
@ -912,7 +911,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) :
m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()),
m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
io_service_(extarnal_io_service),
acceptor_(io_service_),
acceptor_ipv6(io_service_),

@ -100,7 +100,7 @@ class connection_basic_pimpl; // PIMPL for this class
class connection_basic { // not-templated base class for rapid developmet of some code parts
// beware of removing const, net_utils::connection is sketchily doing a cast to prevent storing ptr twice
const boost::shared_ptr<connection_basic_shared_state> m_state;
const std::shared_ptr<connection_basic_shared_state> m_state;
public:
std::unique_ptr< connection_basic_pimpl > mI; // my Implementation
@ -119,8 +119,8 @@ class connection_basic { // not-templated base class for rapid developmet of som
public:
// first counter is the ++/-- count of current sockets, the other socket_number is only-increasing ++ number generator
connection_basic(boost::asio::ip::tcp::socket&& socket, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support);
connection_basic(boost::asio::io_service &io_service, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support);
connection_basic(boost::asio::ip::tcp::socket&& socket, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support);
connection_basic(boost::asio::io_service &io_service, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support);
virtual ~connection_basic() noexcept(false);

@ -49,7 +49,7 @@ namespace net_utils
{
invalid = 0,
public_ = 1, // public is keyword
i2p = 2,
i2p = 2, // order from here changes priority of selection for origin TXes
tor = 3
};

@ -29,7 +29,11 @@
#ifndef _LEVIN_BASE_H_
#define _LEVIN_BASE_H_
#include <cstdint>
#include "byte_slice.h"
#include "net_utils_base.h"
#include "span.h"
#define LEVIN_SIGNATURE 0x0101010101012101LL //Bender's nightmare
@ -72,6 +76,8 @@ namespace levin
#define LEVIN_PACKET_REQUEST 0x00000001
#define LEVIN_PACKET_RESPONSE 0x00000002
#define LEVIN_PACKET_BEGIN 0x00000004
#define LEVIN_PACKET_END 0x00000008
#define LEVIN_PROTOCOL_VER_0 0
@ -118,9 +124,30 @@ namespace levin
}
}
//! \return Intialized levin header.
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept;
//! \return A levin notification message.
byte_slice make_notify(int command, epee::span<const std::uint8_t> payload);
/*! Generate a dummy levin message.
\param noise_bytes Total size of the returned `byte_slice`.
\return `nullptr` if `noise_size` is smaller than the levin header.
Otherwise, a dummy levin message. */
byte_slice make_noise_notify(std::size_t noise_bytes);
/*! Generate 1+ levin messages that are identical to the noise message size.
\param noise Each levin message will be identical to the size of this
message. The bytes from this message will be used for padding.
\return `nullptr` if `noise.size()` is less than the levin header size.
Otherwise, a levin notification message OR 2+ levin fragment messages.
Each message is `noise.size()` in length. */
byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload);
}
}
#endif //_LEVIN_BASE_H_

@ -32,6 +32,7 @@
#include <boost/smart_ptr/make_shared.hpp>
#include <atomic>
#include <deque>
#include "levin_base.h"
#include "buffer.h"
@ -91,6 +92,7 @@ public:
int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
bool close(boost::uuids::uuid connection_id);
bool update_connection_context(const t_connection_context& contxt);
bool request_callback(boost::uuids::uuid connection_id);
@ -117,6 +119,22 @@ public:
template<class t_connection_context = net_utils::connection_context_base>
class async_protocol_handler
{
std::string m_fragment_buffer;
bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response)
{
const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response);
if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
return false;
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
<<", cmd = " << head.m_command
<< ", ver=" << head.m_protocol_version);
return true;
}
public:
typedef t_connection_context connection_context;
typedef async_protocol_handler_config<t_connection_context> config_type;
@ -259,34 +277,6 @@ public:
return handler->is_timer_started();
}
template<class callback_t> friend struct anvoke_handler;
static bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept
{
bucket_head2 head = {0};
head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
head.m_have_to_return_data = expect_response;
head.m_cb = SWAP64LE(msg_size);
head.m_command = SWAP32LE(command);
head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
head.m_flags = SWAP32LE(flags);
return head;
}
bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response)
{
const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response);
if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
return false;
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
<<", cmd = " << head.m_command
<< ", ver=" << head.m_protocol_version);
return true;
}
public:
async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr,
config_type& config,
@ -403,7 +393,12 @@ public:
return false;
}
if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size)
// these should never fail, but do runtime check for safety
CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()");
CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()");
// flipped to subtraction; prevent overflow since m_max_packet_size is variable and public
if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size())
{
MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
<< ", packet received " << m_cache_in_buffer.size() + cb
@ -435,8 +430,38 @@ public:
}
break;
}
{
std::string temp{};
epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb);
m_state = stream_state_head;
// abstract_tcp_server2.h manages max bandwidth for a p2p link
if (!(m_current_head.m_flags & (LEVIN_PACKET_REQUEST | LEVIN_PACKET_RESPONSE)))
{
// special noise/fragment command
static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END);
if ((m_current_head.m_flags & both_flags) == both_flags)
break; // noise message, skip to next message
if (m_current_head.m_flags & LEVIN_PACKET_BEGIN)
m_fragment_buffer.clear();
m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size());
if (!(m_current_head.m_flags & LEVIN_PACKET_END))
break; // skip to next message
if (m_fragment_buffer.size() < sizeof(bucket_head2))
{
MERROR(m_connection_context << "Fragmented data too small for levin header");
return false;
}
temp = std::move(m_fragment_buffer);
m_fragment_buffer.clear();
std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2));
buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)};
}
bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE);
@ -489,9 +514,9 @@ public:
m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context
);
bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
head.m_return_code = SWAP32LE(return_code);
return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head));
bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
head.m_return_code = SWAP32LE(return_code);
return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head));
if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)}))
return false;
@ -505,8 +530,13 @@ public:
else
m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
}
// reuse small buffer
if (!temp.empty() && temp.capacity() <= 64 * 1024)
{
temp.clear();
m_fragment_buffer = std::move(temp);
}
}
m_state = stream_state_head;
break;
case stream_state_head:
{
@ -616,7 +646,7 @@ public:
if (LEVIN_OK != err_code)
{
epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0};
epee::span<const uint8_t> stub_buff = nullptr;
// Never call callback inside critical section, that can cause deadlock
cb(err_code, stub_buff, m_connection_context);
return false;
@ -698,6 +728,32 @@ public:
return 1;
}
/*! Sends `message` without adding a levin header. The message must have
been created with `make_notify`, `make_noise_notify` or
`make_fragmented_notify`. See additional instructions for
`make_fragmented_notify`.
\return 1 on success */
int send(byte_slice message)
{
const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this)
);
if(m_deletion_initiated)
return LEVIN_ERROR_CONNECTION_DESTROYED;
const std::size_t length = message.size();
if (!m_pservice_endpoint->do_send(std::move(message)))
{
LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
return -1;
}
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]");
return 1;
}
//------------------------------------------------------------------------------------------
boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;}
//------------------------------------------------------------------------------------------
@ -876,6 +932,14 @@ int async_protocol_handler_config<t_connection_context>::notify(int command, con
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->send(std::move(message)) : 0;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id)
{
CRITICAL_REGION_LOCAL(m_connects_lock);

@ -31,6 +31,7 @@
//#include <Winsock2.h>
//#include <Ws2tcpip.h>
#include <atomic>
#include <string>
#include <boost/version.hpp>
#include <boost/asio/io_service.hpp>
@ -154,7 +155,7 @@ namespace net_utils
}
inline
try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout, epee::net_utils::ssl_support_t ssl_support)
try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout)
{
m_deadline.expires_from_now(timeout);
boost::unique_future<boost::asio::ip::tcp::socket> connection = m_connector(addr, port, m_deadline);
@ -174,11 +175,11 @@ namespace net_utils
m_connected = true;
m_deadline.expires_at(std::chrono::steady_clock::time_point::max());
// SSL Options
if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
{
if (!m_ssl_options.handshake(*m_ssl_socket, boost::asio::ssl::stream_base::client, addr))
{
if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
{
boost::system::error_code ignored_ec;
m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
@ -217,7 +218,7 @@ namespace net_utils
// Get a list of endpoints corresponding to the server name.
try_connect_result_t try_connect_result = try_connect(addr, port, timeout, m_ssl_options.support);
try_connect_result_t try_connect_result = try_connect(addr, port, timeout);
if (try_connect_result == CONNECT_FAILURE)
return false;
if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
@ -226,7 +227,7 @@ namespace net_utils
{
MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL");
m_ssl_options.support = epee::net_utils::ssl_support_t::e_ssl_support_disabled;
if (try_connect(addr, port, timeout, m_ssl_options.support) != CONNECT_SUCCESS)
if (try_connect(addr, port, timeout) != CONNECT_SUCCESS)
return false;
}
}
@ -562,7 +563,7 @@ namespace net_utils
{
m_deadline.cancel();
boost::system::error_code ec;
if(m_ssl_options.support != ssl_support_t::e_ssl_support_disabled)
if(m_ssl_options)
shutdown_ssl();
m_ssl_socket->next_layer().cancel(ec);
if(ec)

@ -27,7 +27,7 @@
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
add_library(epee STATIC byte_slice.cpp hex.cpp http_auth.cpp mlog.cpp net_helper.cpp net_utils_base.cpp string_tools.cpp wipeable_string.cpp
memwipe.c connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp)
levin_base.cpp memwipe.c connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp)
if (USE_READLINE AND GNU_READLINE_FOUND)
add_library(epee_readline STATIC readline_buffer.cpp)

@ -128,7 +128,7 @@ connection_basic_pimpl::connection_basic_pimpl(const std::string &name) : m_thro
int connection_basic_pimpl::m_default_tos;
// methods:
connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support)
connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support)
:
m_state(std::move(state)),
mI( new connection_basic_pimpl("peer") ),
@ -152,7 +152,7 @@ connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, boost::s
_note("Spawned connection #"<<mI->m_peer_number<<" to " << remote_addr_str << " currently we have sockets count:" << m_state->sock_count);
}
connection_basic::connection_basic(boost::asio::io_service &io_service, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support)
connection_basic::connection_basic(boost::asio::io_service &io_service, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support)
:
m_state(std::move(state)),
mI( new connection_basic_pimpl("peer") ),

@ -0,0 +1,128 @@
// Copyright (c) 2019, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "net/levin_base.h"
#include "int-util.h"
namespace epee
{
namespace levin
{
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept
{
bucket_head2 head = {0};
head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
head.m_have_to_return_data = expect_response;
head.m_cb = SWAP64LE(msg_size);
head.m_command = SWAP32LE(command);
head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
head.m_flags = SWAP32LE(flags);
return head;
}
byte_slice make_notify(int command, epee::span<const std::uint8_t> payload)
{
const bucket_head2 head = make_header(command, payload.size(), LEVIN_PACKET_REQUEST, false);
return byte_slice{epee::as_byte_span(head), payload};
}
byte_slice make_noise_notify(const std::size_t noise_bytes)
{
static constexpr const std::uint32_t flags =
LEVIN_PACKET_BEGIN | LEVIN_PACKET_END;
if (noise_bytes < sizeof(bucket_head2))
return nullptr;
std::string buffer(noise_bytes, char(0));
const bucket_head2 head = make_header(0, noise_bytes - sizeof(bucket_head2), flags, false);
std::memcpy(std::addressof(buffer[0]), std::addressof(head), sizeof(head));
return byte_slice{std::move(buffer)};
}
byte_slice make_fragmented_notify(const byte_slice& noise_message, int command, epee::span<const std::uint8_t> payload)
{
const size_t noise_size = noise_message.size();
if (noise_size < sizeof(bucket_head2) * 2)
return nullptr;
if (payload.size() <= noise_size - sizeof(bucket_head2))
{
/* The entire message can be sent at once, and the levin binary parser
will ignore extra bytes. So just pad with zeroes and otherwise send
a "normal", not fragmented message. */
const size_t padding = noise_size - sizeof(bucket_head2) - payload.size();
const span<const uint8_t> padding_bytes{noise_message.end() - padding, padding};
const bucket_head2 head = make_header(command, noise_size - sizeof(bucket_head2), LEVIN_PACKET_REQUEST, false);
return byte_slice{as_byte_span(head), payload, padding_bytes};
}
// fragment message
const size_t payload_space = noise_size - sizeof(bucket_head2);
const size_t expected_fragments = ((payload.size() - 2) / payload_space) + 1;
std::string buffer{};
buffer.reserve((expected_fragments + 1) * noise_size); // +1 here overselects for internal bucket_head2 value
bucket_head2 head = make_header(0, noise_size - sizeof(bucket_head2), LEVIN_PACKET_BEGIN, false);
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head));
head.m_command = command;
head.m_flags = LEVIN_PACKET_REQUEST;
head.m_cb = payload.size();
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head));
size_t copy_size = payload.remove_prefix(payload_space - sizeof(bucket_head2));
buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size);
head.m_command = 0;
head.m_flags = 0;
head.m_cb = noise_size - sizeof(bucket_head2);
while (!payload.empty())
{
copy_size = payload.remove_prefix(payload_space);
if (payload.empty())
head.m_flags = LEVIN_PACKET_END;
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head));
buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size);
}
const size_t padding = noise_size - copy_size - sizeof(bucket_head2);
buffer.append(reinterpret_cast<const char*>(noise_message.end()) - padding, padding);
return byte_slice{std::move(buffer)};
}
} // levin
} // epee

@ -31,8 +31,10 @@
#pragma once
#include <unordered_set>
#include <atomic>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "net/net_utils_base.h"
#include "copyable_atomic.h"
#include "crypto/hash.h"
namespace cryptonote
{

@ -100,6 +100,16 @@
#define CRYPTONOTE_MEMPOOL_TX_LIVETIME (86400*3) //seconds, three days
#define CRYPTONOTE_MEMPOOL_TX_FROM_ALT_BLOCK_LIVETIME 604800 //seconds, one week
// see src/cryptonote_protocol/levin_notify.cpp
#define CRYPTONOTE_NOISE_MIN_EPOCH 5 // minutes
#define CRYPTONOTE_NOISE_EPOCH_RANGE 30 // seconds
#define CRYPTONOTE_NOISE_MIN_DELAY 10 // seconds
#define CRYPTONOTE_NOISE_DELAY_RANGE 5 // seconds
#define CRYPTONOTE_NOISE_BYTES 3*1024 // 3 KiB
#define CRYPTONOTE_NOISE_CHANNELS 2 // Max outgoing connections per zone used for noise/covert sending
#define CRYPTONOTE_MAX_FRAGMENTS 20 // ~20 * NOISE_BYTES max payload size for covert/noise send
#define COMMAND_RPC_GET_BLOCKS_FAST_MAX_COUNT 1000
#define P2P_LOCAL_WHITE_PEERLIST_LIMIT 1000

@ -2227,69 +2227,11 @@ skip:
template<class t_core>
bool t_cryptonote_protocol_handler<t_core>::relay_transactions(NOTIFY_NEW_TRANSACTIONS::request& arg, cryptonote_connection_context& exclude_context)
{
const bool hide_tx_broadcast =
1 < m_p2p->get_zone_count() && exclude_context.m_remote_address.get_zone() == epee::net_utils::zone::invalid;
if (hide_tx_broadcast)
MDEBUG("Attempting to conceal origin of tx via anonymity network connection(s)");
for(auto& tx_blob : arg.txs)
m_core.on_transaction_relayed(tx_blob);
// no check for success, so tell core they're relayed unconditionally
const bool pad_transactions = m_core.pad_transactions() || hide_tx_broadcast;
size_t bytes = pad_transactions ? 9 /* header */ + 4 /* 1 + 'txs' */ + tools::get_varint_data(arg.txs.size()).size() : 0;
for(auto tx_blob_it = arg.txs.begin(); tx_blob_it!=arg.txs.end(); ++tx_blob_it)
{
m_core.on_transaction_relayed(*tx_blob_it);
if (pad_transactions)
bytes += tools::get_varint_data(tx_blob_it->size()).size() + tx_blob_it->size();
}
if (pad_transactions)
{
// stuff some dummy bytes in to stay safe from traffic volume analysis
static constexpr size_t granularity = 1024;
size_t padding = granularity - bytes % granularity;
const size_t overhead = 2 /* 1 + '_' */ + tools::get_varint_data(padding).size();
if (overhead > padding)
padding = 0;
else
padding -= overhead;
arg._ = std::string(padding, ' ');
std::string arg_buff;
epee::serialization::store_t_to_binary(arg, arg_buff);
// we probably lowballed the payload size a bit, so added a but too much. Fix this now.
size_t remove = arg_buff.size() % granularity;
if (remove > arg._.size())
arg._.clear();
else
arg._.resize(arg._.size() - remove);
// if the size of _ moved enough, we might lose byte in size encoding, we don't care
}
std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections;
m_p2p->for_each_connection([hide_tx_broadcast, &exclude_context, &connections](connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)
{
const epee::net_utils::zone current_zone = context.m_remote_address.get_zone();
const bool broadcast_to_peer =
peer_id &&
(hide_tx_broadcast != bool(current_zone == epee::net_utils::zone::public_)) &&
exclude_context.m_connection_id != context.m_connection_id;
if (broadcast_to_peer)
connections.push_back({current_zone, context.m_connection_id});
return true;
});
if (connections.empty())
MERROR("Transaction not relayed - no" << (hide_tx_broadcast ? " privacy": "") << " peers available");
else
{
std::string fullBlob;
epee::serialization::store_t_to_binary(arg, fullBlob);
m_p2p->relay_notify_to_list(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<uint8_t>(fullBlob), std::move(connections));
}
m_p2p->send_txs(std::move(arg.txs), exclude_context.m_remote_address.get_zone(), exclude_context.m_connection_id, m_core.pad_transactions());
return true;
}
//------------------------------------------------------------------------------------------------------------------------

@ -0,0 +1,574 @@
// Copyright (c) 2019, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "levin_notify.h"
#include <boost/asio/steady_timer.hpp>
#include <boost/system/system_error.hpp>
#include <chrono>
#include <deque>
#include <stdexcept>
#include "common/expect.h"
#include "common/varint.h"
#include "cryptonote_config.h"
#include "crypto/random.h"
#include "cryptonote_basic/connection_context.h"
#include "cryptonote_protocol/cryptonote_protocol_defs.h"
#include "net/dandelionpp.h"
#include "p2p/net_node.h"
namespace cryptonote
{
namespace levin
{
namespace
{
constexpr std::size_t connection_id_reserve_size = 100;
constexpr const std::chrono::minutes noise_min_epoch{CRYPTONOTE_NOISE_MIN_EPOCH};
constexpr const std::chrono::seconds noise_epoch_range{CRYPTONOTE_NOISE_EPOCH_RANGE};
constexpr const std::chrono::seconds noise_min_delay{CRYPTONOTE_NOISE_MIN_DELAY};
constexpr const std::chrono::seconds noise_delay_range{CRYPTONOTE_NOISE_DELAY_RANGE};
/*! Select a randomized duration from 0 to `range`. The precision will be to
the systems `steady_clock`. As an example, supplying 3 seconds to this
function will select a duration from [0, 3] seconds, and the increments
for the selection will be determined by the `steady_clock` precision
(typically nanoseconds).
\return A randomized duration from 0 to `range`. */
std::chrono::steady_clock::duration random_duration(std::chrono::steady_clock::duration range)
{
using rep = std::chrono::steady_clock::rep;
return std::chrono::steady_clock::duration{crypto::rand_range(rep(0), range.count())};
}
//! \return All outgoing connections supporting fragments in `connections`.
std::vector<boost::uuids::uuid> get_out_connections(connections& p2p)
{
std::vector<boost::uuids::uuid> outs;
outs.reserve(connection_id_reserve_size);
/* The foreach call is serialized with a lock, but should be quick due to
the reserve call so a strand is not used. Investigate if there is lots
of waiting in here. */
p2p.foreach_connection([&outs] (detail::p2p_context& context) {
if (!context.m_is_income)
outs.emplace_back(context.m_connection_id);
return true;
});
return outs;
}
std::string make_tx_payload(std::vector<blobdata>&& txs, const bool pad)
{
NOTIFY_NEW_TRANSACTIONS::request request{};
request.txs = std::move(txs);
if (pad)
{
size_t bytes = 9 /* header */ + 4 /* 1 + 'txs' */ + tools::get_varint_data(request.txs.size()).size();
for(auto tx_blob_it = request.txs.begin(); tx_blob_it!=request.txs.end(); ++tx_blob_it)
bytes += tools::get_varint_data(tx_blob_it->size()).size() + tx_blob_it->size();
// stuff some dummy bytes in to stay safe from traffic volume analysis
static constexpr const size_t granularity = 1024;
size_t padding = granularity - bytes % granularity;
const size_t overhead = 2 /* 1 + '_' */ + tools::get_varint_data(padding).size();
if (overhead > padding)
padding = 0;
else
padding -= overhead;
request._ = std::string(padding, ' ');
std::string arg_buff;
epee::serialization::store_t_to_binary(request, arg_buff);
// we probably lowballed the payload size a bit, so added a but too much. Fix this now.
size_t remove = arg_buff.size() % granularity;
if (remove > request._.size())
request._.clear();
else
request._.resize(request._.size() - remove);
// if the size of _ moved enough, we might lose byte in size encoding, we don't care
}
std::string fullBlob;
if (!epee::serialization::store_t_to_binary(request, fullBlob))
throw std::runtime_error{"Failed to serialize to epee binary format"};
return fullBlob;
}
/* The current design uses `asio::strand`s. The documentation isn't as clear
as it should be - a `strand` has an internal `mutex` and `bool`. The
`mutex` synchronizes thread access and the `bool` is set when a thread is
executing something "in the strand". Therefore, if a callback has lots of
work to do in a `strand`, asio can switch to some other task instead of
blocking 1+ threads to wait for the original thread to complete the task
(as is the case when client code has a `mutex` inside the callback). The
downside is that asio _always_ allocates for the callback, even if it can
be immediately executed. So if all work in a strand is minimal, a lock
may be better.
This code uses a strand per "zone" and a strand per "channel in a zone".
`dispatch` is used heavily, which means "execute immediately in _this_
thread if the strand is not in use, otherwise queue the callback to be
executed immediately after the strand completes its current task".
`post` is used where deferred execution to an `asio::io_service::run`
thread is preferred.
The strand per "zone" is useful because the levin
`foreach_connection` is blocked with a mutex anyway. So this primarily
helps with reducing blocking of a thread attempting a "flood"
notification. Updating/merging the outgoing connections in the
Dandelion++ map is also somewhat expensive.
The strand per "channel" may need a re-visit. The most "expensive" code
is figuring out the noise/notification to send. If levin code is
optimized further, it might be better to just use standard locks per
channel. */
//! A queue of levin messages for a noise i2p/tor link
struct noise_channel
{
explicit noise_channel(boost::asio::io_service& io_service)
: active(nullptr),
queue(),
strand(io_service),
next_noise(io_service),
connection(boost::uuids::nil_uuid())
{}
// `asio::io_service::strand` cannot be copied or moved
noise_channel(const noise_channel&) = delete;
noise_channel& operator=(const noise_channel&) = delete;
// Only read/write these values "inside the strand"
epee::byte_slice active;
std::deque<epee::byte_slice> queue;
boost::asio::io_service::strand strand;
boost::asio::steady_timer next_noise;
boost::uuids::uuid connection;
};
} // anonymous
namespace detail
{
struct zone
{
explicit zone(boost::asio::io_service& io_service, std::shared_ptr<connections> p2p, epee::byte_slice noise_in)
: p2p(std::move(p2p)),
noise(std::move(noise_in)),
next_epoch(io_service),
strand(io_service),
map(),
channels(),
connection_count(0)
{
for (std::size_t count = 0; !noise.empty() && count < CRYPTONOTE_NOISE_CHANNELS; ++count)
channels.emplace_back(io_service);
}
const std::shared_ptr<connections> p2p;
const epee::byte_slice noise; //!< `!empty()` means zone is using noise channels
boost::asio::steady_timer next_epoch;
boost::asio::io_service::strand strand;
net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems
std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand`
std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time
};
} // detail
namespace
{
//! Adds a message to the sending queue of the channel.
class queue_covert_notify
{
std::shared_ptr<detail::zone> zone_;
epee::byte_slice message_; // Requires manual copy constructor
const std::size_t destination_;
public:
queue_covert_notify(std::shared_ptr<detail::zone> zone, epee::byte_slice message, std::size_t destination)
: zone_(std::move(zone)), message_(std::move(message)), destination_(destination)
{}
queue_covert_notify(queue_covert_notify&&) = default;
queue_covert_notify(const queue_covert_notify& source)
: zone_(source.zone_), message_(source.message_.clone()), destination_(source.destination_)
{}
//! \pre Called within `zone_->channels[destionation_].strand`.
void operator()()
{
if (!zone_)
return;
noise_channel& channel = zone_->channels.at(destination_);
assert(channel.strand.running_in_this_thread());
if (!channel.connection.is_nil())
channel.queue.push_back(std::move(message_));
}
};
//! Sends a message to every active connection
class flood_notify
{
std::shared_ptr<detail::zone> zone_;
epee::byte_slice message_; // Requires manual copy
boost::uuids::uuid source_;
public:
explicit flood_notify(std::shared_ptr<detail::zone> zone, epee::byte_slice message, const boost::uuids::uuid& source)
: zone_(std::move(zone)), message_(message.clone()), source_(source)
{}
flood_notify(flood_notify&&) = default;
flood_notify(const flood_notify& source)
: zone_(source.zone_), message_(source.message_.clone()), source_(source.source_)
{}
void operator()() const
{
if (!zone_ || !zone_->p2p)
return;
assert(zone_->strand.running_in_this_thread());
/* The foreach should be quick, but then it iterates and acquires the
same lock for every connection. So do in a strand because two threads
will ping-pong each other with cacheline invalidations. Revisit if
algorithm changes or the locking strategy within the levin config
class changes. */
std::vector<boost::uuids::uuid> connections;
connections.reserve(connection_id_reserve_size);
zone_->p2p->foreach_connection([this, &connections] (detail::p2p_context& context) {
if (this->source_ != context.m_connection_id)
connections.emplace_back(context.m_connection_id);
return true;
});
for (const boost::uuids::uuid& connection : connections)
zone_->p2p->send(message_.clone(), connection);
}
};
//! Updates the connection for a channel.
struct update_channel
{
std::shared_ptr<detail::zone> zone_;
const std::size_t channel_;
const boost::uuids::uuid connection_;
//! \pre Called within `stem_.strand`.
void operator()() const
{
if (!zone_)
return;
noise_channel& channel = zone_->channels.at(channel_);
assert(channel.strand.running_in_this_thread());
static_assert(
CRYPTONOTE_MAX_FRAGMENTS <= (noise_min_epoch / (noise_min_delay + noise_delay_range)),
"Max fragments more than the max that can be sent in an epoch"
);
/* This clears the active message so that a message "in-flight" is
restarted. DO NOT try to send the remainder of the fragments, this
additional send time can leak that this node was sending out a real
notify (tx) instead of dummy noise. */
channel.connection = connection_;
channel.active = nullptr;
if (connection_.is_nil())
channel.queue.clear();
}
};
//! Merges `out_connections_` into the existing `zone_->map`.
struct update_channels
{
std::shared_ptr<detail::zone> zone_;
std::vector<boost::uuids::uuid> out_connections_;
//! \pre Called within `zone->strand`.
static void post(std::shared_ptr<detail::zone> zone)
{
if (!zone)
return;
assert(zone->strand.running_in_this_thread());
zone->connection_count = zone->map.size();
for (auto id = zone->map.begin(); id != zone->map.end(); ++id)
{
const std::size_t i = id - zone->map.begin();
zone->channels[i].strand.post(update_channel{zone, i, *id});
}
}
//! \pre Called within `zone_->strand`.
void operator()()
{
if