address PR comments

pull/470/head
j-berman 2 years ago
parent 3be1dbd096
commit a82fba4b7b

@ -89,20 +89,14 @@ namespace net_utils
public i_service_endpoint,
public connection_basic
{
public:
typedef typename t_protocol_handler::connection_context t_connection_context;
private:
using string_t = std::string;
using handler_t = t_protocol_handler;
using context_t = typename handler_t::connection_context;
using connection_t = connection<handler_t>;
using connection_t = connection<t_protocol_handler>;
using connection_ptr = boost::shared_ptr<connection_t>;
using ssl_support_t = epee::net_utils::ssl_support_t;
using timer_t = boost::asio::steady_timer;
using duration_t = timer_t::duration;
using lock_t = std::mutex;
using condition_t = std::condition_variable_any;
using lock_guard_t = std::lock_guard<lock_t>;
using unique_lock_t = std::unique_lock<lock_t>;
using byte_slice_t = epee::byte_slice;
using ec_t = boost::system::error_code;
using handshake_t = boost::asio::ssl::stream_base::handshake_type;
@ -110,8 +104,6 @@ namespace net_utils
using strand_t = boost::asio::io_service::strand;
using socket_t = boost::asio::ip::tcp::socket;
using write_queue_t = std::deque<byte_slice_t>;
using read_buffer_t = std::array<uint8_t, 0x2000>;
using network_throttle_t = epee::net_utils::network_throttle;
using network_throttle_manager_t = epee::net_utils::network_throttle_manager;
@ -119,6 +111,8 @@ namespace net_utils
duration_t get_default_timeout();
duration_t get_timeout_from_bytes_read(size_t bytes) const;
void state_status_check();
void start_timer(duration_t duration, bool add = {});
void async_wait_timer();
void cancel_timer();
@ -137,13 +131,21 @@ namespace net_utils
void terminate();
void on_terminating();
bool send(byte_slice_t message);
bool send(epee::byte_slice message);
bool start_internal(
bool is_income,
bool is_multithreaded,
boost::optional<network_address> real_remote
);
enum status_t {
TERMINATED,
RUNNING,
INTERRUPTED,
TERMINATING,
WASTED,
};
struct state_t {
struct stat_t {
struct {
@ -156,10 +158,10 @@ namespace net_utils
struct data_t {
struct {
read_buffer_t buffer;
std::array<uint8_t, 0x2000> buffer;
} read;
struct {
write_queue_t queue;
std::deque<epee::byte_slice> queue;
bool wait_consume;
} write;
};
@ -171,7 +173,7 @@ namespace net_utils
bool handshaked;
};
struct socket_t {
struct socket_status_t {
bool connected;
bool wait_handshake;
@ -189,30 +191,22 @@ namespace net_utils
bool cancel_shutdown;
};
struct timer_t {
struct timer_status_t {
bool wait_expire;
bool cancel_expire;
bool reset_expire;
};
struct timers_t {
struct timers_status_t {
struct throttle_t {
timer_t in;
timer_t out;
timer_status_t in;
timer_status_t out;
};
timer_t general;
timer_status_t general;
throttle_t throttle;
};
enum status_t {
TERMINATED,
RUNNING,
INTERRUPTED,
TERMINATING,
WASTED,
};
struct protocol_t {
size_t reference_counter;
bool released;
@ -223,19 +217,17 @@ namespace net_utils
size_t wait_callback;
};
lock_t lock;
condition_t condition;
std::mutex lock;
std::condition_variable_any condition;
status_t status;
socket_t socket;
socket_status_t socket;
ssl_t ssl;
timers_t timers;
timers_status_t timers;
protocol_t protocol;
stat_t stat;
data_t data;
};
using status_t = typename state_t::status_t;
struct timers_t {
timers_t(io_context_t &io_context):
general(io_context),
@ -254,19 +246,17 @@ namespace net_utils
throttle_t throttle;
};
io_context_t &io_context;
t_connection_type connection_type;
context_t context{};
strand_t strand;
timers_t timers;
io_context_t &m_io_context;
t_connection_type m_connection_type;
t_connection_context m_conn_context{};
strand_t m_strand;
timers_t m_timers;
connection_ptr self{};
bool local{};
string_t host{};
state_t state{};
handler_t handler;
bool m_local{};
std::string m_host{};
state_t m_state{};
t_protocol_handler m_handler;
public:
typedef typename t_protocol_handler::connection_context t_connection_context;
struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type
{
shared_state()
@ -298,7 +288,7 @@ namespace net_utils
// `real_remote` is the actual endpoint (if connection is to proxy, etc.)
bool start(bool is_income, bool is_multithreaded, network_address real_remote);
void get_context(t_connection_context& context_){context_ = context;}
void get_context(t_connection_context& context_){context_ = m_conn_context;}
void call_back_starter();

File diff suppressed because it is too large Load Diff

@ -553,9 +553,6 @@ bool ssl_options_t::handshake(
using ec_t = boost::system::error_code;
using timer_t = boost::asio::steady_timer;
using strand_t = boost::asio::io_service::strand;
using lock_t = std::mutex;
using lock_guard_t = std::lock_guard<lock_t>;
using condition_t = std::condition_variable_any;
using socket_t = boost::asio::ip::tcp::socket;
auto &io_context = GET_IO_SERVICE(socket);
@ -565,8 +562,8 @@ bool ssl_options_t::handshake(
timer_t deadline(io_context, timeout);
struct state_t {
lock_t lock;
condition_t condition;
std::mutex lock;
std::condition_variable_any condition;
ec_t result;
bool wait_timer;
bool wait_handshake;
@ -577,10 +574,10 @@ bool ssl_options_t::handshake(
state.wait_timer = true;
auto on_timer = [&](const ec_t &ec){
lock_guard_t guard(state.lock);
std::lock_guard<std::mutex> guard(state.lock);
state.wait_timer = false;
state.condition.notify_all();
if (not state.cancel_timer) {
if (!state.cancel_timer) {
state.cancel_handshake = true;
ec_t ec;
socket.next_layer().cancel(ec);
@ -589,11 +586,11 @@ bool ssl_options_t::handshake(
state.wait_handshake = true;
auto on_handshake = [&](const ec_t &ec, size_t bytes_transferred){
lock_guard_t guard(state.lock);
std::lock_guard<std::mutex> guard(state.lock);
state.wait_handshake = false;
state.condition.notify_all();
state.result = ec;
if (not state.cancel_handshake) {
if (!state.cancel_handshake) {
state.cancel_timer = true;
ec_t ec;
deadline.cancel(ec);
@ -614,15 +611,15 @@ bool ssl_options_t::handshake(
while (!io_context.stopped())
{
io_context.poll_one();
lock_guard_t guard(state.lock);
std::lock_guard<std::mutex> guard(state.lock);
state.condition.wait_for(
state.lock,
std::chrono::milliseconds(30),
[&]{
return not state.wait_timer and not state.wait_handshake;
return !state.wait_timer && !state.wait_handshake;
}
);
if (not state.wait_timer and not state.wait_handshake)
if (!state.wait_timer && !state.wait_handshake)
break;
}
if (state.result.value()) {

@ -617,7 +617,7 @@ TEST(boosted_tcp_server, strand_deadlock)
void after_init_connection()
{
unique_lock_t guard(lock);
if (not context.m_is_income) {
if (!context.m_is_income) {
guard.unlock();
socket->do_send(byte_slice_t{"."});
}
@ -628,7 +628,7 @@ TEST(boosted_tcp_server, strand_deadlock)
bool handle_recv(const char *data, size_t bytes_transferred)
{
unique_lock_t guard(lock);
if (not context.m_is_income) {
if (!context.m_is_income) {
if (context.m_recv_cnt == 1024) {
guard.unlock();
socket->do_send(byte_slice_t{"."});
@ -652,9 +652,9 @@ TEST(boosted_tcp_server, strand_deadlock)
void release_protocol()
{
unique_lock_t guard(lock);
if(not context.m_is_income
and context.m_recv_cnt == 1024
and context.m_send_cnt == 2
if(!context.m_is_income
&& context.m_recv_cnt == 1024
&& context.m_send_cnt == 2
) {
guard.unlock();
config.notify_success();

Loading…
Cancel
Save