From adc5e926ce4b10891bc0c83d075d1d040f609bb2 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Wed, 30 Jun 2021 18:29:51 +1000 Subject: [PATCH] fixup! Add the `list-sellers` command to the CLI --- swap/src/cli/list_sellers.rs | 106 ++++++++++++++++++++++++++--------- 1 file changed, 79 insertions(+), 27 deletions(-) diff --git a/swap/src/cli/list_sellers.rs b/swap/src/cli/list_sellers.rs index ec2bfffe..8963b82e 100644 --- a/swap/src/cli/list_sellers.rs +++ b/swap/src/cli/list_sellers.rs @@ -4,11 +4,13 @@ use crate::rendezvous::XmrBtcNamespace; use anyhow::Result; use futures::StreamExt; use libp2p::multiaddr::Protocol; +use libp2p::ping::{Ping, PingConfig, PingEvent}; use libp2p::rendezvous::{Namespace, Rendezvous}; use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage}; use libp2p::swarm::SwarmEvent; use libp2p::{identity, rendezvous, Multiaddr, PeerId, Swarm}; use std::collections::HashMap; +use std::time::Duration; pub async fn list_sellers( rendezvous_node_peer_id: PeerId, @@ -20,6 +22,11 @@ pub async fn list_sellers( let behaviour = Behaviour { rendezvous: Rendezvous::new(identity.clone(), rendezvous::Config::default()), quote: quote::cli(), + ping: Ping::new( + PingConfig::new() + .with_keep_alive(false) + .with_interval(Duration::from_secs(3_154_000_000)), + ), }; let mut swarm = swarm::cli(identity, tor_socks5_port, behaviour).await?; @@ -46,6 +53,7 @@ pub struct Seller { pub enum OutEvent { Rendezvous(rendezvous::Event), Quote(quote::OutEvent), + Ping(PingEvent), } impl From for OutEvent { @@ -66,6 +74,7 @@ impl From for OutEvent { pub struct Behaviour { pub rendezvous: Rendezvous, pub quote: quote::Behaviour, + pub ping: Ping, } #[derive(Debug)] @@ -74,6 +83,11 @@ enum QuoteStatus { Received(BidQuote), } +enum State { + WaitForDiscovery, + WaitForQuoteCompletion, +} + pub struct EventLoop { swarm: Swarm, rendezvous_peer_id: PeerId, @@ -81,6 +95,7 @@ pub struct EventLoop { namespace: XmrBtcNamespace, asb_address: HashMap, asb_quote_status: HashMap, + state: State, } impl EventLoop { @@ -97,6 +112,7 @@ impl EventLoop { namespace, asb_address: Default::default(), asb_quote_status: Default::default(), + state: State::WaitForDiscovery, } } @@ -123,20 +139,32 @@ impl EventLoop { self.asb_address.insert(peer_id, address.clone()); } } - SwarmEvent::UnreachableAddr { error, address, .. } - | SwarmEvent::UnknownPeerUnreachableAddr { error, address, .. } => { + SwarmEvent::UnreachableAddr { peer_id, error, address, .. } => { if address == self.rendezvous_addr { tracing::error!( "Failed to connect to rendezvous point at {}: {}", address, error ); - todo!("Better error handling, return with error") + + // if the rendezvous node is unreachable we just stop + return Vec::new(); + } else { + tracing::debug!( + "Failed to connect to peer at {}: {}", + address, + error + ); + + // if a different peer than the rendezvous node is unreachable (i.e. a seller) we remove that seller from the quote status state + self.asb_quote_status.remove(&peer_id); } } SwarmEvent::Behaviour(OutEvent::Rendezvous( rendezvous::Event::Discovered { registrations, .. }, )) => { + self.state = State::WaitForQuoteCompletion; + for registration in registrations { let peer = registration.record.peer_id(); for address in registration.record.addresses() { @@ -175,10 +203,21 @@ impl EventLoop { } } RequestResponseEvent::OutboundFailure { peer, error, .. } => { - tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error); - self.asb_quote_status.remove(&peer); + if peer == self.rendezvous_peer_id { + tracing::debug!(%peer, "Outbound failure when communicating with rendezvous node: {:#}", error); + } else { + tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error); + self.asb_quote_status.remove(&peer); + } } - RequestResponseEvent::InboundFailure { .. } => unreachable!(), + RequestResponseEvent::InboundFailure { peer, error, .. } => { + if peer == self.rendezvous_peer_id { + tracing::debug!(%peer, "Inbound failure when communicating with rendezvous node: {:#}", error); + } else { + tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error); + self.asb_quote_status.remove(&peer); + } + }, RequestResponseEvent::ResponseSent { .. } => unreachable!() } } @@ -187,32 +226,45 @@ impl EventLoop { } } - let all_quotes_fetched = self - .asb_quote_status - .iter() - .map(|(peer_id, quote_status)| match quote_status { - QuoteStatus::Pending => Err(StillPending {}), - QuoteStatus::Received(quote) => { - let address = self - .asb_address - .get(&peer_id) - .expect("if we got a quote we must have stored an address"); - - Ok(Seller { - peer_id: *peer_id, - multiaddr: address.clone(), - quote: *quote, + match self.state { + State::WaitForDiscovery => { + continue; + } + State::WaitForQuoteCompletion => { + let all_quotes_fetched = self + .asb_quote_status + .iter() + .map(|(peer_id, quote_status)| match quote_status { + QuoteStatus::Pending => Err(StillPending {}), + QuoteStatus::Received(quote) => { + let address = self + .asb_address + .get(&peer_id) + .expect("if we got a quote we must have stored an address"); + + Ok(Seller { + peer_id: *peer_id, + multiaddr: address.clone(), + quote: *quote, + }) + } }) - } - }) - .collect::, _>>(); + .collect::, _>>(); - match all_quotes_fetched { - Ok(sellers) => break sellers, - Err(StillPending {}) => continue, + match all_quotes_fetched { + Ok(sellers) => break sellers, + Err(StillPending {}) => continue, + } + } } } } } struct StillPending {} + +impl From for OutEvent { + fn from(event: PingEvent) -> Self { + OutEvent::Ping(event) + } +}