Create toggleable rendezvous behaviour wrapper

Encapsulate and handle registration refresh logic in
rendezvous::Behaviour.
Make
rendezvous config optional and remove defaults values.
rendezvous-asb-refactor
rishflab 3 years ago committed by Daniel Karzel
parent 2bdfc58ed7
commit b19123066d
No known key found for this signature in database
GPG Key ID: 30C3FC2E438ADB6E

@ -4,6 +4,7 @@ pub mod config;
mod event_loop;
mod rate;
mod recovery;
mod rendezvous;
pub mod tracing;
pub mod transport;

@ -1,16 +1,18 @@
use crate::asb::event_loop::LatestRate;
use crate::asb::rendezvous;
use crate::env;
use crate::network::quote::BidQuote;
use crate::network::swap_setup::alice;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::protocol::alice::State3;
use crate::rendezvous::XmrBtcNamespace;
use anyhow::{anyhow, Error};
use libp2p::identity::Keypair;
use libp2p::ping::{Ping, PingEvent};
use libp2p::rendezvous::{Event, Namespace, RegisterError};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::{rendezvous, NetworkBehaviour, PeerId};
use libp2p::swarm::toggle::Toggle;
use libp2p::{NetworkBehaviour, PeerId};
use uuid::Uuid;
#[derive(Debug)]
@ -44,12 +46,6 @@ pub enum OutEvent {
peer: PeerId,
error: Error,
},
Registered {
rendezvous_node: PeerId,
ttl: u64,
namespace: Namespace,
},
RegisterFailed(RegisterError),
/// "Fallback" variant that allows the event mapping code to swallow certain
/// events that we don't want the caller to deal with.
Other,
@ -79,7 +75,7 @@ pub struct Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
pub rendezvous: rendezvous::Rendezvous,
pub rendezvous: Toggle<rendezvous::Behaviour>,
pub quote: quote::Behaviour,
pub swap_setup: alice::Behaviour<LR>,
pub transfer_proof: transfer_proof::Behaviour,
@ -102,9 +98,29 @@ where
resume_only: bool,
env_config: env::Config,
keypair: Keypair,
rendezvous_config: Option<crate::asb::config::Rendezvous>,
) -> Self {
Self {
rendezvous: rendezvous::Rendezvous::new(keypair, rendezvous::Config::default()),
rendezvous: Toggle::from(match rendezvous_config {
None => None,
Some(config) => {
let namespace = match (env_config.bitcoin_network, env_config.monero_network) {
(bitcoin::Network::Bitcoin, monero::Network::Mainnet) => {
XmrBtcNamespace::Mainnet
}
(bitcoin::Network::Testnet, monero::Network::Stagenet) => {
XmrBtcNamespace::Testnet
}
_ => panic!("Cannot determine rendezvous namespace compatible with the bitcoin network: {:?} and monero network: {:?} specified in env config", env_config.bitcoin_network, env_config.monero_network),
};
Some(rendezvous::Behaviour::new(
keypair,
config.peer_id,
config.addr,
namespace,
))
}
}),
quote: quote::asb(),
swap_setup: alice::Behaviour::new(
min_buy,
@ -126,27 +142,8 @@ impl From<PingEvent> for OutEvent {
}
}
impl From<rendezvous::Event> for OutEvent {
fn from(event: rendezvous::Event) -> Self {
match event {
Event::Discovered { .. } => unreachable!("The ASB does not discover other nodes"),
Event::DiscoverFailed { .. } => unreachable!("The ASB does not discover other nodes"),
Event::Registered {
rendezvous_node,
ttl,
namespace,
} => OutEvent::Registered {
rendezvous_node,
ttl,
namespace,
},
Event::RegisterFailed(error) => OutEvent::RegisterFailed(error),
Event::DiscoverServed { .. } => unreachable!("ASB does not act as rendezvous node"),
Event::DiscoverNotServed { .. } => unreachable!("ASB does not act as rendezvous node"),
Event::PeerRegistered { .. } => unreachable!("ASB does not act as rendezvous node"),
Event::PeerNotRegistered { .. } => unreachable!("ASB does not act as rendezvous node"),
Event::PeerUnregistered { .. } => unreachable!("ASB does not act as rendezvous node"),
Event::RegistrationExpired(_) => unreachable!("ASB does not act as rendezvous node"),
}
impl From<()> for OutEvent {
fn from(_: ()) -> Self {
OutEvent::Other
}
}

@ -83,13 +83,6 @@ fn default_asb_data_dir() -> Result<PathBuf> {
.context("Could not generate default config file path")
}
// TODO: update this to the actual deployed rendezvous server
// Currently set to Staging ASB on raspi
const DEFAULT_RENDEZVOUS_PEER_ID: &str = "12D3KooWPZ69DRp4wbGB3wJsxxsg1XW1EVZ2evtVwcARCF3a1nrx";
// TODO: update this to the actual deployed rendezvous server
// Port still to be opened once running rendezvous node
const DEFAULT_RENDEZVOUS_ADDR: &str = "/ip4/141.168.172.35/tcp/7654";
const DEFAULT_MIN_BUY_AMOUNT: f64 = 0.002f64;
const DEFAULT_MAX_BUY_AMOUNT: f64 = 0.02f64;
const DEFAULT_SPREAD: f64 = 0.02f64;
@ -103,7 +96,7 @@ pub struct Config {
pub monero: Monero,
pub tor: TorConf,
pub maker: Maker,
pub rendezvous_node: Rendezvous,
pub rendezvous: Option<Rendezvous>,
}
impl Config {
@ -304,18 +297,6 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result<Config> {
}
let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?;
let rendezvous_peer_id_str = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Enter the peer id of the rendezvous node you wish to register with")
.default(DEFAULT_RENDEZVOUS_PEER_ID.to_string())
.interact_text()?;
let rendezvous_peer_id = PeerId::from_str(rendezvous_peer_id_str.as_str())?;
let rendezvous_addr_str = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Enter the multiaddress of the rendezvous node you wish to register with")
.default(DEFAULT_RENDEZVOUS_ADDR.to_string())
.interact_text()?;
let rendezvous_addr = Multiaddr::from_str(rendezvous_addr_str.as_str())?;
println!();
Ok(Config {
@ -344,10 +325,7 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result<Config> {
ask_spread,
price_ticker_ws_url: defaults.price_ticker_ws_url,
},
rendezvous_node: Rendezvous {
addr: rendezvous_addr,
peer_id: rendezvous_peer_id,
},
rendezvous: None,
})
}
@ -389,10 +367,7 @@ mod tests {
ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(),
price_ticker_ws_url: defaults.price_ticker_ws_url,
},
rendezvous_node: Rendezvous {
addr: DEFAULT_RENDEZVOUS_ADDR.parse().unwrap(),
peer_id: PeerId::from_str(DEFAULT_RENDEZVOUS_PEER_ID).unwrap(),
},
rendezvous: None,
};
initial_setup(config_path.clone(), expected.clone()).unwrap();
@ -434,10 +409,7 @@ mod tests {
ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(),
price_ticker_ws_url: defaults.price_ticker_ws_url,
},
rendezvous_node: Rendezvous {
addr: DEFAULT_RENDEZVOUS_ADDR.parse().unwrap(),
peer_id: PeerId::from_str(DEFAULT_RENDEZVOUS_PEER_ID).unwrap(),
},
rendezvous: None,
};
initial_setup(config_path.clone(), expected.clone()).unwrap();

@ -5,22 +5,19 @@ use crate::network::quote::BidQuote;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::transfer_proof;
use crate::protocol::alice::{AliceState, State3, Swap};
use crate::rendezvous::XmrBtcNamespace;
use crate::{bitcoin, env, kraken, monero};
use anyhow::{Context, Result};
use futures::future;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::rendezvous::Namespace;
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::swarm::SwarmEvent;
use libp2p::{Multiaddr, PeerId, Swarm};
use libp2p::{PeerId, Swarm};
use rust_decimal::Decimal;
use std::collections::HashMap;
use std::convert::Infallible;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use uuid::Uuid;
@ -63,14 +60,6 @@ where
/// Tracks [`transfer_proof::Request`]s which are currently inflight and
/// awaiting an acknowledgement.
inflight_transfer_proofs: HashMap<RequestId, bmrng::Responder<()>>,
// TODO: Potentially group together, potentially the whole rendezvous handling could go into a
// separate swarm? There is no dependency between swap and registration, so we could just
// build this completely separate.
rendezvous_node_peer_id: PeerId,
rendezvous_node_addr: Multiaddr,
rendezvous_namespace: XmrBtcNamespace,
rendezvous_reregister_timestamp: Option<Instant>,
}
impl<LR> EventLoop<LR>
@ -87,9 +76,6 @@ where
latest_rate: LR,
min_buy: bitcoin::Amount,
max_buy: bitcoin::Amount,
rendezvous_node_peer_id: PeerId,
rendezvous_node_addr: Multiaddr,
rendezvous_namespace: XmrBtcNamespace,
) -> Result<(Self, mpsc::Receiver<Swap>)> {
let swap_channel = MpscChannels::default();
@ -108,10 +94,6 @@ where
send_transfer_proof: Default::default(),
buffered_transfer_proofs: Default::default(),
inflight_transfer_proofs: Default::default(),
rendezvous_node_peer_id,
rendezvous_node_addr,
rendezvous_namespace,
rendezvous_reregister_timestamp: None,
};
Ok((event_loop, swap_channel.receiver))
}
@ -166,26 +148,9 @@ where
loop {
// rendezvous node re-registration
if let Some(rendezvous_reregister_timestamp) = self.rendezvous_reregister_timestamp {
if Instant::now() > rendezvous_reregister_timestamp {
if self.swarm.is_connected(&self.rendezvous_node_peer_id) {
self.swarm.behaviour_mut().rendezvous.register(
Namespace::new(self.rendezvous_namespace.to_string())
.expect("our namespace to be a correct string"),
self.rendezvous_node_peer_id,
None,
);
} else {
match Swarm::dial_addr(&mut self.swarm, self.rendezvous_node_addr.clone()) {
Ok(()) => {}
Err(error) => {
tracing::error!(
"Failed to redial rendezvous node for re-registration: {:#}",
error
);
}
}
}
if let Some(rendezvous_behaviour) = self.swarm.behaviour_mut().rendezvous.as_mut() {
if let Err(error) = rendezvous_behaviour.refresh_registration() {
tracing::error!("Failed to register with rendezvous point: {:#}", error);
}
}
@ -287,27 +252,6 @@ where
channel
}.boxed());
}
Some(SwarmEvent::Behaviour(OutEvent::Registered { rendezvous_node, ttl, namespace })) => {
// TODO: this can most likely not happen at all, potentially remove these checks
if rendezvous_node != self.rendezvous_node_peer_id {
tracing::error!(peer_id=%rendezvous_node, "Ignoring message from unknown rendezvous node");
continue;
}
// TODO: Consider implementing From for Namespace and XmrBtcNamespace
if namespace.to_string() != self.rendezvous_namespace.to_string() {
tracing::error!(peer_id=%rendezvous_node, %namespace, "Ignoring message from rendezvous node for unknown namespace");
continue;
}
tracing::info!("Successfully registered with rendezvous node");
// record re-registration after half the ttl has expired
self.rendezvous_reregister_timestamp = Some(Instant::now() + Duration::from_secs(ttl) / 2);
}
Some(SwarmEvent::Behaviour(OutEvent::RegisterFailed(error))) => {
tracing::error!(rendezvous_node=%self.rendezvous_node_peer_id, "Registration with rendezvous node failed: {:#}", error);
}
Some(SwarmEvent::Behaviour(OutEvent::Failure {peer, error})) => {
tracing::error!(
%peer,
@ -324,14 +268,6 @@ where
self.inflight_transfer_proofs.insert(id, responder);
}
}
if peer == self.rendezvous_node_peer_id {
self
.swarm
.behaviour_mut()
.rendezvous
.register(Namespace::new(self.rendezvous_namespace.to_string()).expect("our namespace to be a correct string"), self.rendezvous_node_peer_id, None);
}
}
Some(SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. }) => {
tracing::warn!(%address, "Failed to set up connection with peer. Error {:#}", error);

@ -0,0 +1,155 @@
use crate::rendezvous::XmrBtcNamespace;
use anyhow::Result;
use libp2p::core::connection::ConnectionId;
use libp2p::identity::Keypair;
use libp2p::multiaddr::Protocol;
use libp2p::rendezvous::{Event, Namespace};
use libp2p::swarm::{
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess,
PollParameters, ProtocolsHandler,
};
use libp2p::{Multiaddr, PeerId};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
/// A `NetworkBehaviour` that handles registration of the xmr-btc swap service with a rendezvous point
pub struct Behaviour {
rendezvous_behaviour: libp2p::rendezvous::Rendezvous,
rendezvous_point_peer_id: PeerId,
rendezvous_point_addr: Multiaddr,
rendezvous_namespace: XmrBtcNamespace,
rendezvous_reregister_timestamp: Option<Instant>,
is_connected: bool,
events: Vec<NetworkBehaviourAction<BehaviourInEvent, ()>>,
}
impl Behaviour {
pub fn new(
keypair: Keypair,
peer_id: PeerId,
addr: Multiaddr,
namespace: XmrBtcNamespace,
) -> Self {
Self {
rendezvous_behaviour: libp2p::rendezvous::Rendezvous::new(
keypair,
libp2p::rendezvous::Config::default(),
),
rendezvous_point_peer_id: peer_id,
rendezvous_point_addr: addr,
rendezvous_namespace: namespace,
rendezvous_reregister_timestamp: None,
is_connected: false,
events: vec![],
}
}
pub fn refresh_registration(&mut self) -> Result<()> {
if self.is_connected {
if let Some(rendezvous_reregister_timestamp) = self.rendezvous_reregister_timestamp {
if Instant::now() > rendezvous_reregister_timestamp {
self.rendezvous_behaviour.register(
Namespace::new(self.rendezvous_namespace.to_string())
.expect("our namespace to be a correct string"),
self.rendezvous_point_peer_id,
None,
)?;
}
}
} else {
let p2p_suffix = Protocol::P2p(self.rendezvous_point_peer_id.into());
let address_with_p2p = if !self
.rendezvous_point_addr
.ends_with(&Multiaddr::empty().with(p2p_suffix.clone()))
{
self.rendezvous_point_addr.clone().with(p2p_suffix)
} else {
self.rendezvous_point_addr.clone()
};
self.events.push(NetworkBehaviourAction::DialAddress {
address: address_with_p2p,
})
}
Ok(())
}
}
type BehaviourInEvent =
<<<libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent;
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler = <libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = ();
fn new_handler(&mut self) -> Self::ProtocolsHandler {
<libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
vec![]
}
fn inject_connected(&mut self, peer_id: &PeerId) {
if *peer_id == self.rendezvous_point_peer_id {
self.is_connected = true;
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
if *peer_id == self.rendezvous_point_peer_id {
self.is_connected = false;
}
}
fn inject_event(
&mut self,
_peer_id: PeerId,
_connection: ConnectionId,
_event: <<libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
}
fn poll(
&mut self,
_cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<BehaviourInEvent, Self::OutEvent>> {
if let Some(event) = self.events.pop() {
return Poll::Ready(event);
}
Poll::Pending
}
}
impl NetworkBehaviourEventProcess<libp2p::rendezvous::Event> for Behaviour {
fn inject_event(&mut self, event: Event) {
match event {
Event::RegisterFailed(error) => {
tracing::error!(rendezvous_node=%self.rendezvous_point_peer_id, "Registration with rendezvous node failed: {:#}", error);
}
Event::RegistrationExpired(registration) => {
tracing::warn!("Registation expired: {:?}", registration)
}
Event::Registered {
rendezvous_node,
ttl,
namespace,
} => {
// TODO: this can most likely not happen at all, potentially remove these checks
if rendezvous_node != self.rendezvous_point_peer_id {
tracing::error!(peer_id=%rendezvous_node, "Ignoring message from unknown rendezvous node");
}
// TODO: Consider implementing From for Namespace and XmrBtcNamespace
if namespace.to_string() != self.rendezvous_namespace.to_string() {
tracing::error!(peer_id=%rendezvous_node, %namespace, "Ignoring message from rendezvous node for unknown namespace");
}
// record re-registration after half the ttl has expired
self.rendezvous_reregister_timestamp =
Some(Instant::now() + Duration::from_secs(ttl) / 2);
}
_ => {}
}
}
}

@ -32,7 +32,6 @@ use swap::database::Database;
use swap::monero::Amount;
use swap::network::swarm;
use swap::protocol::alice::run;
use swap::rendezvous::XmrBtcNamespace;
use swap::seed::Seed;
use swap::tor::AuthenticatedClient;
use swap::{asb, bitcoin, kraken, monero, tor};
@ -156,6 +155,7 @@ async fn main() -> Result<()> {
kraken_rate.clone(),
resume_only,
env_config,
config.rendezvous,
)?;
for listen in config.network.listen.clone() {
@ -165,26 +165,10 @@ async fn main() -> Result<()> {
tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
// todo: Option<Multiaddr> is being used as a rendezvous feature toggle.
// The fact that rendezvous is an optional feature could be expressed better.
if let Some(addr) = external_addr {
let _ = Swarm::add_external_address(&mut swarm, addr, AddressScore::Infinite);
Swarm::dial_addr(&mut swarm, config.rendezvous_node.addr.clone()).with_context(
|| {
format!(
"Failed to dial rendezvous node addr {}",
config.rendezvous_node.addr
)
},
)?;
}
let namespace = if testnet {
XmrBtcNamespace::Testnet
} else {
XmrBtcNamespace::Mainnet
};
let (event_loop, mut swap_receiver) = EventLoop::new(
swarm,
env_config,
@ -194,9 +178,6 @@ async fn main() -> Result<()> {
kraken_rate.clone(),
config.maker.min_buy_btc,
config.maker.max_buy_btc,
config.rendezvous_node.peer_id,
config.rendezvous_node.addr,
namespace,
)
.unwrap();

@ -14,6 +14,7 @@ pub fn asb<LR>(
latest_rate: LR,
resume_only: bool,
env_config: env::Config,
rendezvous_config: Option<asb::config::Rendezvous>,
) -> Result<Swarm<asb::Behaviour<LR>>>
where
LR: LatestRate + Send + 'static + Debug + Clone,
@ -27,6 +28,7 @@ where
resume_only,
env_config,
identity.clone(),
rendezvous_config,
);
let transport = asb::transport::new(&identity)?;

@ -22,7 +22,6 @@ use swap::network::swarm;
use swap::protocol::alice::{AliceState, Swap};
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
use swap::rendezvous::XmrBtcNamespace;
use swap::seed::Seed;
use swap::{asb, bitcoin, cli, env, monero};
use tempfile::tempdir;
@ -157,13 +156,16 @@ async fn init_containers(cli: &Cli) -> (Monero, Containers<'_>) {
.await
.unwrap();
(monero, Containers {
bitcoind_url,
bitcoind,
monerod_container,
monero_wallet_rpc_containers,
electrs,
})
(
monero,
Containers {
bitcoind_url,
bitcoind,
monerod_container,
monero_wallet_rpc_containers,
electrs,
},
)
}
async fn init_bitcoind_container(
@ -237,6 +239,7 @@ async fn start_alice(
latest_rate,
resume_only,
env_config,
None,
)
.unwrap();
swarm.listen_on(listen_address).unwrap();
@ -250,9 +253,6 @@ async fn start_alice(
FixedRate::default(),
min_buy,
max_buy,
PeerId::random(),
Multiaddr::empty(),
XmrBtcNamespace::Testnet,
)
.unwrap();

Loading…
Cancel
Save