fixup! Add the `list-sellers` command to the CLI

rendezvous-asb-refactor
Daniel Karzel 3 years ago
parent dcf5923d3e
commit adc5e926ce
No known key found for this signature in database
GPG Key ID: 30C3FC2E438ADB6E

@ -4,11 +4,13 @@ use crate::rendezvous::XmrBtcNamespace;
use anyhow::Result;
use futures::StreamExt;
use libp2p::multiaddr::Protocol;
use libp2p::ping::{Ping, PingConfig, PingEvent};
use libp2p::rendezvous::{Namespace, Rendezvous};
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage};
use libp2p::swarm::SwarmEvent;
use libp2p::{identity, rendezvous, Multiaddr, PeerId, Swarm};
use std::collections::HashMap;
use std::time::Duration;
pub async fn list_sellers(
rendezvous_node_peer_id: PeerId,
@ -20,6 +22,11 @@ pub async fn list_sellers(
let behaviour = Behaviour {
rendezvous: Rendezvous::new(identity.clone(), rendezvous::Config::default()),
quote: quote::cli(),
ping: Ping::new(
PingConfig::new()
.with_keep_alive(false)
.with_interval(Duration::from_secs(3_154_000_000)),
),
};
let mut swarm = swarm::cli(identity, tor_socks5_port, behaviour).await?;
@ -46,6 +53,7 @@ pub struct Seller {
pub enum OutEvent {
Rendezvous(rendezvous::Event),
Quote(quote::OutEvent),
Ping(PingEvent),
}
impl From<rendezvous::Event> for OutEvent {
@ -66,6 +74,7 @@ impl From<quote::OutEvent> for OutEvent {
pub struct Behaviour {
pub rendezvous: Rendezvous,
pub quote: quote::Behaviour,
pub ping: Ping,
}
#[derive(Debug)]
@ -74,6 +83,11 @@ enum QuoteStatus {
Received(BidQuote),
}
enum State {
WaitForDiscovery,
WaitForQuoteCompletion,
}
pub struct EventLoop {
swarm: Swarm<Behaviour>,
rendezvous_peer_id: PeerId,
@ -81,6 +95,7 @@ pub struct EventLoop {
namespace: XmrBtcNamespace,
asb_address: HashMap<PeerId, Multiaddr>,
asb_quote_status: HashMap<PeerId, QuoteStatus>,
state: State,
}
impl EventLoop {
@ -97,6 +112,7 @@ impl EventLoop {
namespace,
asb_address: Default::default(),
asb_quote_status: Default::default(),
state: State::WaitForDiscovery,
}
}
@ -123,20 +139,32 @@ impl EventLoop {
self.asb_address.insert(peer_id, address.clone());
}
}
SwarmEvent::UnreachableAddr { error, address, .. }
| SwarmEvent::UnknownPeerUnreachableAddr { error, address, .. } => {
SwarmEvent::UnreachableAddr { peer_id, error, address, .. } => {
if address == self.rendezvous_addr {
tracing::error!(
"Failed to connect to rendezvous point at {}: {}",
address,
error
);
todo!("Better error handling, return with error")
// if the rendezvous node is unreachable we just stop
return Vec::new();
} else {
tracing::debug!(
"Failed to connect to peer at {}: {}",
address,
error
);
// if a different peer than the rendezvous node is unreachable (i.e. a seller) we remove that seller from the quote status state
self.asb_quote_status.remove(&peer_id);
}
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(
rendezvous::Event::Discovered { registrations, .. },
)) => {
self.state = State::WaitForQuoteCompletion;
for registration in registrations {
let peer = registration.record.peer_id();
for address in registration.record.addresses() {
@ -175,10 +203,21 @@ impl EventLoop {
}
}
RequestResponseEvent::OutboundFailure { peer, error, .. } => {
tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error);
self.asb_quote_status.remove(&peer);
if peer == self.rendezvous_peer_id {
tracing::debug!(%peer, "Outbound failure when communicating with rendezvous node: {:#}", error);
} else {
tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error);
self.asb_quote_status.remove(&peer);
}
}
RequestResponseEvent::InboundFailure { .. } => unreachable!(),
RequestResponseEvent::InboundFailure { peer, error, .. } => {
if peer == self.rendezvous_peer_id {
tracing::debug!(%peer, "Inbound failure when communicating with rendezvous node: {:#}", error);
} else {
tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error);
self.asb_quote_status.remove(&peer);
}
},
RequestResponseEvent::ResponseSent { .. } => unreachable!()
}
}
@ -187,32 +226,45 @@ impl EventLoop {
}
}
let all_quotes_fetched = self
.asb_quote_status
.iter()
.map(|(peer_id, quote_status)| match quote_status {
QuoteStatus::Pending => Err(StillPending {}),
QuoteStatus::Received(quote) => {
let address = self
.asb_address
.get(&peer_id)
.expect("if we got a quote we must have stored an address");
Ok(Seller {
peer_id: *peer_id,
multiaddr: address.clone(),
quote: *quote,
match self.state {
State::WaitForDiscovery => {
continue;
}
State::WaitForQuoteCompletion => {
let all_quotes_fetched = self
.asb_quote_status
.iter()
.map(|(peer_id, quote_status)| match quote_status {
QuoteStatus::Pending => Err(StillPending {}),
QuoteStatus::Received(quote) => {
let address = self
.asb_address
.get(&peer_id)
.expect("if we got a quote we must have stored an address");
Ok(Seller {
peer_id: *peer_id,
multiaddr: address.clone(),
quote: *quote,
})
}
})
}
})
.collect::<Result<Vec<_>, _>>();
.collect::<Result<Vec<_>, _>>();
match all_quotes_fetched {
Ok(sellers) => break sellers,
Err(StillPending {}) => continue,
match all_quotes_fetched {
Ok(sellers) => break sellers,
Err(StillPending {}) => continue,
}
}
}
}
}
}
struct StillPending {}
impl From<PingEvent> for OutEvent {
fn from(event: PingEvent) -> Self {
OutEvent::Ping(event)
}
}

Loading…
Cancel
Save