diff --git a/Cargo.lock b/Cargo.lock index 3cc7ebf5..f226be16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -294,6 +294,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d73a8ae8ce52d09395e4cafc83b5b81c3deb70a97740e907669c8683c4dd50a" +[[package]] +name = "bimap" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07" + [[package]] name = "bincode" version = "1.3.1" @@ -1782,6 +1788,7 @@ dependencies = [ "libp2p-mplex", "libp2p-noise", "libp2p-ping", + "libp2p-rendezvous", "libp2p-request-response", "libp2p-swarm", "libp2p-swarm-derive", @@ -1892,6 +1899,28 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "libp2p-rendezvous" +version = "0.1.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" +dependencies = [ + "asynchronous-codec", + "bimap", + "futures", + "libp2p-core", + "libp2p-swarm", + "log 0.4.14", + "prost", + "prost-build", + "rand 0.8.3", + "sha2 0.9.5", + "thiserror", + "tokio", + "unsigned-varint 0.7.0", + "uuid", + "void", +] + [[package]] name = "libp2p-request-response" version = "0.12.0" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index fb3c26e6..8e61a479 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -29,7 +29,7 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features = ed25519-dalek = "1" futures = { version = "0.3", default-features = false } itertools = "0.10" -libp2p = { git = "https://github.com/comit-network/rust-libp2p", branch = "rendezvous", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping" ] } +libp2p = { git = "https://github.com/comit-network/rust-libp2p", branch = "rendezvous", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping", "rendezvous" ] } miniscript = { version = "5", features = [ "serde" ] } monero = { version = "0.12", features = [ "serde_support" ] } monero-rpc = { path = "../monero-rpc" } diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index fda5ac3a..3e9d730b 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::time::Duration; use swap::bitcoin::TxLock; use swap::cli::command::{parse_args_and_apply_defaults, Arguments, Command, ParseResult}; -use swap::cli::EventLoop; +use swap::cli::{list_sellers, EventLoop}; use swap::database::Database; use swap::env::Config; use swap::network::quote::BidQuote; @@ -68,7 +68,7 @@ async fn main() -> Result<()> { } => { let swap_id = Uuid::new_v4(); - cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?; + cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?; let db = Database::open(data_dir.join("database").as_path()) .context("Failed to open database")?; let seed = Seed::from_file_or_generate(data_dir.as_path()) @@ -159,7 +159,7 @@ async fn main() -> Result<()> { monero_daemon_address, tor_socks5_port, } => { - cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?; + cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?; let db = Database::open(data_dir.join("database").as_path()) .context("Failed to open database")?; let seed = Seed::from_file_or_generate(data_dir.as_path()) @@ -221,7 +221,7 @@ async fn main() -> Result<()> { bitcoin_electrum_rpc_url, bitcoin_target_block, } => { - cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?; + cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?; let db = Database::open(data_dir.join("database").as_path()) .context("Failed to open database")?; let seed = Seed::from_file_or_generate(data_dir.as_path()) @@ -253,7 +253,7 @@ async fn main() -> Result<()> { bitcoin_electrum_rpc_url, bitcoin_target_block, } => { - cli::tracing::init(debug, json, data_dir.join("logs"), swap_id)?; + cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?; let db = Database::open(data_dir.join("database").as_path()) .context("Failed to open database")?; let seed = Seed::from_file_or_generate(data_dir.as_path()) @@ -270,6 +270,30 @@ async fn main() -> Result<()> { cli::refund(swap_id, Arc::new(bitcoin_wallet), db, force).await??; } + Command::ListSellers { + rendezvous_node_peer_id, + rendezvous_node_addr, + namespace, + tor_socks5_port, + } => { + cli::tracing::init(debug, json, data_dir.join("logs"), None)?; + let seed = Seed::from_file_or_generate(data_dir.as_path()) + .context("Failed to read in seed file")?; + let identity = seed.derive_libp2p_identity(); + + let makers = list_sellers( + rendezvous_node_peer_id, + rendezvous_node_addr, + namespace, + tor_socks5_port, + identity, + ) + .await?; + + for maker in makers { + tracing::info!(peer_id=%maker.peer_id, multiaddr=%maker.multiaddr, price=%maker.quote.price, max_quantity=%maker.quote.max_quantity, min_quantity=%maker.quote.min_quantity); + } + } }; Ok(()) } diff --git a/swap/src/cli.rs b/swap/src/cli.rs index 30a7f7d6..e19a4d1c 100644 --- a/swap/src/cli.rs +++ b/swap/src/cli.rs @@ -2,6 +2,7 @@ mod behaviour; pub mod cancel; pub mod command; mod event_loop; +mod list_sellers; pub mod refund; pub mod tracing; pub mod transport; @@ -9,4 +10,5 @@ pub mod transport; pub use behaviour::{Behaviour, OutEvent}; pub use cancel::cancel; pub use event_loop::{EventLoop, EventLoopHandle}; +pub use list_sellers::{list_sellers, XmrBtcNamespace}; pub use refund::refund; diff --git a/swap/src/cli/command.rs b/swap/src/cli/command.rs index 815288a3..cf011dd7 100644 --- a/swap/src/cli/command.rs +++ b/swap/src/cli/command.rs @@ -1,3 +1,4 @@ +use crate::cli::list_sellers::XmrBtcNamespace; use crate::env::GetConfig; use crate::fs::system_data_dir; use crate::{env, monero}; @@ -193,6 +194,22 @@ where bitcoin_target_block: bitcoin_target_block_from(bitcoin_target_block, is_testnet), }, }, + RawCommand::ListSellers { + rendezvous_node_peer_id, + rendezvous_node_addr, + tor: Tor { tor_socks5_port }, + } => Arguments { + env_config: env_config_from(is_testnet), + debug, + json, + data_dir: data::data_dir_from(data, is_testnet)?, + cmd: Command::ListSellers { + rendezvous_node_peer_id, + rendezvous_node_addr, + namespace: rendezvous_namespace_from(is_testnet), + tor_socks5_port, + }, + }, }; Ok(ParseResult::Arguments(arguments)) @@ -231,6 +248,12 @@ pub enum Command { bitcoin_electrum_rpc_url: Url, bitcoin_target_block: usize, }, + ListSellers { + rendezvous_node_peer_id: PeerId, + rendezvous_node_addr: Multiaddr, + namespace: XmrBtcNamespace, + tor_socks5_port: u16, + }, } #[derive(structopt::StructOpt, Debug)] @@ -324,6 +347,24 @@ pub enum RawCommand { #[structopt(flatten)] bitcoin: Bitcoin, }, + ListSellers { + // TODO: sane default value + #[structopt( + long, + help = "The peer-id of a rendezvous node that sellers register with" + )] + rendezvous_node_peer_id: PeerId, + + // TODO: sane default value + #[structopt( + long, + help = "The multiaddr of a rendezvous node that sellers register with" + )] + rendezvous_node_addr: Multiaddr, + + #[structopt(flatten)] + tor: Tor, + }, } #[derive(structopt::StructOpt, Debug)] @@ -416,6 +457,14 @@ fn bitcoin_electrum_rpc_url_from(url: Option, testnet: bool) -> Result } } +fn rendezvous_namespace_from(is_testnet: bool) -> XmrBtcNamespace { + if is_testnet { + XmrBtcNamespace::Testnet + } else { + XmrBtcNamespace::Mainnet + } +} + fn bitcoin_target_block_from(target_block: Option, testnet: bool) -> usize { if let Some(target_block) = target_block { target_block diff --git a/swap/src/cli/list_sellers.rs b/swap/src/cli/list_sellers.rs new file mode 100644 index 00000000..4a215353 --- /dev/null +++ b/swap/src/cli/list_sellers.rs @@ -0,0 +1,233 @@ +use crate::network::quote::BidQuote; +use crate::network::{quote, swarm}; +use anyhow::Result; +use futures::StreamExt; +use libp2p::multiaddr::Protocol; +use libp2p::rendezvous::{Namespace, Rendezvous}; +use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage}; +use libp2p::swarm::SwarmEvent; +use libp2p::{identity, rendezvous, Multiaddr, PeerId, Swarm}; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; + +pub async fn list_sellers( + rendezvous_node_peer_id: PeerId, + rendezvous_node_addr: Multiaddr, + namespace: XmrBtcNamespace, + tor_socks5_port: u16, + identity: identity::Keypair, +) -> Result> { + let behaviour = Behaviour { + rendezvous: Rendezvous::new(identity.clone(), rendezvous::Config::default()), + quote: quote::cli(), + }; + let mut swarm = swarm::cli(identity, tor_socks5_port, behaviour).await?; + + let _ = swarm.dial_addr(rendezvous_node_addr.clone()); + + let event_loop = EventLoop::new( + swarm, + rendezvous_node_peer_id, + rendezvous_node_addr, + namespace, + ); + let makers = event_loop.run().await; + + Ok(makers) +} + +pub struct Seller { + pub peer_id: PeerId, + pub multiaddr: Multiaddr, + pub quote: BidQuote, +} + +#[derive(Debug, PartialEq)] +pub enum XmrBtcNamespace { + Mainnet, + Testnet, +} + +impl Display for XmrBtcNamespace { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + XmrBtcNamespace::Mainnet => write!(f, "xmr-btc-swap-mainnet"), + XmrBtcNamespace::Testnet => write!(f, "xmr-btc-swap-mainnet"), + } + } +} + +#[derive(Debug)] +pub enum OutEvent { + Rendezvous(rendezvous::Event), + Quote(quote::OutEvent), +} + +impl From for OutEvent { + fn from(event: rendezvous::Event) -> Self { + OutEvent::Rendezvous(event) + } +} + +impl From for OutEvent { + fn from(event: quote::OutEvent) -> Self { + OutEvent::Quote(event) + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(event_process = false)] +#[behaviour(out_event = "OutEvent")] +pub struct Behaviour { + pub rendezvous: Rendezvous, + pub quote: quote::Behaviour, +} + +#[derive(Debug)] +enum QuoteStatus { + Pending, + Received(BidQuote), +} + +pub struct EventLoop { + swarm: Swarm, + rendezvous_peer_id: PeerId, + rendezvous_addr: Multiaddr, + namespace: XmrBtcNamespace, + asb_address: HashMap, + asb_quote_status: HashMap, +} + +impl EventLoop { + pub fn new( + swarm: Swarm, + rendezvous_peer_id: PeerId, + rendezvous_addr: Multiaddr, + namespace: XmrBtcNamespace, + ) -> Self { + Self { + swarm, + rendezvous_peer_id, + rendezvous_addr, + namespace, + asb_address: Default::default(), + asb_quote_status: Default::default(), + } + } + + pub async fn run(mut self) -> Vec { + loop { + tokio::select! { + swarm_event = self.swarm.select_next_some() => { + match swarm_event { + SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => { + if peer_id == self.rendezvous_peer_id{ + tracing::info!( + "Connected to rendezvous point, discovering nodes in '{}' namespace ...", + self.namespace + ); + + self.swarm.behaviour_mut().rendezvous.discover( + Some(Namespace::new(self.namespace.to_string()).expect("our namespace to be a correct string")), + None, + None, + self.rendezvous_peer_id, + ); + } else { + let address = endpoint.get_remote_address(); + self.asb_address.insert(peer_id, address.clone()); + } + } + SwarmEvent::UnreachableAddr { error, address, .. } + | SwarmEvent::UnknownPeerUnreachableAddr { error, address, .. } => { + if address == self.rendezvous_addr { + tracing::error!( + "Failed to connect to rendezvous point at {}: {}", + address, + error + ); + todo!("Better error handling, return with error") + } + } + SwarmEvent::Behaviour(OutEvent::Rendezvous( + rendezvous::Event::Discovered { registrations, .. }, + )) => { + for registration in registrations { + let peer = registration.record.peer_id(); + for address in registration.record.addresses() { + tracing::info!("Discovered peer {} at {}", peer, address); + + let p2p_suffix = Protocol::P2p(*peer.as_ref()); + let _address_with_p2p = if !address + .ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) + { + address.clone().with(p2p_suffix) + } else { + address.clone() + }; + + self.asb_quote_status.insert(peer, QuoteStatus::Pending); + + // add all external addresses of that peer to the quote behaviour + self.swarm.behaviour_mut().quote.add_address(&peer, address.clone()); + } + + // request the quote, if we are not connected to the peer it will be dialed automatically + let _request_id = self.swarm.behaviour_mut().quote.send_request(&peer, ()); + } + } + SwarmEvent::Behaviour(OutEvent::Quote(quote_response)) => { + match quote_response { + RequestResponseEvent::Message { peer, message } => { + match message { + RequestResponseMessage::Response { response, .. } => { + if self.asb_quote_status.insert(peer, QuoteStatus::Received(response)).is_none() { + tracing::error!(%peer, "Received bid quote from unexpected peer, this record will be removed!"); + self.asb_quote_status.remove(&peer); + } + } + RequestResponseMessage::Request { .. } => unreachable!() + } + } + RequestResponseEvent::OutboundFailure { peer, error, .. } => { + tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error); + self.asb_quote_status.remove(&peer); + } + RequestResponseEvent::InboundFailure { .. } => unreachable!(), + RequestResponseEvent::ResponseSent { .. } => unreachable!() + } + } + _ => {} + } + } + } + + let all_quotes_fetched = self + .asb_quote_status + .iter() + .map(|(peer_id, quote_status)| match quote_status { + QuoteStatus::Pending => Err(StillPending {}), + QuoteStatus::Received(quote) => { + let address = self + .asb_address + .get(&peer_id) + .expect("if we got a quote we must have stored an address"); + + Ok(Seller { + peer_id: *peer_id, + multiaddr: address.clone(), + quote: *quote, + }) + } + }) + .collect::, _>>(); + + match all_quotes_fetched { + Ok(makers) => break makers, + Err(StillPending {}) => continue, + } + } + } +} + +struct StillPending {} diff --git a/swap/src/cli/tracing.rs b/swap/src/cli/tracing.rs index 733b263e..1b9620f5 100644 --- a/swap/src/cli/tracing.rs +++ b/swap/src/cli/tracing.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use std::option::Option::Some; use std::path::Path; use tracing::subscriber::set_global_default; use tracing::{Event, Level, Subscriber}; @@ -8,7 +9,7 @@ use tracing_subscriber::layer::{Context, SubscriberExt}; use tracing_subscriber::{fmt, EnvFilter, FmtSubscriber, Layer, Registry}; use uuid::Uuid; -pub fn init(debug: bool, json: bool, dir: impl AsRef, swap_id: Uuid) -> Result<()> { +pub fn init(debug: bool, json: bool, dir: impl AsRef, swap_id: Option) -> Result<()> { if json { let level = if debug { Level::DEBUG } else { Level::INFO }; @@ -24,7 +25,7 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef, swap_id: Uuid) -> Re .init(); Ok(()) - } else { + } else if let Some(swap_id) = swap_id { let level_filter = EnvFilter::try_new("swap=debug")?; let registry = Registry::default().with(level_filter); @@ -45,6 +46,19 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef, swap_id: Uuid) -> Re set_global_default(registry.with(file_logger).with(info_terminal_printer()))?; } + Ok(()) + } else { + let level = if debug { Level::DEBUG } else { Level::INFO }; + let is_terminal = atty::is(atty::Stream::Stderr); + + FmtSubscriber::builder() + .with_env_filter(format!("swap={}", level)) + .with_writer(std::io::stderr) + .with_ansi(is_terminal) + .with_timer(ChronoLocal::with_format("%F %T".to_owned())) + .with_target(false) + .init(); + Ok(()) } } diff --git a/swap/src/network/quote.rs b/swap/src/network/quote.rs index 76c1ffc5..fc2d93f4 100644 --- a/swap/src/network/quote.rs +++ b/swap/src/network/quote.rs @@ -9,7 +9,7 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; const PROTOCOL: &str = "/comit/xmr/btc/bid-quote/1.0.0"; -type OutEvent = RequestResponseEvent<(), BidQuote>; +pub type OutEvent = RequestResponseEvent<(), BidQuote>; type Message = RequestResponseMessage<(), BidQuote>; pub type Behaviour = RequestResponse>; @@ -24,7 +24,7 @@ impl ProtocolName for BidQuoteProtocol { } /// Represents a quote for buying XMR. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)] pub struct BidQuote { /// The price at which the maker is willing to buy at. #[serde(with = "::bitcoin::util::amount::serde::as_sat")]