Dropping cppzmq dependency; adding some zmq utils

pull/326/head
Lee Clagett 5 years ago
parent 1b93cb74bb
commit f91a06c6d7

@ -978,14 +978,14 @@ if(CMAKE_C_COMPILER_ID STREQUAL "Clang" AND ARCH_WIDTH EQUAL "32" AND NOT IOS AN
endif()
endif()
find_path(ZMQ_INCLUDE_PATH zmq.hpp)
find_path(ZMQ_INCLUDE_PATH zmq.h)
find_library(ZMQ_LIB zmq)
find_library(PGM_LIBRARY pgm)
find_library(NORM_LIBRARY norm)
find_library(SODIUM_LIBRARY sodium)
if(NOT ZMQ_INCLUDE_PATH)
message(FATAL_ERROR "Could not find required header zmq.hpp")
message(FATAL_ERROR "Could not find required header zmq.h")
endif()
if(NOT ZMQ_LIB)
message(FATAL_ERROR "Could not find required libzmq")

@ -178,7 +178,7 @@ library archives (`.a`).
| pkg-config | any | NO | `pkg-config` | `base-devel` | `pkgconf` | NO | |
| Boost | 1.58 | NO | `libboost-all-dev` | `boost` | `boost-devel` | NO | C++ libraries |
| OpenSSL | basically any | NO | `libssl-dev` | `openssl` | `openssl-devel` | NO | sha256 sum |
| libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `cppzmq-devel` | NO | ZeroMQ library |
| libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `zeromq-devel` | NO | ZeroMQ library |
| OpenPGM | ? | NO | `libpgm-dev` | `libpgm` | `openpgm-devel` | NO | For ZeroMQ |
| libnorm[2] | ? | NO | `libnorm-dev` | | ` | YES | For ZeroMQ |
| libunbound | 1.4.16 | YES | `libunbound-dev` | `unbound` | `unbound-devel` | NO | DNS resolver |

@ -26,8 +26,10 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp socks_connect.cpp tor_address.cpp)
set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h tor_address.h)
set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp
socks_connect.cpp tor_address.cpp zmq.cpp)
set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h
tor_address.h zmq.h)
monero_add_library(net ${net_sources} ${net_headers})
target_link_libraries(net common epee ${Boost_ASIO_LIBRARY})

@ -0,0 +1,188 @@
// Copyright (c) 2019, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "net/zmq.h"
#include <cassert>
#include <cerrno>
#include <limits>
#include <utility>
namespace net
{
namespace zmq
{
const std::error_category& error_category() noexcept
{
struct category final : std::error_category
{
virtual const char* name() const noexcept override final
{
return "error::error_category()";
}
virtual std::string message(int value) const override final
{
char const* const msg = zmq_strerror(value);
if (msg)
return msg;
return "zmq_strerror failure";
}
virtual std::error_condition default_error_condition(int value) const noexcept override final
{
// maps specific errors to generic `std::errc` cases.
switch (value)
{
case EFSM:
case ETERM:
break;
default:
/* zmq is using cerrno errors. C++ spec indicates that
`std::errc` values must be identical to the cerrno value.
So just map every zmq specific error to the generic errc
equivalent. zmq extensions must be in the switch or they
map to a non-existent errc enum value. */
return std::errc(value);
}
return std::error_condition{value, *this};
}
};
static const category instance{};
return instance;
}
void terminate::call(void* ptr) noexcept
{
assert(ptr != nullptr); // see header
while (zmq_term(ptr))
{
if (zmq_errno() != EINTR)
break;
}
}
namespace
{
//! RAII wrapper for `zmq_msg_t`.
class message
{
zmq_msg_t handle_;
public:
message() noexcept
: handle_()
{
zmq_msg_init(handle());
}
message(message&& rhs) = delete;
message(const message& rhs) = delete;
message& operator=(message&& rhs) = delete;
message& operator=(const message& rhs) = delete;
~message() noexcept
{
zmq_msg_close(handle());
}
zmq_msg_t* handle() noexcept
{
return std::addressof(handle_);
}
const char* data() noexcept
{
return static_cast<const char*>(zmq_msg_data(handle()));
}
std::size_t size() noexcept
{
return zmq_msg_size(handle());
}
};
struct do_receive
{
/* ZMQ documentation states that message parts are atomic - either
all are received or none are. Looking through ZMQ code and
Github discussions indicates that after part 1 is returned,
`EAGAIN` cannot be returned to meet these guarantees. Unit tests
verify (for the `inproc://` case) that this is the behavior.
Therefore, read errors after the first part are treated as a
failure for the entire message (probably `ETERM`). */
int operator()(std::string& payload, void* const socket, const int flags) const
{
static constexpr const int max_out = std::numeric_limits<int>::max();
const std::string::size_type initial = payload.size();
message part{};
for (;;)
{
int last = 0;
if ((last = zmq_msg_recv(part.handle(), socket, flags)) < 0)
return last;
payload.append(part.data(), part.size());
if (!zmq_msg_more(part.handle()))
break;
}
const std::string::size_type added = payload.size() - initial;
return unsigned(max_out) < added ? max_out : int(added);
}
};
template<typename F, typename... T>
expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...)))
{
for (;;)
{
if (0 <= op(args...))
return success();
const int error = zmq_errno();
if (error != EINTR)
return make_error_code(error);
}
}
} // anonymous
expect<std::string> receive(void* const socket, const int flags)
{
std::string payload{};
MONERO_CHECK(retry_op(do_receive{}, payload, socket, flags));
return {std::move(payload)};
}
expect<void> send(const epee::span<const std::uint8_t> payload, void* const socket, const int flags) noexcept
{
return retry_op(zmq_send, socket, payload.data(), payload.size(), flags);
}
} // zmq
} // net

