@ -48,6 +48,8 @@
# include "cryptonote_basic/events.h"
# include "misc_log_ex.h"
# include "serialization/json_object.h"
# include "ringct/rctTypes.h"
# include "cryptonote_core/cryptonote_tx_utils.h"
# undef MONERO_DEFAULT_LOG_CATEGORY
# define MONERO_DEFAULT_LOG_CATEGORY "net.zmq"
@ -57,6 +59,7 @@ namespace
constexpr const char txpool_signal [ ] = " tx_signal " ;
using chain_writer = void ( epee : : byte_stream & , std : : uint64_t , epee : : span < const cryptonote : : block > ) ;
using miner_writer = void ( epee : : byte_stream & , uint8_t , uint64_t , const crypto : : hash & , const crypto : : hash & , cryptonote : : difficulty_type , uint64_t , uint64_t , const std : : vector < cryptonote : : tx_block_template_backlog_entry > & ) ;
using txpool_writer = void ( epee : : byte_stream & , epee : : span < const cryptonote : : txpool_event > ) ;
template < typename F >
@ -116,13 +119,30 @@ namespace
const epee : : span < const cryptonote : : block > blocks ;
} ;
//! Object for miner data serialization
struct miner_data
{
uint8_t major_version ;
uint64_t height ;
const crypto : : hash & prev_id ;
const crypto : : hash & seed_hash ;
cryptonote : : difficulty_type diff ;
uint64_t median_weight ;
uint64_t already_generated_coins ;
const std : : vector < cryptonote : : tx_block_template_backlog_entry > & tx_backlog ;
} ;
//! Object for "minimal" tx serialization
struct minimal_txpool
{
const cryptonote : : transaction & tx ;
crypto : : hash hash ;
uint64_t blob_size ;
uint64_t weight ;
uint64_t fee ;
} ;
void toJsonValue ( rapidjson : : Writer < epee : : byte_stream > & dest , const minimal_chain self )
void toJsonValue ( rapidjson : : Writer < epee : : byte_stream > & dest , const minimal_chain & self )
{
namespace adapt = boost : : adaptors ;
@ -143,19 +163,27 @@ namespace
dest . EndObject ( ) ;
}
void toJsonValue ( rapidjson : : Writer < epee : : byte_stream > & dest , const minimal_txpool self )
{
crypto : : hash id { } ;
std : : size_t blob_size = 0 ;
if ( ! get_transaction_hash ( self . tx , id , blob_size ) )
void toJsonValue ( rapidjson : : Writer < epee : : byte_stream > & dest , const miner_data & self )
{
MERROR ( " ZMQ/Pub failure: get_transaction_hash " ) ;
return ;
dest . StartObject ( ) ;
INSERT_INTO_JSON_OBJECT ( dest , major_version , self . major_version ) ;
INSERT_INTO_JSON_OBJECT ( dest , height , self . height ) ;
INSERT_INTO_JSON_OBJECT ( dest , prev_id , self . prev_id ) ;
INSERT_INTO_JSON_OBJECT ( dest , seed_hash , self . seed_hash ) ;
INSERT_INTO_JSON_OBJECT ( dest , difficulty , cryptonote : : hex ( self . diff ) ) ;
INSERT_INTO_JSON_OBJECT ( dest , median_weight , self . median_weight ) ;
INSERT_INTO_JSON_OBJECT ( dest , already_generated_coins , self . already_generated_coins ) ;
INSERT_INTO_JSON_OBJECT ( dest , tx_backlog , self . tx_backlog ) ;
dest . EndObject ( ) ;
}
void toJsonValue ( rapidjson : : Writer < epee : : byte_stream > & dest , const minimal_txpool & self )
{
dest . StartObject ( ) ;
INSERT_INTO_JSON_OBJECT ( dest , id , id ) ;
INSERT_INTO_JSON_OBJECT ( dest , blob_size , blob_size ) ;
INSERT_INTO_JSON_OBJECT ( dest , id , self . hash ) ;
INSERT_INTO_JSON_OBJECT ( dest , blob_size , self . blob_size ) ;
INSERT_INTO_JSON_OBJECT ( dest , weight , self . weight ) ;
INSERT_INTO_JSON_OBJECT ( dest , fee , self . fee ) ;
dest . EndObject ( ) ;
}
@ -169,6 +197,11 @@ namespace
json_pub ( buf , minimal_chain { height , blocks } ) ;
}
void json_miner_data ( epee : : byte_stream & buf , uint8_t major_version , uint64_t height , const crypto : : hash & prev_id , const crypto : : hash & seed_hash , cryptonote : : difficulty_type diff , uint64_t median_weight , uint64_t already_generated_coins , const std : : vector < cryptonote : : tx_block_template_backlog_entry > & tx_backlog )
{
json_pub ( buf , miner_data { major_version , height , prev_id , seed_hash , diff , median_weight , already_generated_coins , tx_backlog } ) ;
}
// boost::adaptors are in place "views" - no copy/move takes place
// moving transactions (via sort, etc.), is expensive!
@ -187,7 +220,7 @@ namespace
namespace adapt = boost : : adaptors ;
const auto to_minimal_tx = [ ] ( const cryptonote : : txpool_event & event )
{
return minimal_txpool { event . tx };
return minimal_txpool { event . tx , event . hash , event . blob_size , event . weight , cryptonote : : get_tx_fee ( event . tx ) };
} ;
json_pub ( buf , ( txes | adapt : : filtered ( is_valid { } ) | adapt : : transformed ( to_minimal_tx ) ) ) ;
}
@ -198,6 +231,11 @@ namespace
{ u8 " json-minimal-chain_main " , json_minimal_chain }
} } ;
constexpr const std : : array < context < miner_writer > , 1 > miner_contexts =
{ {
{ u8 " json-full-miner_data " , json_miner_data } ,
} } ;
constexpr const std : : array < context < txpool_writer > , 2 > txpool_contexts =
{ {
{ u8 " json-full-txpool_add " , json_full_txpool } ,
@ -321,6 +359,7 @@ namespace cryptonote { namespace listener
zmq_pub : : zmq_pub ( void * context )
: relay_ ( ) ,
chain_subs_ { { 0 } } ,
miner_subs_ { { 0 } } ,
txpool_subs_ { { 0 } } ,
sync_ ( )
{
@ -328,6 +367,7 @@ zmq_pub::zmq_pub(void* context)
throw std : : logic_error { " ZMQ context cannot be NULL " } ;
verify_sorted ( chain_contexts , " chain_contexts " ) ;
verify_sorted ( miner_contexts , " miner_contexts " ) ;
verify_sorted ( txpool_contexts , " txpool_contexts " ) ;
relay_ . reset ( zmq_socket ( context , ZMQ_PAIR ) ) ;
@ -348,22 +388,25 @@ bool zmq_pub::sub_request(boost::string_ref message)
message . remove_prefix ( 1 ) ;
const auto chain_range = get_range ( chain_contexts , message ) ;
const auto miner_range = get_range ( miner_contexts , message ) ;
const auto txpool_range = get_range ( txpool_contexts , message ) ;
if ( ! chain_range . empty ( ) | | ! txpool_range. empty ( ) )
if ( ! chain_range . empty ( ) | | ! miner_range. empty ( ) | | ! txpool_range. empty ( ) )
{
MDEBUG ( " Client " < < ( tag ? " subscribed " : " unsubscribed " ) < < " to " < <
chain_range . size ( ) < < " chain topic(s) and " < < txpool_range . size ( ) < < " txpool topic(s) " ) ;
chain_range . size ( ) < < " chain topic(s) , " < < miner_range . size ( ) < < " miner topic(s) and " < < txpool_range . size ( ) < < " txpool topic(s) " ) ;
const boost : : lock_guard < boost : : mutex > lock { sync_ } ;
switch ( tag )
{
case 0 :
remove_subscriptions ( chain_subs_ , chain_range , chain_contexts . begin ( ) ) ;
remove_subscriptions ( miner_subs_ , miner_range , miner_contexts . begin ( ) ) ;
remove_subscriptions ( txpool_subs_ , txpool_range , txpool_contexts . begin ( ) ) ;
return true ;
case 1 :
add_subscriptions ( chain_subs_ , chain_range , chain_contexts . begin ( ) ) ;
add_subscriptions ( miner_subs_ , miner_range , miner_contexts . begin ( ) ) ;
add_subscriptions ( txpool_subs_ , txpool_range , txpool_contexts . begin ( ) ) ;
return true ;
default :
@ -436,6 +479,25 @@ std::size_t zmq_pub::send_chain_main(const std::uint64_t height, const epee::spa
return 0 ;
}
std : : size_t zmq_pub : : send_miner_data ( uint8_t major_version , uint64_t height , const crypto : : hash & prev_id , const crypto : : hash & seed_hash , difficulty_type diff , uint64_t median_weight , uint64_t already_generated_coins , const std : : vector < tx_block_template_backlog_entry > & tx_backlog )
{
boost : : unique_lock < boost : : mutex > guard { sync_ } ;
const auto subs_copy = miner_subs_ ;
guard . unlock ( ) ;
for ( const std : : size_t sub : subs_copy )
{
if ( sub )
{
auto messages = make_pubs ( subs_copy , miner_contexts , major_version , height , prev_id , seed_hash , diff , median_weight , already_generated_coins , tx_backlog ) ;
guard . lock ( ) ;
return send_messages ( relay_ . get ( ) , messages ) ;
}
}
return 0 ;
}
std : : size_t zmq_pub : : send_txpool_add ( std : : vector < txpool_event > txes )
{
if ( txes . empty ( ) )
@ -466,6 +528,15 @@ void zmq_pub::chain_main::operator()(const std::uint64_t height, epee::span<cons
MERROR ( " Unable to send ZMQ/Pub - ZMQ server destroyed " ) ;
}
void zmq_pub : : miner_data : : operator ( ) ( uint8_t major_version , uint64_t height , const crypto : : hash & prev_id , const crypto : : hash & seed_hash , difficulty_type diff , uint64_t median_weight , uint64_t already_generated_coins , const std : : vector < tx_block_template_backlog_entry > & tx_backlog ) const
{
const std : : shared_ptr < zmq_pub > self = self_ . lock ( ) ;
if ( self )
self - > send_miner_data ( major_version , height , prev_id , seed_hash , diff , median_weight , already_generated_coins , tx_backlog ) ;
else
MERROR ( " Unable to send ZMQ/Pub - ZMQ server destroyed " ) ;
}
void zmq_pub : : txpool_add : : operator ( ) ( std : : vector < cryptonote : : txpool_event > txes ) const
{
const std : : shared_ptr < zmq_pub > self = self_ . lock ( ) ;