Merge pull request #8426

a82fba4 address PR comments (j-berman)
3be1dbd connection: fix implementation (anon)
724ff21 connection: add segfault and deadlocks demo (anon)
pull/470/head
luigi1111 2 years ago
commit 9df069f4ce
No known key found for this signature in database
GPG Key ID: F4ACA0183641E010

@ -1076,6 +1076,7 @@ if(STATIC)
set(Boost_USE_STATIC_RUNTIME ON)
endif()
find_package(Boost 1.58 QUIET REQUIRED COMPONENTS system filesystem thread date_time chrono regex serialization program_options locale)
add_definitions(-DBOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION)
set(CMAKE_FIND_LIBRARY_SUFFIXES ${OLD_LIB_SUFFIXES})
if(NOT Boost_FOUND)

@ -44,12 +44,16 @@
#include <cassert>
#include <map>
#include <memory>
#include <condition_variable>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/array.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread/thread.hpp>
#include <boost/optional.hpp>
#include "byte_slice.h"
#include "net_utils_base.h"
#include "syncobj.h"
@ -87,7 +91,172 @@ namespace net_utils
{
public:
typedef typename t_protocol_handler::connection_context t_connection_context;
private:
using connection_t = connection<t_protocol_handler>;
using connection_ptr = boost::shared_ptr<connection_t>;
using ssl_support_t = epee::net_utils::ssl_support_t;
using timer_t = boost::asio::steady_timer;
using duration_t = timer_t::duration;
using ec_t = boost::system::error_code;
using handshake_t = boost::asio::ssl::stream_base::handshake_type;
using io_context_t = boost::asio::io_service;
using strand_t = boost::asio::io_service::strand;
using socket_t = boost::asio::ip::tcp::socket;
using network_throttle_t = epee::net_utils::network_throttle;
using network_throttle_manager_t = epee::net_utils::network_throttle_manager;
unsigned int host_count(int delta = 0);
duration_t get_default_timeout();
duration_t get_timeout_from_bytes_read(size_t bytes) const;
void state_status_check();
void start_timer(duration_t duration, bool add = {});
void async_wait_timer();
void cancel_timer();
void start_handshake();
void start_read();
void start_write();
void start_shutdown();
void cancel_socket();
void cancel_handler();
void interrupt();
void on_interrupted();
void terminate();
void on_terminating();
bool send(epee::byte_slice message);
bool start_internal(
bool is_income,
bool is_multithreaded,
boost::optional<network_address> real_remote
);
enum status_t {
TERMINATED,
RUNNING,
INTERRUPTED,
TERMINATING,
WASTED,
};
struct state_t {
struct stat_t {
struct {
network_throttle_t throttle{"speed_in", "throttle_speed_in"};
} in;
struct {
network_throttle_t throttle{"speed_out", "throttle_speed_out"};
} out;
};
struct data_t {
struct {
std::array<uint8_t, 0x2000> buffer;
} read;
struct {
std::deque<epee::byte_slice> queue;
bool wait_consume;
} write;
};
struct ssl_t {
bool enabled;
bool forced;
bool detected;
bool handshaked;
};
struct socket_status_t {
bool connected;
bool wait_handshake;
bool cancel_handshake;
bool wait_read;
bool handle_read;
bool cancel_read;
bool wait_write;
bool handle_write;
bool cancel_write;
bool wait_shutdown;
bool cancel_shutdown;
};
struct timer_status_t {
bool wait_expire;
bool cancel_expire;
bool reset_expire;
};
struct timers_status_t {
struct throttle_t {
timer_status_t in;
timer_status_t out;
};
timer_status_t general;
throttle_t throttle;
};
struct protocol_t {
size_t reference_counter;
bool released;
bool initialized;
bool wait_release;
bool wait_init;
size_t wait_callback;
};
std::mutex lock;
std::condition_variable_any condition;
status_t status;
socket_status_t socket;
ssl_t ssl;
timers_status_t timers;
protocol_t protocol;
stat_t stat;
data_t data;
};
struct timers_t {
timers_t(io_context_t &io_context):
general(io_context),
throttle(io_context)
{}
struct throttle_t {
throttle_t(io_context_t &io_context):
in(io_context),
out(io_context)
{}
timer_t in;
timer_t out;
};
timer_t general;
throttle_t throttle;
};
io_context_t &m_io_context;
t_connection_type m_connection_type;
t_connection_context m_conn_context{};
strand_t m_strand;
timers_t m_timers;
connection_ptr self{};
bool m_local{};
std::string m_host{};
state_t m_state{};
t_protocol_handler m_handler;
public:
struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type
{
shared_state()
@ -119,7 +288,7 @@ namespace net_utils
// `real_remote` is the actual endpoint (if connection is to proxy, etc.)
bool start(bool is_income, bool is_multithreaded, network_address real_remote);
void get_context(t_connection_context& context_){context_ = context;}
void get_context(t_connection_context& context_){context_ = m_conn_context;}
void call_back_starter();
@ -141,58 +310,6 @@ namespace net_utils
virtual bool add_ref();
virtual bool release();
//------------------------------------------------------
bool do_send_chunk(byte_slice chunk); ///< will send (or queue) a part of data. internal use only
boost::shared_ptr<connection<t_protocol_handler> > safe_shared_from_this();
bool shutdown();
/// Handle completion of a receive operation.
void handle_receive(const boost::system::error_code& e,
std::size_t bytes_transferred);
/// Handle completion of a read operation.
void handle_read(const boost::system::error_code& e,
std::size_t bytes_transferred);
/// Handle completion of a write operation.
void handle_write(const boost::system::error_code& e, size_t cb);
/// reset connection timeout timer and callback
void reset_timer(boost::posix_time::milliseconds ms, bool add);
boost::posix_time::milliseconds get_default_timeout();
boost::posix_time::milliseconds get_timeout_from_bytes_read(size_t bytes);
/// host connection count tracking
unsigned int host_count(const std::string &host, int delta = 0);
/// Buffer for incoming data.
boost::array<char, 8192> buffer_;
size_t buffer_ssl_init_fill;
t_connection_context context;
// TODO what do they mean about wait on destructor?? --rfree :
//this should be the last one, because it could be wait on destructor, while other activities possible on other threads
t_protocol_handler m_protocol_handler;
//typename t_protocol_handler::config_type m_dummy_config;
size_t m_reference_count = 0; // reference count managed through add_ref/release support
boost::shared_ptr<connection<t_protocol_handler> > m_self_ref; // the reference to hold
critical_section m_self_refs_lock;
critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk()
critical_section m_shutdown_lock; // held while shutting down
t_connection_type m_connection_type;
// for calculate speed (last 60 sec)
network_throttle m_throttle_speed_in;
network_throttle m_throttle_speed_out;
boost::mutex m_throttle_speed_in_mutex;
boost::mutex m_throttle_speed_out_mutex;
boost::asio::deadline_timer m_timer;
bool m_local;
bool m_ready_to_close;
std::string m_host;
public:
void setRpcStation();
};

File diff suppressed because it is too large Load Diff

@ -110,6 +110,11 @@ namespace net_utils
//! Search against internal fingerprints. Always false if `behavior() != user_certificate_check`.
bool has_fingerprint(boost::asio::ssl::verify_context &ctx) const;
//! configure ssl_stream handshake verification
void configure(
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> &socket,
boost::asio::ssl::stream_base::handshake_type type,
const std::string& host = {}) const;
boost::asio::ssl::context create_context() const;
/*! \note If `this->support == autodetect && this->verification != none`,

@ -32,6 +32,8 @@
#include <boost/asio/ssl.hpp>
#include <boost/cerrno.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/asio/strand.hpp>
#include <condition_variable>
#include <boost/lambda/lambda.hpp>
#include <openssl/ssl.h>
#include <openssl/pem.h>
@ -488,12 +490,10 @@ bool ssl_options_t::has_fingerprint(boost::asio::ssl::verify_context &ctx) const
return false;
}
bool ssl_options_t::handshake(
void ssl_options_t::configure(
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> &socket,
boost::asio::ssl::stream_base::handshake_type type,
boost::asio::const_buffer buffer,
const std::string& host,
std::chrono::milliseconds timeout) const
const std::string& host) const
{
socket.next_layer().set_option(boost::asio::ip::tcp::no_delay(true));
@ -538,30 +538,98 @@ bool ssl_options_t::handshake(
return true;
});
}
}
auto& io_service = GET_IO_SERVICE(socket);
boost::asio::steady_timer deadline(io_service, timeout);
deadline.async_wait([&socket](const boost::system::error_code& error) {
if (error != boost::asio::error::operation_aborted)
bool ssl_options_t::handshake(
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> &socket,
boost::asio::ssl::stream_base::handshake_type type,
boost::asio::const_buffer buffer,
const std::string& host,
std::chrono::milliseconds timeout) const
{
socket.next_layer().close();
configure(socket, type, host);
auto start_handshake = [&]{
using ec_t = boost::system::error_code;
using timer_t = boost::asio::steady_timer;
using strand_t = boost::asio::io_service::strand;
using socket_t = boost::asio::ip::tcp::socket;
auto &io_context = GET_IO_SERVICE(socket);
if (io_context.stopped())
io_context.reset();
strand_t strand(io_context);
timer_t deadline(io_context, timeout);
struct state_t {
std::mutex lock;
std::condition_variable_any condition;
ec_t result;
bool wait_timer;
bool wait_handshake;
bool cancel_timer;
bool cancel_handshake;
};
state_t state{};
state.wait_timer = true;
auto on_timer = [&](const ec_t &ec){
std::lock_guard<std::mutex> guard(state.lock);
state.wait_timer = false;
state.condition.notify_all();
if (!state.cancel_timer) {
state.cancel_handshake = true;
ec_t ec;
socket.next_layer().cancel(ec);
}
});
};
boost::system::error_code ec = boost::asio::error::would_block;
socket.async_handshake(type, boost::asio::buffer(buffer), boost::lambda::var(ec) = boost::lambda::_1);
if (io_service.stopped())
{
io_service.reset();
state.wait_handshake = true;
auto on_handshake = [&](const ec_t &ec, size_t bytes_transferred){
std::lock_guard<std::mutex> guard(state.lock);
state.wait_handshake = false;
state.condition.notify_all();
state.result = ec;
if (!state.cancel_handshake) {
state.cancel_timer = true;
ec_t ec;
deadline.cancel(ec);
}
while (ec == boost::asio::error::would_block && !io_service.stopped())
{
// should poll_one(), can't run_one() because it can block if there is
// another worker thread executing io_service's tasks
// TODO: once we get Boost 1.66+, replace with run_one_for/run_until
std::this_thread::sleep_for(std::chrono::milliseconds(30));
io_service.poll_one();
};
deadline.async_wait(on_timer);
strand.post(
[&]{
socket.async_handshake(
type,
boost::asio::buffer(buffer),
strand.wrap(on_handshake)
);
}
);
while (!io_context.stopped())
{
io_context.poll_one();
std::lock_guard<std::mutex> guard(state.lock);
state.condition.wait_for(
state.lock,
std::chrono::milliseconds(30),
[&]{
return !state.wait_timer && !state.wait_handshake;
}
);
if (!state.wait_timer && !state.wait_handshake)
break;
}
if (state.result.value()) {
ec_t ec;
socket.next_layer().shutdown(socket_t::shutdown_both, ec);
socket.next_layer().close(ec);
}
return state.result;
};
const auto ec = start_handshake();
if (ec)
{

@ -31,6 +31,8 @@
#include <boost/chrono/chrono.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <condition_variable>
#include <mutex>
#include "gtest/gtest.h"
@ -276,6 +278,11 @@ TEST(test_epee_connection, test_lifetime)
ASSERT_TRUE(shared_state->get_connections_count() == 0);
constexpr auto DELAY = 30;
constexpr auto TIMEOUT = 1;
while (server.get_connections_count()) {
server.get_config_shared()->del_in_connections(
server.get_config_shared()->get_in_connections_count()
);
}
server.get_config_shared()->set_handler(new command_handler_t(DELAY), &command_handler_t::destroy);
for (auto i = 0; i < N; ++i) {
tag = create_connection();
@ -332,7 +339,7 @@ TEST(test_epee_connection, test_lifetime)
),
&command_handler_t::destroy
);
for (auto i = 0; i < N; ++i) {
for (auto i = 0; i < N * N * N; ++i) {
{
connection_ptr conn(new connection_t(io_context, shared_state, {}, {}));
conn->socket().connect(endpoint);
@ -342,6 +349,7 @@ TEST(test_epee_connection, test_lifetime)
}
ASSERT_TRUE(shared_state->get_connections_count() == 1);
shared_state->del_out_connections(1);
while (shared_state->sock_count);
ASSERT_TRUE(shared_state->get_connections_count() == 0);
}
@ -452,7 +460,11 @@ TEST(test_epee_connection, test_lifetime)
}
for (;workers.size(); workers.pop_back())
workers.back().join();
while (server.get_connections_count()) {
server.get_config_shared()->del_in_connections(
server.get_config_shared()->get_in_connections_count()
);
}
});
for (auto& w: workers) {
@ -462,3 +474,241 @@ TEST(test_epee_connection, test_lifetime)
server.timed_wait_server_stop(5 * 1000);
server.deinit_server();
}
TEST(test_epee_connection, ssl_shutdown)
{
struct context_t: epee::net_utils::connection_context_base {
static constexpr size_t get_max_bytes(int) noexcept { return -1; }
static constexpr int handshake_command() noexcept { return 1001; }
static constexpr bool handshake_complete() noexcept { return true; }
};
struct command_handler_t: epee::levin::levin_commands_handler<context_t> {
virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_stream&, context_t&) override { return {}; }
virtual int notify(int, const epee::span<const uint8_t>, context_t&) override { return {}; }
virtual void callback(context_t&) override {}
virtual void on_connection_new(context_t&) override {}
virtual void on_connection_close(context_t&) override { }
virtual ~command_handler_t() override {}
static void destroy(epee::levin::levin_commands_handler<context_t>* ptr) { delete ptr; }
};
using handler_t = epee::levin::async_protocol_handler<context_t>;
using io_context_t = boost::asio::io_service;
using endpoint_t = boost::asio::ip::tcp::endpoint;
using server_t = epee::net_utils::boosted_tcp_server<handler_t>;
using socket_t = boost::asio::ip::tcp::socket;
using ssl_socket_t = boost::asio::ssl::stream<socket_t>;
using ssl_context_t = boost::asio::ssl::context;
using ec_t = boost::system::error_code;
endpoint_t endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 5263);
server_t server(epee::net_utils::e_connection_type_P2P);
server.init_server(endpoint.port(),
endpoint.address().to_string(),
0,
"",
false,
true,
epee::net_utils::ssl_support_t::e_ssl_support_enabled
);
server.get_config_shared()->set_handler(new command_handler_t, &command_handler_t::destroy);
server.run_server(2, false);
ssl_context_t ssl_context{boost::asio::ssl::context::sslv23};
io_context_t io_context;
ssl_socket_t socket(io_context, ssl_context);
ec_t ec;
socket.next_layer().connect(endpoint, ec);
EXPECT_EQ(ec.value(), 0);
socket.handshake(boost::asio::ssl::stream_base::client, ec);
EXPECT_EQ(ec.value(), 0);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
while (server.get_config_shared()->get_connections_count() < 1);
server.get_config_shared()->del_in_connections(1);
while (server.get_config_shared()->get_connections_count() > 0);
server.send_stop_signal();
EXPECT_TRUE(server.timed_wait_server_stop(5 * 1000));
server.deinit_server();
socket.next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket.next_layer().close(ec);
socket.shutdown(ec);
}
TEST(test_epee_connection, ssl_handshake)
{
using io_context_t = boost::asio::io_service;
using work_t = boost::asio::io_service::work;
using work_ptr = std::shared_ptr<work_t>;
using workers_t = std::vector<std::thread>;
using socket_t = boost::asio::ip::tcp::socket;
using ssl_socket_t = boost::asio::ssl::stream<socket_t>;
using ssl_socket_ptr = std::unique_ptr<ssl_socket_t>;
using ssl_options_t = epee::net_utils::ssl_options_t;
io_context_t io_context;
work_ptr work(std::make_shared<work_t>(io_context));
workers_t workers;
auto constexpr N = 2;
while (workers.size() < N) {
workers.emplace_back([&io_context]{
io_context.run();
});
}
ssl_options_t ssl_options{{}};
auto ssl_context = ssl_options.create_context();
for (size_t i = 0; i < N * N * N; ++i) {
ssl_socket_ptr ssl_socket(new ssl_socket_t(io_context, ssl_context));
ssl_socket->next_layer().open(boost::asio::ip::tcp::v4());
for (size_t i = 0; i < N; ++i) {
io_context.post([]{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
});
}
EXPECT_EQ(
ssl_options.handshake(
*ssl_socket,
ssl_socket_t::server,
{},
{},
std::chrono::milliseconds(0)
),
false
);
ssl_socket->next_layer().close();
ssl_socket.reset();
}
work.reset();
for (;workers.size(); workers.pop_back())
workers.back().join();
}
TEST(boosted_tcp_server, strand_deadlock)
{
using context_t = epee::net_utils::connection_context_base;
using lock_t = std::mutex;
using unique_lock_t = std::unique_lock<lock_t>;
struct config_t {
using condition_t = std::condition_variable_any;
using lock_guard_t = std::lock_guard<lock_t>;
void notify_success()
{
lock_guard_t guard(lock);
success = true;
condition.notify_all();
}
lock_t lock;
condition_t condition;
bool success;
};
struct handler_t {
using config_type = config_t;
using connection_context = context_t;
using byte_slice_t = epee::byte_slice;
using socket_t = epee::net_utils::i_service_endpoint;
handler_t(socket_t *socket, config_t &config, context_t &context):
socket(socket),
config(config),
context(context)
{}
void after_init_connection()
{
unique_lock_t guard(lock);
if (!context.m_is_income) {
guard.unlock();
socket->do_send(byte_slice_t{"."});
}
}
void handle_qued_callback()
{
}
bool handle_recv(const char *data, size_t bytes_transferred)
{
unique_lock_t guard(lock);
if (!context.m_is_income) {
if (context.m_recv_cnt == 1024) {
guard.unlock();
socket->do_send(byte_slice_t{"."});
}
}
else {
if (context.m_recv_cnt == 1) {
for(size_t i = 0; i < 1024; ++i) {
guard.unlock();
socket->do_send(byte_slice_t{"."});
guard.lock();
}
}
else if(context.m_recv_cnt == 2) {
guard.unlock();
socket->close();
}
}
return true;
}
void release_protocol()
{
unique_lock_t guard(lock);
if(!context.m_is_income
&& context.m_recv_cnt == 1024
&& context.m_send_cnt == 2
) {
guard.unlock();
config.notify_success();
}
}
lock_t lock;
socket_t *socket;
config_t &config;
context_t &context;
};
using server_t = epee::net_utils::boosted_tcp_server<handler_t>;
using endpoint_t = boost::asio::ip::tcp::endpoint;
endpoint_t endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 5262);
server_t server(epee::net_utils::e_connection_type_P2P);
server.init_server(
endpoint.port(),
endpoint.address().to_string(),
{},
{},
{},
true,
epee::net_utils::ssl_support_t::e_ssl_support_disabled
);
server.run_server(2, {});
server.async_call(
[&]{
context_t context;
ASSERT_TRUE(
server.connect(
endpoint.address().to_string(),
std::to_string(endpoint.port()),
5,
context,
"0.0.0.0",
epee::net_utils::ssl_support_t::e_ssl_support_disabled
)
);
}
);
{
unique_lock_t guard(server.get_config_object().lock);
EXPECT_TRUE(
server.get_config_object().condition.wait_for(
guard,
std::chrono::seconds(5),
[&] { return server.get_config_object().success; }
)
);
}
server.send_stop_signal();
server.timed_wait_server_stop(5 * 1000);
server.deinit_server();
}

Loading…
Cancel
Save