@ -0,0 +1,136 @@
// Copyright (c) 2019, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <memory>
#include <string>
#include <system_error>
#include <zmq.h>
#include "common/expect.h"
#include "span.h"
//! If the expression is less than 0, return the current ZMQ error code.
#define MONERO_ZMQ_CHECK(...) \
do \
{ \
if (( __VA_ARGS__ ) < 0) \
return {::net::zmq::get_error_code()}; \
} while (0)
//! Print a message followed by the current ZMQ error message.
#define MONERO_LOG_ZMQ_ERROR(...) \
do \
{ \
MERROR( __VA_ARGS__ << ": " << ::net::zmq::get_error_code().message()); \
} while (0)
//! Throw an exception with a custom `msg`, current ZMQ error code, filename, and line number.
#define MONERO_ZMQ_THROW(msg) \
MONERO_THROW( ::net::zmq::get_error_code(), msg )
namespace net
{
namespace zmq
{
//! \return Category for ZMQ errors.
const std::error_category& error_category() noexcept;
//! \return `code` (usally from zmq_errno()`) using `net::zmq::error_category()`.
inline std::error_code make_error_code(int code) noexcept
{
return std::error_code{code, error_category()};
}
//! \return Error from `zmq_errno()` using `net::zmq::error_category()`.
inline std::error_code get_error_code() noexcept
{
return make_error_code(zmq_errno());
}
//! Calls `zmq_term`
class terminate
{
static void call(void* ptr) noexcept;
public:
void operator()(void* ptr) const noexcept
{
if (ptr)
call(ptr);
}
};
//! Calls `zmq_close`
struct close
{
void operator()(void* ptr) const noexcept
{
if (ptr)
zmq_close(ptr);
}
};
//! Unique ZMQ context handle, calls `zmq_term` on destruction.
using context = std::unique_ptr<void, terminate>;
//! Unique ZMQ socket handle, calls `zmq_close` on destruction.
using socket = std::unique_ptr<void, close>;
/*! Read all parts of the next message on `socket`. Blocks until the entire
next message (all parts) are read, or until `zmq_term` is called on the
`zmq_context` associated with `socket`. If the context is terminated,
`make_error_code(ETERM)` is returned.
\note This will automatically retry on `EINTR`, so exiting on
interrupts requires context termination.
\note If non-blocking behavior is requested on `socket` or by `flags`,
then `net::zmq::make_error_code(EAGAIN)` will be returned if this
would block.
\param socket Handle created with `zmq_socket`.
\param flags See `zmq_msg_read` for possible flags.
\return Message payload read from `socket` or ZMQ error. */
expect<std::string> receive(void* socket, int flags = 0);
/*! Sends `payload` on `socket`. Blocks until the entire message is queued
for sending, or until `zmq_term` is called on the `zmq_context`
associated with `socket`. If the context is terminated,
`make_error_code(ETERM)` is returned.
\note This will automatically retry on `EINTR`, so exiting on
interrupts requires context termination.
\note If non-blocking behavior is requested on `socket` or by `flags`,
then `net::zmq::make_error_code(EAGAIN)` will be returned if this
would block.
\param payload sent as one message on `socket`.
\param socket Handle created with `zmq_socket`.
\param flags See `zmq_send` for possible flags.
\return `success()` if sent, otherwise ZMQ error. */
expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept;
} // zmq
} // net

@ -26,6 +26,8 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
include_directories(SYSTEM ${ZMQ_INCLUDE_PATH})
set(rpc_base_sources
rpc_args.cpp)

