address PR comments

pull/470/head
j-berman 2 years ago
parent 3be1dbd096
commit a82fba4b7b

@ -89,20 +89,14 @@ namespace net_utils
public i_service_endpoint, public i_service_endpoint,
public connection_basic public connection_basic
{ {
public:
typedef typename t_protocol_handler::connection_context t_connection_context;
private: private:
using string_t = std::string; using connection_t = connection<t_protocol_handler>;
using handler_t = t_protocol_handler;
using context_t = typename handler_t::connection_context;
using connection_t = connection<handler_t>;
using connection_ptr = boost::shared_ptr<connection_t>; using connection_ptr = boost::shared_ptr<connection_t>;
using ssl_support_t = epee::net_utils::ssl_support_t; using ssl_support_t = epee::net_utils::ssl_support_t;
using timer_t = boost::asio::steady_timer; using timer_t = boost::asio::steady_timer;
using duration_t = timer_t::duration; using duration_t = timer_t::duration;
using lock_t = std::mutex;
using condition_t = std::condition_variable_any;
using lock_guard_t = std::lock_guard<lock_t>;
using unique_lock_t = std::unique_lock<lock_t>;
using byte_slice_t = epee::byte_slice;
using ec_t = boost::system::error_code; using ec_t = boost::system::error_code;
using handshake_t = boost::asio::ssl::stream_base::handshake_type; using handshake_t = boost::asio::ssl::stream_base::handshake_type;
@ -110,8 +104,6 @@ namespace net_utils
using strand_t = boost::asio::io_service::strand; using strand_t = boost::asio::io_service::strand;
using socket_t = boost::asio::ip::tcp::socket; using socket_t = boost::asio::ip::tcp::socket;
using write_queue_t = std::deque<byte_slice_t>;
using read_buffer_t = std::array<uint8_t, 0x2000>;
using network_throttle_t = epee::net_utils::network_throttle; using network_throttle_t = epee::net_utils::network_throttle;
using network_throttle_manager_t = epee::net_utils::network_throttle_manager; using network_throttle_manager_t = epee::net_utils::network_throttle_manager;
@ -119,6 +111,8 @@ namespace net_utils
duration_t get_default_timeout(); duration_t get_default_timeout();
duration_t get_timeout_from_bytes_read(size_t bytes) const; duration_t get_timeout_from_bytes_read(size_t bytes) const;
void state_status_check();
void start_timer(duration_t duration, bool add = {}); void start_timer(duration_t duration, bool add = {});
void async_wait_timer(); void async_wait_timer();
void cancel_timer(); void cancel_timer();
@ -137,13 +131,21 @@ namespace net_utils
void terminate(); void terminate();
void on_terminating(); void on_terminating();
bool send(byte_slice_t message); bool send(epee::byte_slice message);
bool start_internal( bool start_internal(
bool is_income, bool is_income,
bool is_multithreaded, bool is_multithreaded,
boost::optional<network_address> real_remote boost::optional<network_address> real_remote
); );
enum status_t {
TERMINATED,
RUNNING,
INTERRUPTED,
TERMINATING,
WASTED,
};
struct state_t { struct state_t {
struct stat_t { struct stat_t {
struct { struct {
@ -156,10 +158,10 @@ namespace net_utils
struct data_t { struct data_t {
struct { struct {
read_buffer_t buffer; std::array<uint8_t, 0x2000> buffer;
} read; } read;
struct { struct {
write_queue_t queue; std::deque<epee::byte_slice> queue;
bool wait_consume; bool wait_consume;
} write; } write;
}; };
@ -171,7 +173,7 @@ namespace net_utils
bool handshaked; bool handshaked;
}; };
struct socket_t { struct socket_status_t {
bool connected; bool connected;
bool wait_handshake; bool wait_handshake;
@ -189,30 +191,22 @@ namespace net_utils
bool cancel_shutdown; bool cancel_shutdown;
}; };
struct timer_t { struct timer_status_t {
bool wait_expire; bool wait_expire;
bool cancel_expire; bool cancel_expire;
bool reset_expire; bool reset_expire;
}; };
struct timers_t { struct timers_status_t {
struct throttle_t { struct throttle_t {
timer_t in; timer_status_t in;
timer_t out; timer_status_t out;
}; };
timer_t general; timer_status_t general;
throttle_t throttle; throttle_t throttle;
}; };
enum status_t {
TERMINATED,
RUNNING,
INTERRUPTED,
TERMINATING,
WASTED,
};
struct protocol_t { struct protocol_t {
size_t reference_counter; size_t reference_counter;
bool released; bool released;
@ -223,19 +217,17 @@ namespace net_utils
size_t wait_callback; size_t wait_callback;
}; };
lock_t lock; std::mutex lock;
condition_t condition; std::condition_variable_any condition;
status_t status; status_t status;
socket_t socket; socket_status_t socket;
ssl_t ssl; ssl_t ssl;
timers_t timers; timers_status_t timers;
protocol_t protocol; protocol_t protocol;
stat_t stat; stat_t stat;
data_t data; data_t data;
}; };
using status_t = typename state_t::status_t;
struct timers_t { struct timers_t {
timers_t(io_context_t &io_context): timers_t(io_context_t &io_context):
general(io_context), general(io_context),
@ -254,19 +246,17 @@ namespace net_utils
throttle_t throttle; throttle_t throttle;
}; };
io_context_t &io_context; io_context_t &m_io_context;
t_connection_type connection_type; t_connection_type m_connection_type;
context_t context{}; t_connection_context m_conn_context{};
strand_t strand; strand_t m_strand;
timers_t timers; timers_t m_timers;
connection_ptr self{}; connection_ptr self{};
bool local{}; bool m_local{};
string_t host{}; std::string m_host{};
state_t state{}; state_t m_state{};
handler_t handler; t_protocol_handler m_handler;
public: public:
typedef typename t_protocol_handler::connection_context t_connection_context;
struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type
{ {
shared_state() shared_state()
@ -298,7 +288,7 @@ namespace net_utils
// `real_remote` is the actual endpoint (if connection is to proxy, etc.) // `real_remote` is the actual endpoint (if connection is to proxy, etc.)
bool start(bool is_income, bool is_multithreaded, network_address real_remote); bool start(bool is_income, bool is_multithreaded, network_address real_remote);
void get_context(t_connection_context& context_){context_ = context;} void get_context(t_connection_context& context_){context_ = m_conn_context;}
void call_back_starter(); void call_back_starter();

File diff suppressed because it is too large Load Diff

@ -553,9 +553,6 @@ bool ssl_options_t::handshake(
using ec_t = boost::system::error_code; using ec_t = boost::system::error_code;
using timer_t = boost::asio::steady_timer; using timer_t = boost::asio::steady_timer;
using strand_t = boost::asio::io_service::strand; using strand_t = boost::asio::io_service::strand;
using lock_t = std::mutex;
using lock_guard_t = std::lock_guard<lock_t>;
using condition_t = std::condition_variable_any;
using socket_t = boost::asio::ip::tcp::socket; using socket_t = boost::asio::ip::tcp::socket;
auto &io_context = GET_IO_SERVICE(socket); auto &io_context = GET_IO_SERVICE(socket);
@ -565,8 +562,8 @@ bool ssl_options_t::handshake(
timer_t deadline(io_context, timeout); timer_t deadline(io_context, timeout);
struct state_t { struct state_t {
lock_t lock; std::mutex lock;
condition_t condition; std::condition_variable_any condition;
ec_t result; ec_t result;
bool wait_timer; bool wait_timer;
bool wait_handshake; bool wait_handshake;
@ -577,10 +574,10 @@ bool ssl_options_t::handshake(
state.wait_timer = true; state.wait_timer = true;
auto on_timer = [&](const ec_t &ec){ auto on_timer = [&](const ec_t &ec){
lock_guard_t guard(state.lock); std::lock_guard<std::mutex> guard(state.lock);
state.wait_timer = false; state.wait_timer = false;
state.condition.notify_all(); state.condition.notify_all();
if (not state.cancel_timer) { if (!state.cancel_timer) {
state.cancel_handshake = true; state.cancel_handshake = true;
ec_t ec; ec_t ec;
socket.next_layer().cancel(ec); socket.next_layer().cancel(ec);
@ -589,11 +586,11 @@ bool ssl_options_t::handshake(
state.wait_handshake = true; state.wait_handshake = true;
auto on_handshake = [&](const ec_t &ec, size_t bytes_transferred){ auto on_handshake = [&](const ec_t &ec, size_t bytes_transferred){
lock_guard_t guard(state.lock); std::lock_guard<std::mutex> guard(state.lock);
state.wait_handshake = false; state.wait_handshake = false;
state.condition.notify_all(); state.condition.notify_all();
state.result = ec; state.result = ec;
if (not state.cancel_handshake) { if (!state.cancel_handshake) {
state.cancel_timer = true; state.cancel_timer = true;
ec_t ec; ec_t ec;
deadline.cancel(ec); deadline.cancel(ec);
@ -614,15 +611,15 @@ bool ssl_options_t::handshake(
while (!io_context.stopped()) while (!io_context.stopped())
{ {
io_context.poll_one(); io_context.poll_one();
lock_guard_t guard(state.lock); std::lock_guard<std::mutex> guard(state.lock);
state.condition.wait_for( state.condition.wait_for(
state.lock, state.lock,
std::chrono::milliseconds(30), std::chrono::milliseconds(30),
[&]{ [&]{
return not state.wait_timer and not state.wait_handshake; return !state.wait_timer && !state.wait_handshake;
} }
); );
if (not state.wait_timer and not state.wait_handshake) if (!state.wait_timer && !state.wait_handshake)
break; break;
} }
if (state.result.value()) { if (state.result.value()) {

@ -617,7 +617,7 @@ TEST(boosted_tcp_server, strand_deadlock)
void after_init_connection() void after_init_connection()
{ {
unique_lock_t guard(lock); unique_lock_t guard(lock);
if (not context.m_is_income) { if (!context.m_is_income) {
guard.unlock(); guard.unlock();
socket->do_send(byte_slice_t{"."}); socket->do_send(byte_slice_t{"."});
} }
@ -628,7 +628,7 @@ TEST(boosted_tcp_server, strand_deadlock)
bool handle_recv(const char *data, size_t bytes_transferred) bool handle_recv(const char *data, size_t bytes_transferred)
{ {
unique_lock_t guard(lock); unique_lock_t guard(lock);
if (not context.m_is_income) { if (!context.m_is_income) {
if (context.m_recv_cnt == 1024) { if (context.m_recv_cnt == 1024) {
guard.unlock(); guard.unlock();
socket->do_send(byte_slice_t{"."}); socket->do_send(byte_slice_t{"."});
@ -652,9 +652,9 @@ TEST(boosted_tcp_server, strand_deadlock)
void release_protocol() void release_protocol()
{ {
unique_lock_t guard(lock); unique_lock_t guard(lock);
if(not context.m_is_income if(!context.m_is_income
and context.m_recv_cnt == 1024 && context.m_recv_cnt == 1024
and context.m_send_cnt == 2 && context.m_send_cnt == 2
) { ) {
guard.unlock(); guard.unlock();
config.notify_success(); config.notify_success();

Loading…
Cancel
Save