diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 16fc21e6..68faf42b 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -4,6 +4,7 @@ pub mod config; mod event_loop; mod rate; mod recovery; +mod rendezvous; pub mod tracing; pub mod transport; diff --git a/swap/src/asb/behaviour.rs b/swap/src/asb/behaviour.rs index b1406c15..859cb07c 100644 --- a/swap/src/asb/behaviour.rs +++ b/swap/src/asb/behaviour.rs @@ -1,16 +1,18 @@ use crate::asb::event_loop::LatestRate; +use crate::asb::rendezvous; use crate::env; use crate::network::quote::BidQuote; use crate::network::swap_setup::alice; use crate::network::swap_setup::alice::WalletSnapshot; use crate::network::{encrypted_signature, quote, transfer_proof}; use crate::protocol::alice::State3; +use crate::rendezvous::XmrBtcNamespace; use anyhow::{anyhow, Error}; use libp2p::identity::Keypair; use libp2p::ping::{Ping, PingEvent}; -use libp2p::rendezvous::{Event, Namespace, RegisterError}; use libp2p::request_response::{RequestId, ResponseChannel}; -use libp2p::{rendezvous, NetworkBehaviour, PeerId}; +use libp2p::swarm::toggle::Toggle; +use libp2p::{NetworkBehaviour, PeerId}; use uuid::Uuid; #[derive(Debug)] @@ -44,12 +46,6 @@ pub enum OutEvent { peer: PeerId, error: Error, }, - Registered { - rendezvous_node: PeerId, - ttl: u64, - namespace: Namespace, - }, - RegisterFailed(RegisterError), /// "Fallback" variant that allows the event mapping code to swallow certain /// events that we don't want the caller to deal with. Other, @@ -79,7 +75,7 @@ pub struct Behaviour where LR: LatestRate + Send + 'static, { - pub rendezvous: rendezvous::Rendezvous, + pub rendezvous: Toggle, pub quote: quote::Behaviour, pub swap_setup: alice::Behaviour, pub transfer_proof: transfer_proof::Behaviour, @@ -102,9 +98,29 @@ where resume_only: bool, env_config: env::Config, keypair: Keypair, + rendezvous_config: Option, ) -> Self { Self { - rendezvous: rendezvous::Rendezvous::new(keypair, rendezvous::Config::default()), + rendezvous: Toggle::from(match rendezvous_config { + None => None, + Some(config) => { + let namespace = match (env_config.bitcoin_network, env_config.monero_network) { + (bitcoin::Network::Bitcoin, monero::Network::Mainnet) => { + XmrBtcNamespace::Mainnet + } + (bitcoin::Network::Testnet, monero::Network::Stagenet) => { + XmrBtcNamespace::Testnet + } + _ => panic!("Cannot determine rendezvous namespace compatible with the bitcoin network: {:?} and monero network: {:?} specified in env config", env_config.bitcoin_network, env_config.monero_network), + }; + Some(rendezvous::Behaviour::new( + keypair, + config.peer_id, + config.addr, + namespace, + )) + } + }), quote: quote::asb(), swap_setup: alice::Behaviour::new( min_buy, @@ -126,27 +142,8 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: rendezvous::Event) -> Self { - match event { - Event::Discovered { .. } => unreachable!("The ASB does not discover other nodes"), - Event::DiscoverFailed { .. } => unreachable!("The ASB does not discover other nodes"), - Event::Registered { - rendezvous_node, - ttl, - namespace, - } => OutEvent::Registered { - rendezvous_node, - ttl, - namespace, - }, - Event::RegisterFailed(error) => OutEvent::RegisterFailed(error), - Event::DiscoverServed { .. } => unreachable!("ASB does not act as rendezvous node"), - Event::DiscoverNotServed { .. } => unreachable!("ASB does not act as rendezvous node"), - Event::PeerRegistered { .. } => unreachable!("ASB does not act as rendezvous node"), - Event::PeerNotRegistered { .. } => unreachable!("ASB does not act as rendezvous node"), - Event::PeerUnregistered { .. } => unreachable!("ASB does not act as rendezvous node"), - Event::RegistrationExpired(_) => unreachable!("ASB does not act as rendezvous node"), - } +impl From<()> for OutEvent { + fn from(_: ()) -> Self { + OutEvent::Other } } diff --git a/swap/src/asb/config.rs b/swap/src/asb/config.rs index 1d875fb2..f9e46ba9 100644 --- a/swap/src/asb/config.rs +++ b/swap/src/asb/config.rs @@ -83,13 +83,6 @@ fn default_asb_data_dir() -> Result { .context("Could not generate default config file path") } -// TODO: update this to the actual deployed rendezvous server -// Currently set to Staging ASB on raspi -const DEFAULT_RENDEZVOUS_PEER_ID: &str = "12D3KooWPZ69DRp4wbGB3wJsxxsg1XW1EVZ2evtVwcARCF3a1nrx"; -// TODO: update this to the actual deployed rendezvous server -// Port still to be opened once running rendezvous node -const DEFAULT_RENDEZVOUS_ADDR: &str = "/ip4/141.168.172.35/tcp/7654"; - const DEFAULT_MIN_BUY_AMOUNT: f64 = 0.002f64; const DEFAULT_MAX_BUY_AMOUNT: f64 = 0.02f64; const DEFAULT_SPREAD: f64 = 0.02f64; @@ -103,7 +96,7 @@ pub struct Config { pub monero: Monero, pub tor: TorConf, pub maker: Maker, - pub rendezvous_node: Rendezvous, + pub rendezvous: Option, } impl Config { @@ -304,18 +297,6 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result { } let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?; - let rendezvous_peer_id_str = Input::with_theme(&ColorfulTheme::default()) - .with_prompt("Enter the peer id of the rendezvous node you wish to register with") - .default(DEFAULT_RENDEZVOUS_PEER_ID.to_string()) - .interact_text()?; - let rendezvous_peer_id = PeerId::from_str(rendezvous_peer_id_str.as_str())?; - - let rendezvous_addr_str = Input::with_theme(&ColorfulTheme::default()) - .with_prompt("Enter the multiaddress of the rendezvous node you wish to register with") - .default(DEFAULT_RENDEZVOUS_ADDR.to_string()) - .interact_text()?; - let rendezvous_addr = Multiaddr::from_str(rendezvous_addr_str.as_str())?; - println!(); Ok(Config { @@ -344,10 +325,7 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result { ask_spread, price_ticker_ws_url: defaults.price_ticker_ws_url, }, - rendezvous_node: Rendezvous { - addr: rendezvous_addr, - peer_id: rendezvous_peer_id, - }, + rendezvous: None, }) } @@ -389,10 +367,7 @@ mod tests { ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(), price_ticker_ws_url: defaults.price_ticker_ws_url, }, - rendezvous_node: Rendezvous { - addr: DEFAULT_RENDEZVOUS_ADDR.parse().unwrap(), - peer_id: PeerId::from_str(DEFAULT_RENDEZVOUS_PEER_ID).unwrap(), - }, + rendezvous: None, }; initial_setup(config_path.clone(), expected.clone()).unwrap(); @@ -434,10 +409,7 @@ mod tests { ask_spread: Decimal::from_f64(DEFAULT_SPREAD).unwrap(), price_ticker_ws_url: defaults.price_ticker_ws_url, }, - rendezvous_node: Rendezvous { - addr: DEFAULT_RENDEZVOUS_ADDR.parse().unwrap(), - peer_id: PeerId::from_str(DEFAULT_RENDEZVOUS_PEER_ID).unwrap(), - }, + rendezvous: None, }; initial_setup(config_path.clone(), expected.clone()).unwrap(); diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index d561ac9c..ffe639da 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -5,22 +5,19 @@ use crate::network::quote::BidQuote; use crate::network::swap_setup::alice::WalletSnapshot; use crate::network::transfer_proof; use crate::protocol::alice::{AliceState, State3, Swap}; -use crate::rendezvous::XmrBtcNamespace; use crate::{bitcoin, env, kraken, monero}; use anyhow::{Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; -use libp2p::rendezvous::Namespace; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::swarm::SwarmEvent; -use libp2p::{Multiaddr, PeerId, Swarm}; +use libp2p::{PeerId, Swarm}; use rust_decimal::Decimal; use std::collections::HashMap; use std::convert::Infallible; use std::fmt::Debug; use std::sync::Arc; -use std::time::{Duration, Instant}; use tokio::sync::mpsc; use uuid::Uuid; @@ -63,14 +60,6 @@ where /// Tracks [`transfer_proof::Request`]s which are currently inflight and /// awaiting an acknowledgement. inflight_transfer_proofs: HashMap>, - - // TODO: Potentially group together, potentially the whole rendezvous handling could go into a - // separate swarm? There is no dependency between swap and registration, so we could just - // build this completely separate. - rendezvous_node_peer_id: PeerId, - rendezvous_node_addr: Multiaddr, - rendezvous_namespace: XmrBtcNamespace, - rendezvous_reregister_timestamp: Option, } impl EventLoop @@ -87,9 +76,6 @@ where latest_rate: LR, min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, - rendezvous_node_peer_id: PeerId, - rendezvous_node_addr: Multiaddr, - rendezvous_namespace: XmrBtcNamespace, ) -> Result<(Self, mpsc::Receiver)> { let swap_channel = MpscChannels::default(); @@ -108,10 +94,6 @@ where send_transfer_proof: Default::default(), buffered_transfer_proofs: Default::default(), inflight_transfer_proofs: Default::default(), - rendezvous_node_peer_id, - rendezvous_node_addr, - rendezvous_namespace, - rendezvous_reregister_timestamp: None, }; Ok((event_loop, swap_channel.receiver)) } @@ -166,26 +148,9 @@ where loop { // rendezvous node re-registration - if let Some(rendezvous_reregister_timestamp) = self.rendezvous_reregister_timestamp { - if Instant::now() > rendezvous_reregister_timestamp { - if self.swarm.is_connected(&self.rendezvous_node_peer_id) { - self.swarm.behaviour_mut().rendezvous.register( - Namespace::new(self.rendezvous_namespace.to_string()) - .expect("our namespace to be a correct string"), - self.rendezvous_node_peer_id, - None, - ); - } else { - match Swarm::dial_addr(&mut self.swarm, self.rendezvous_node_addr.clone()) { - Ok(()) => {} - Err(error) => { - tracing::error!( - "Failed to redial rendezvous node for re-registration: {:#}", - error - ); - } - } - } + if let Some(rendezvous_behaviour) = self.swarm.behaviour_mut().rendezvous.as_mut() { + if let Err(error) = rendezvous_behaviour.refresh_registration() { + tracing::error!("Failed to register with rendezvous point: {:#}", error); } } @@ -287,27 +252,6 @@ where channel }.boxed()); } - Some(SwarmEvent::Behaviour(OutEvent::Registered { rendezvous_node, ttl, namespace })) => { - - // TODO: this can most likely not happen at all, potentially remove these checks - if rendezvous_node != self.rendezvous_node_peer_id { - tracing::error!(peer_id=%rendezvous_node, "Ignoring message from unknown rendezvous node"); - continue; - } - - // TODO: Consider implementing From for Namespace and XmrBtcNamespace - if namespace.to_string() != self.rendezvous_namespace.to_string() { - tracing::error!(peer_id=%rendezvous_node, %namespace, "Ignoring message from rendezvous node for unknown namespace"); - continue; - } - - tracing::info!("Successfully registered with rendezvous node"); - // record re-registration after half the ttl has expired - self.rendezvous_reregister_timestamp = Some(Instant::now() + Duration::from_secs(ttl) / 2); - } - Some(SwarmEvent::Behaviour(OutEvent::RegisterFailed(error))) => { - tracing::error!(rendezvous_node=%self.rendezvous_node_peer_id, "Registration with rendezvous node failed: {:#}", error); - } Some(SwarmEvent::Behaviour(OutEvent::Failure {peer, error})) => { tracing::error!( %peer, @@ -324,14 +268,6 @@ where self.inflight_transfer_proofs.insert(id, responder); } } - - if peer == self.rendezvous_node_peer_id { - self - .swarm - .behaviour_mut() - .rendezvous - .register(Namespace::new(self.rendezvous_namespace.to_string()).expect("our namespace to be a correct string"), self.rendezvous_node_peer_id, None); - } } Some(SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. }) => { tracing::warn!(%address, "Failed to set up connection with peer. Error {:#}", error); diff --git a/swap/src/asb/rendezvous.rs b/swap/src/asb/rendezvous.rs new file mode 100644 index 00000000..2911dbf4 --- /dev/null +++ b/swap/src/asb/rendezvous.rs @@ -0,0 +1,155 @@ +use crate::rendezvous::XmrBtcNamespace; +use anyhow::Result; +use libp2p::core::connection::ConnectionId; +use libp2p::identity::Keypair; +use libp2p::multiaddr::Protocol; +use libp2p::rendezvous::{Event, Namespace}; +use libp2p::swarm::{ + IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, + PollParameters, ProtocolsHandler, +}; +use libp2p::{Multiaddr, PeerId}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +/// A `NetworkBehaviour` that handles registration of the xmr-btc swap service with a rendezvous point +pub struct Behaviour { + rendezvous_behaviour: libp2p::rendezvous::Rendezvous, + rendezvous_point_peer_id: PeerId, + rendezvous_point_addr: Multiaddr, + rendezvous_namespace: XmrBtcNamespace, + rendezvous_reregister_timestamp: Option, + is_connected: bool, + events: Vec>, +} + +impl Behaviour { + pub fn new( + keypair: Keypair, + peer_id: PeerId, + addr: Multiaddr, + namespace: XmrBtcNamespace, + ) -> Self { + Self { + rendezvous_behaviour: libp2p::rendezvous::Rendezvous::new( + keypair, + libp2p::rendezvous::Config::default(), + ), + rendezvous_point_peer_id: peer_id, + rendezvous_point_addr: addr, + rendezvous_namespace: namespace, + rendezvous_reregister_timestamp: None, + is_connected: false, + events: vec![], + } + } + + pub fn refresh_registration(&mut self) -> Result<()> { + if self.is_connected { + if let Some(rendezvous_reregister_timestamp) = self.rendezvous_reregister_timestamp { + if Instant::now() > rendezvous_reregister_timestamp { + self.rendezvous_behaviour.register( + Namespace::new(self.rendezvous_namespace.to_string()) + .expect("our namespace to be a correct string"), + self.rendezvous_point_peer_id, + None, + )?; + } + } + } else { + let p2p_suffix = Protocol::P2p(self.rendezvous_point_peer_id.into()); + let address_with_p2p = if !self + .rendezvous_point_addr + .ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) + { + self.rendezvous_point_addr.clone().with(p2p_suffix) + } else { + self.rendezvous_point_addr.clone() + }; + self.events.push(NetworkBehaviourAction::DialAddress { + address: address_with_p2p, + }) + } + Ok(()) + } +} + +type BehaviourInEvent = +<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent; + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = ::ProtocolsHandler; + type OutEvent = (); + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + ::ProtocolsHandler::default() + } + + fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { + vec![] + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + if *peer_id == self.rendezvous_point_peer_id { + self.is_connected = true; + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + if *peer_id == self.rendezvous_point_peer_id { + self.is_connected = false; + } + } + + fn inject_event( + &mut self, + _peer_id: PeerId, + _connection: ConnectionId, + _event: <::ProtocolsHandler as ProtocolsHandler>::OutEvent, + ) { + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.events.pop() { + return Poll::Ready(event); + } + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess for Behaviour { + fn inject_event(&mut self, event: Event) { + match event { + Event::RegisterFailed(error) => { + tracing::error!(rendezvous_node=%self.rendezvous_point_peer_id, "Registration with rendezvous node failed: {:#}", error); + } + Event::RegistrationExpired(registration) => { + tracing::warn!("Registation expired: {:?}", registration) + } + Event::Registered { + rendezvous_node, + ttl, + namespace, + } => { + // TODO: this can most likely not happen at all, potentially remove these checks + if rendezvous_node != self.rendezvous_point_peer_id { + tracing::error!(peer_id=%rendezvous_node, "Ignoring message from unknown rendezvous node"); + } + + // TODO: Consider implementing From for Namespace and XmrBtcNamespace + if namespace.to_string() != self.rendezvous_namespace.to_string() { + tracing::error!(peer_id=%rendezvous_node, %namespace, "Ignoring message from rendezvous node for unknown namespace"); + } + + // record re-registration after half the ttl has expired + self.rendezvous_reregister_timestamp = + Some(Instant::now() + Duration::from_secs(ttl) / 2); + } + _ => {} + } + } +} diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 61ef0c41..c6cced35 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -32,7 +32,6 @@ use swap::database::Database; use swap::monero::Amount; use swap::network::swarm; use swap::protocol::alice::run; -use swap::rendezvous::XmrBtcNamespace; use swap::seed::Seed; use swap::tor::AuthenticatedClient; use swap::{asb, bitcoin, kraken, monero, tor}; @@ -156,6 +155,7 @@ async fn main() -> Result<()> { kraken_rate.clone(), resume_only, env_config, + config.rendezvous, )?; for listen in config.network.listen.clone() { @@ -165,26 +165,10 @@ async fn main() -> Result<()> { tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); - // todo: Option is being used as a rendezvous feature toggle. - // The fact that rendezvous is an optional feature could be expressed better. if let Some(addr) = external_addr { let _ = Swarm::add_external_address(&mut swarm, addr, AddressScore::Infinite); - Swarm::dial_addr(&mut swarm, config.rendezvous_node.addr.clone()).with_context( - || { - format!( - "Failed to dial rendezvous node addr {}", - config.rendezvous_node.addr - ) - }, - )?; } - let namespace = if testnet { - XmrBtcNamespace::Testnet - } else { - XmrBtcNamespace::Mainnet - }; - let (event_loop, mut swap_receiver) = EventLoop::new( swarm, env_config, @@ -194,9 +178,6 @@ async fn main() -> Result<()> { kraken_rate.clone(), config.maker.min_buy_btc, config.maker.max_buy_btc, - config.rendezvous_node.peer_id, - config.rendezvous_node.addr, - namespace, ) .unwrap(); diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 20cd4a24..c7f611bf 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -14,6 +14,7 @@ pub fn asb( latest_rate: LR, resume_only: bool, env_config: env::Config, + rendezvous_config: Option, ) -> Result>> where LR: LatestRate + Send + 'static + Debug + Clone, @@ -27,6 +28,7 @@ where resume_only, env_config, identity.clone(), + rendezvous_config, ); let transport = asb::transport::new(&identity)?; diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index d08e9c88..c6e4bdfa 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -22,7 +22,6 @@ use swap::network::swarm; use swap::protocol::alice::{AliceState, Swap}; use swap::protocol::bob::BobState; use swap::protocol::{alice, bob}; -use swap::rendezvous::XmrBtcNamespace; use swap::seed::Seed; use swap::{asb, bitcoin, cli, env, monero}; use tempfile::tempdir; @@ -157,13 +156,16 @@ async fn init_containers(cli: &Cli) -> (Monero, Containers<'_>) { .await .unwrap(); - (monero, Containers { - bitcoind_url, - bitcoind, - monerod_container, - monero_wallet_rpc_containers, - electrs, - }) + ( + monero, + Containers { + bitcoind_url, + bitcoind, + monerod_container, + monero_wallet_rpc_containers, + electrs, + }, + ) } async fn init_bitcoind_container( @@ -237,6 +239,7 @@ async fn start_alice( latest_rate, resume_only, env_config, + None, ) .unwrap(); swarm.listen_on(listen_address).unwrap(); @@ -250,9 +253,6 @@ async fn start_alice( FixedRate::default(), min_buy, max_buy, - PeerId::random(), - Multiaddr::empty(), - XmrBtcNamespace::Testnet, ) .unwrap();