@ -28,18 +28,29 @@
#include "zmq_server.h"
#include <chrono>
#include <cstdint>
#include <system_error>
namespace cryptonote
{
namespace
{
constexpr const int num_zmq_threads = 1;
constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
}
namespace rpc
{
ZmqServer::ZmqServer(RpcHandler& h) :
handler(h),
stop_signal(false),
running(false),
context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable
context(zmq_init(num_zmq_threads))
{
if (!context)
MONERO_ZMQ_THROW("Unable to create ZMQ context");
}
ZmqServer::~ZmqServer()
@ -48,71 +59,88 @@ ZmqServer::~ZmqServer()
void ZmqServer::serve()
{
while (1)
try
{
try
// socket must close before `zmq_term` will exit.
const net::zmq::socket socket = std::move(rep_socket);
if (!socket)
{
zmq::message_t message;
if (!rep_socket)
{
throw std::runtime_error("ZMQ RPC server reply socket is null");
}
while (rep_socket->recv(&message, 0))
{
std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
std::string response = handler.handle(message_string);
zmq::message_t reply(response.size());
memcpy((void *) reply.data(), response.c_str(), response.size());
rep_socket->send(reply);
MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
}
}
catch (const boost::thread_interrupted& e)
{
MDEBUG("ZMQ Server thread interrupted.");
MERROR("ZMQ RPC server reply socket is null");
return;
}
catch (const zmq::error_t& e)
while (1)
{
MERROR(std::string("ZMQ error: ") + e.what());
const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
MDEBUG("Received RPC request: \"" << message << "\"");
const std::string& response = handler.handle(message);
MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
MDEBUG("Sent RPC reply: \"" << response << "\"");
}
boost::this_thread::interruption_point();
}
catch (const std::system_error& e)
{
if (e.code() != net::zmq::make_error_code(ETERM))
MERROR("ZMQ RPC Server Error: " << e.what());
}
catch (const std::exception& e)
{
MERROR("ZMQ RPC Server Error: " << e.what());
}
catch (...)
{
MERROR("Unknown error in ZMQ RPC server");
}
}
bool ZmqServer::addIPCSocket(std::string address, std::string port)
bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
{
MERROR("ZmqServer::addIPCSocket not yet implemented!");
return false;
}
bool ZmqServer::addTCPSocket(std::string address, std::string port)
bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
{
try
if (!context)
{
std::string addr_prefix("tcp://");
MERROR("ZMQ RPC Server already shutdown");
return false;
}
rep_socket.reset(new zmq::socket_t(context, ZMQ_REP));
rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
if (!rep_socket)
{
MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
return false;
}
rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS));
if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
{
MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
return false;
}
if (address.empty())
address = "*";
if (port.empty())
port = "*";
std::string bind_address = addr_prefix + address + std::string(":") + port;
rep_socket->bind(bind_address.c_str());
static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
{
MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
return false;
}
catch (const std::exception& e)
if (address.empty())
address = "*";
if (port.empty())
port = "*";
std::string bind_address = "tcp://";
bind_address.append(address.data(), address.size());
bind_address += ":";
bind_address.append(port.data(), port.size());
if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
{
MERROR(std::string("Error creating ZMQ Socket: ") + e.what());
MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
return false;
}
return true;
@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port)
void ZmqServer::run()
{
running = true;
run_thread = boost::thread(boost::bind(&ZmqServer::serve, this));
}
void ZmqServer::stop()
{
if (!running) return;
stop_signal = true;
if (!run_thread.joinable())
return;
run_thread.interrupt();
context.reset(); // destroying context terminates all calls
run_thread.join();
running = false;
return;
}

@ -29,12 +29,10 @@
#pragma once
#include <boost/thread/thread.hpp>
#include <zmq.hpp>
#include <string>
#include <memory>
#include <boost/utility/string_ref.hpp>
#include "common/command_line.h"
#include "net/zmq.h"
#include "rpc_handler.h"
namespace cryptonote
@ -43,9 +41,6 @@ namespace cryptonote
namespace rpc
{
static constexpr int DEFAULT_NUM_ZMQ_THREADS = 1;
static constexpr int DEFAULT_RPC_RECV_TIMEOUT_MS = 1000;
class ZmqServer
{
public:
@ -58,8 +53,8 @@ class ZmqServer
void serve();
bool addIPCSocket(std::string address, std::string port);
bool addTCPSocket(std::string address, std::string port);
bool addIPCSocket(boost::string_ref address, boost::string_ref port);
bool addTCPSocket(boost::string_ref address, boost::string_ref port);
void run();
void stop();
@ -67,14 +62,11 @@ class ZmqServer
private:
RpcHandler& handler;
volatile bool stop_signal;
volatile bool running;
zmq::context_t context;
net::zmq::context context;
boost::thread run_thread;
std::unique_ptr<zmq::socket_t> rep_socket;
net::zmq::socket rep_socket;
};

@ -117,7 +117,8 @@ target_link_libraries(unit_tests
${Boost_THREAD_LIBRARY}
${GTEST_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT}
${EXTRA_LIBRARIES})
${EXTRA_LIBRARIES}
${ZMQ_LIB})
set_property(TARGET unit_tests
PROPERTY
FOLDER "tests")

@ -40,6 +40,7 @@
#include <boost/range/adaptor/sliced.hpp>
#include <boost/range/combine.hpp>
#include <boost/system/error_code.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/thread.hpp>
#include <boost/uuid/nil_generator.hpp>
#include <boost/uuid/random_generator.hpp>
@ -59,6 +60,7 @@
#include "net/socks_connect.h"
#include "net/parse.h"
#include "net/tor_address.h"
#include "net/zmq.h"
#include "p2p/net_peerlist_boost_serialization.h"
#include "serialization/keyvalue_serialization.h"
#include "storages/portable_storage.h"
@ -1253,3 +1255,131 @@ TEST(dandelionpp_map, dropped_all_connections)
EXPECT_EQ(3u, entry.second);
}
}
TEST(zmq, error_codes)
{
EXPECT_EQ(
std::addressof(net::zmq::error_category()),
std::addressof(net::zmq::make_error_code(0).category())
);
EXPECT_EQ(
std::make_error_condition(std::errc::not_a_socket),
net::zmq::make_error_code(ENOTSOCK)
);
EXPECT_TRUE(
[]() -> expect<void>
{
MONERO_ZMQ_CHECK(zmq_msg_send(nullptr, nullptr, 0));
return success();
}().matches(std::errc::not_a_socket)
);
bool thrown = false;
try
{
MONERO_ZMQ_THROW("stuff");
}
catch (const std::system_error& e)
{
thrown = true;
EXPECT_EQ(std::make_error_condition(std::errc::not_a_socket), e.code());
}
EXPECT_TRUE(thrown);
}
TEST(zmq, read_write)
{
net::zmq::context context{zmq_init(1)};
ASSERT_NE(nullptr, context);
net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
ASSERT_NE(nullptr, send_socket);
ASSERT_NE(nullptr, recv_socket);
ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
std::string message;
message.resize(1024);
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
ASSERT_TRUE(bool(net::zmq::send(epee::strspan<std::uint8_t>(message), send_socket.get())));
const expect<std::string> received = net::zmq::receive(recv_socket.get());
ASSERT_TRUE(bool(received));
EXPECT_EQ(message, *received);
}
TEST(zmq, read_write_multipart)
{
net::zmq::context context{zmq_init(1)};
ASSERT_NE(nullptr, context);
net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
ASSERT_NE(nullptr, send_socket);
ASSERT_NE(nullptr, recv_socket);
ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
std::string message;
message.resize(999);
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
for (unsigned i = 0; i < 3; ++i)
{
const expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
ASSERT_FALSE(bool(received));
EXPECT_EQ(net::zmq::make_error_code(EAGAIN), received.error());
const epee::span<const std::uint8_t> bytes{
reinterpret_cast<const std::uint8_t*>(std::addressof(message[0])) + (i * 333), 333
};
ASSERT_TRUE(bool(net::zmq::send(bytes, send_socket.get(), (i == 2 ? 0 : ZMQ_SNDMORE))));
}
const expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
ASSERT_TRUE(bool(received));
EXPECT_EQ(message, *received);
}
TEST(zmq, read_write_termination)
{
net::zmq::context context{zmq_init(1)};
ASSERT_NE(nullptr, context);
// must be declared before sockets and after context
boost::scoped_thread<> thread{};
net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
ASSERT_NE(nullptr, send_socket);
ASSERT_NE(nullptr, recv_socket);
ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
std::string message;
message.resize(1024);
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
ASSERT_TRUE(bool(net::zmq::send(epee::strspan<std::uint8_t>(message), send_socket.get(), ZMQ_SNDMORE)));
expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
ASSERT_FALSE(bool(received));
EXPECT_EQ(net::zmq::make_error_code(EAGAIN), received.error());
thread = boost::scoped_thread<>{
boost::thread{
[&context] () { context.reset(); }
}
};
received = net::zmq::receive(recv_socket.get());
ASSERT_FALSE(bool(received));
EXPECT_EQ(net::zmq::make_error_code(ETERM), received.error());
}

Loading…
Cancel
Save