use crate::asb::behaviour::{Behaviour, OutEvent}; use crate::asb::Rate; use crate::database::Database; use crate::network::quote::BidQuote; use crate::network::swap_setup::alice::WalletSnapshot; use crate::network::transfer_proof; use crate::protocol::alice::{AliceState, State3, Swap}; use crate::rendezvous::XmrBtcNamespace; use crate::{bitcoin, env, kraken, monero}; use anyhow::{Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; use libp2p::rendezvous::Namespace; use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::swarm::SwarmEvent; use libp2p::{Multiaddr, PeerId, Swarm}; use rust_decimal::Decimal; use std::collections::HashMap; use std::convert::Infallible; use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc; use uuid::Uuid; /// A future that resolves to a tuple of `PeerId`, `transfer_proof::Request` and /// `Responder`. /// /// When this future resolves, the `transfer_proof::Request` shall be sent to /// the peer identified by the `PeerId`. Once the request has been acknowledged /// by the peer, i.e. a `()` response has been received, the `Responder` shall /// be used to let the original sender know about the successful transfer. type OutgoingTransferProof = BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>; #[allow(missing_debug_implementations)] pub struct EventLoop where LR: LatestRate + Send + 'static + Debug + Clone, { swarm: libp2p::Swarm>, env_config: env::Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, latest_rate: LR, min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, swap_sender: mpsc::Sender, /// Stores incoming [`EncryptedSignature`]s per swap. recv_encrypted_signature: HashMap>, inflight_encrypted_signatures: FuturesUnordered>>, send_transfer_proof: FuturesUnordered, /// Tracks [`transfer_proof::Request`]s which could not yet be sent because /// we are currently disconnected from the peer. buffered_transfer_proofs: HashMap)>>, /// Tracks [`transfer_proof::Request`]s which are currently inflight and /// awaiting an acknowledgement. inflight_transfer_proofs: HashMap>, // TODO: Potentially group together, potentially the whole rendezvous handling could go into a // separate swarm? There is no dependency between swap and registration, so we could just // build this completely separate. rendezvous_node_peer_id: PeerId, rendezvous_node_addr: Multiaddr, rendezvous_namespace: XmrBtcNamespace, rendezvous_reregister_timestamp: Option, } impl EventLoop where LR: LatestRate + Send + 'static + Debug + Clone, { #[allow(clippy::too_many_arguments)] pub fn new( swarm: Swarm>, env_config: env::Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, latest_rate: LR, min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, rendezvous_node_peer_id: PeerId, rendezvous_node_addr: Multiaddr, rendezvous_namespace: XmrBtcNamespace, ) -> Result<(Self, mpsc::Receiver)> { let swap_channel = MpscChannels::default(); let event_loop = EventLoop { swarm, env_config, bitcoin_wallet, monero_wallet, db, latest_rate, swap_sender: swap_channel.sender, min_buy, max_buy, recv_encrypted_signature: Default::default(), inflight_encrypted_signatures: Default::default(), send_transfer_proof: Default::default(), buffered_transfer_proofs: Default::default(), inflight_transfer_proofs: Default::default(), rendezvous_node_peer_id, rendezvous_node_addr, rendezvous_namespace, rendezvous_reregister_timestamp: None, }; Ok((event_loop, swap_channel.receiver)) } pub fn peer_id(&self) -> PeerId { *Swarm::local_peer_id(&self.swarm) } pub async fn run(mut self) { // ensure that these streams are NEVER empty, otherwise it will // terminate forever. self.send_transfer_proof.push(future::pending().boxed()); self.inflight_encrypted_signatures .push(future::pending().boxed()); let unfinished_swaps = match self.db.unfinished_alice() { Ok(unfinished_swaps) => unfinished_swaps, Err(_) => { tracing::error!("Failed to load unfinished swaps"); return; } }; for (swap_id, state) in unfinished_swaps { let peer_id = match self.db.get_peer_id(swap_id) { Ok(peer_id) => peer_id, Err(_) => { tracing::warn!(%swap_id, "Resuming swap skipped because no peer-id found for swap in database"); continue; } }; let handle = self.new_handle(peer_id, swap_id); let swap = Swap { event_loop_handle: handle, bitcoin_wallet: self.bitcoin_wallet.clone(), monero_wallet: self.monero_wallet.clone(), env_config: self.env_config, db: self.db.clone(), state: state.into(), swap_id, }; match self.swap_sender.send(swap).await { Ok(_) => tracing::info!(%swap_id, "Resuming swap"), Err(_) => { tracing::warn!(%swap_id, "Failed to resume swap because receiver has been dropped") } } } loop { // rendezvous node re-registration if let Some(rendezvous_reregister_timestamp) = self.rendezvous_reregister_timestamp { if Instant::now() > rendezvous_reregister_timestamp { if self.swarm.is_connected(&self.rendezvous_node_peer_id) { self.swarm.behaviour_mut().rendezvous.register( Namespace::new(self.rendezvous_namespace.to_string()) .expect("our namespace to be a correct string"), self.rendezvous_node_peer_id, None, ); } else { match Swarm::dial_addr(&mut self.swarm, self.rendezvous_node_addr.clone()) { Ok(()) => {} Err(error) => { tracing::error!( "Failed to redial rendezvous node for re-registration: {:#}", error ); } } } } } tokio::select! { swarm_event = self.swarm.next() => { match swarm_event { Some(SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot })) => { let (btc, responder) = match send_wallet_snapshot.recv().await { Ok((btc, responder)) => (btc, responder), Err(error) => { tracing::error!("Swap request will be ignored because of a failure when requesting information for the wallet snapshot: {:#}", error); continue; } }; let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &self.monero_wallet, btc).await { Ok(wallet_snapshot) => wallet_snapshot, Err(error) => { tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error); continue; } }; // Ignore result, we should never hit this because the receiver will alive as long as the connection is. let _ = responder.respond(wallet_snapshot); } Some(SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3})) => { let _ = self.handle_execution_setup_done(peer_id, swap_id, *state3).await; } Some(SwarmEvent::Behaviour(OutEvent::SwapDeclined { peer, error })) => { tracing::warn!(%peer, "Ignoring spot price request because: {}", error); } Some(SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer })) => { let quote = match self.make_quote(self.min_buy, self.max_buy).await { Ok(quote) => quote, Err(error) => { tracing::warn!(%peer, "Failed to make quote. Error {:#}", error); continue; } }; if self.swarm.behaviour_mut().quote.send_response(channel, quote).is_err() { tracing::debug!(%peer, "Failed to respond with quote"); } } Some(SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id })) => { tracing::debug!(%peer, "Bob acknowledged transfer proof"); if let Some(responder) = self.inflight_transfer_proofs.remove(&id) { let _ = responder.respond(()); } } Some(SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer })) => { let swap_id = msg.swap_id; let swap_peer = self.db.get_peer_id(swap_id); // Ensure that an incoming encrypted signature is sent by the peer-id associated with the swap let swap_peer = match swap_peer { Ok(swap_peer) => swap_peer, Err(_) => { tracing::warn!( unknown_swap_id = %swap_id, from = %peer, "Ignoring encrypted signature for unknown swap"); continue; } }; if swap_peer != peer { tracing::warn!( %swap_id, received_from = %peer, expected_from = %swap_peer, "Ignoring malicious encrypted signature which was not expected from this peer", ); continue; } let sender = match self.recv_encrypted_signature.remove(&swap_id) { Some(sender) => sender, None => { // TODO: Don't just drop encsig if we currently don't have a running swap for it, save in db tracing::warn!(%swap_id, "No sender for encrypted signature, maybe already handled?"); continue; } }; let mut responder = match sender.send(msg.tx_redeem_encsig).await { Ok(responder) => responder, Err(_) => { tracing::warn!(%swap_id, "Failed to relay encrypted signature to swap"); continue; } }; self.inflight_encrypted_signatures.push(async move { let _ = responder.recv().await; channel }.boxed()); } Some(SwarmEvent::Behaviour(OutEvent::Registered { rendezvous_node, ttl, namespace })) => { // TODO: this can most likely not happen at all, potentially remove these checks if rendezvous_node != self.rendezvous_node_peer_id { tracing::error!(peer_id=%rendezvous_node, "Ignoring message from unknown rendezvous node"); continue; } // TODO: Consider implementing From for Namespace and XmrBtcNamespace if namespace.to_string() != self.rendezvous_namespace.to_string() { tracing::error!(peer_id=%rendezvous_node, %namespace, "Ignoring message from rendezvous node for unknown namespace"); continue; } tracing::info!("Successfully registered with rendezvous node"); // record re-registration after half the ttl has expired self.rendezvous_reregister_timestamp = Some(Instant::now() + Duration::from_secs(ttl) / 2); } Some(SwarmEvent::Behaviour(OutEvent::RegisterFailed(error))) => { tracing::error!(rendezvous_node=%self.rendezvous_node_peer_id, "Registration with rendezvous node failed: {:#}", error); } Some(SwarmEvent::Behaviour(OutEvent::Failure {peer, error})) => { tracing::error!( %peer, "Communication error. Error {:#}", error); } Some(SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. }) => { tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); if let Some(transfer_proofs) = self.buffered_transfer_proofs.remove(&peer) { for (transfer_proof, responder) in transfer_proofs { tracing::debug!(%peer, "Found buffered transfer proof for peer"); let id = self.swarm.behaviour_mut().transfer_proof.send_request(&peer, transfer_proof); self.inflight_transfer_proofs.insert(id, responder); } } if peer == self.rendezvous_node_peer_id { self .swarm .behaviour_mut() .rendezvous .register(Namespace::new(self.rendezvous_namespace.to_string()).expect("our namespace to be a correct string"), self.rendezvous_node_peer_id, None); } } Some(SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. }) => { tracing::warn!(%address, "Failed to set up connection with peer. Error {:#}", error); } Some(SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: Some(error) }) if num_established == 0 => { tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection. Error {:#}", error); } Some(SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: None }) if num_established == 0 => { tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection"); } Some(SwarmEvent::NewListenAddr(address)) => { tracing::info!(%address, "New listen address detected"); } _ => {} } }, next_transfer_proof = self.send_transfer_proof.next() => { match next_transfer_proof { Some(Ok((peer, transfer_proof, responder))) => { if !self.swarm.behaviour_mut().transfer_proof.is_connected(&peer) { tracing::warn!(%peer, "No active connection to peer, buffering transfer proof"); self.buffered_transfer_proofs.entry(peer).or_insert_with(Vec::new).push((transfer_proof, responder)); continue; } let id = self.swarm.behaviour_mut().transfer_proof.send_request(&peer, transfer_proof); self.inflight_transfer_proofs.insert(id, responder); }, Some(Err(error)) => { tracing::debug!("A swap stopped without sending a transfer proof. Error {:#}", error); } None => { unreachable!("stream of transfer proof receivers must never terminate") } } } Some(response_channel) = self.inflight_encrypted_signatures.next() => { let _ = self.swarm.behaviour_mut().encrypted_signature.send_response(response_channel, ()); } } } } async fn make_quote( &mut self, min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, ) -> Result { let rate = self .latest_rate .latest_rate() .context("Failed to get latest rate")?; Ok(BidQuote { price: rate.ask().context("Failed to compute asking price")?, min_quantity: min_buy, max_quantity: max_buy, }) } async fn handle_execution_setup_done( &mut self, bob_peer_id: PeerId, swap_id: Uuid, state3: State3, ) { let handle = self.new_handle(bob_peer_id, swap_id); let initial_state = AliceState::Started { state3: Box::new(state3), }; let swap = Swap { event_loop_handle: handle, bitcoin_wallet: self.bitcoin_wallet.clone(), monero_wallet: self.monero_wallet.clone(), env_config: self.env_config, db: self.db.clone(), state: initial_state, swap_id, }; // TODO: Consider adding separate components for start/resume of swaps // swaps save peer id so we can resume match self.db.insert_peer_id(swap_id, bob_peer_id).await { Ok(_) => { if let Err(error) = self.swap_sender.send(swap).await { tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error); } } Err(error) => { tracing::warn!(%swap_id, "Unable to save peer-id, swap cannot be spawned: {}", error); } } } /// Create a new [`EventLoopHandle`] that is scoped for communication with /// the given peer. fn new_handle(&mut self, peer: PeerId, swap_id: Uuid) -> EventLoopHandle { // we deliberately don't put timeouts on these channels because the swap always // races these futures against a timelock let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1); let encrypted_signature = bmrng::channel(1); self.recv_encrypted_signature .insert(swap_id, encrypted_signature.0); self.send_transfer_proof.push( async move { let (transfer_proof, responder) = transfer_proof_receiver.recv().await?; let request = transfer_proof::Request { swap_id, tx_lock_proof: transfer_proof, }; Ok((peer, request, responder)) } .boxed(), ); EventLoopHandle { recv_encrypted_signature: Some(encrypted_signature.1), send_transfer_proof: Some(transfer_proof_sender), } } } pub trait LatestRate { type Error: std::error::Error + Send + Sync + 'static; fn latest_rate(&mut self) -> Result; } #[derive(Clone, Debug)] pub struct FixedRate(Rate); impl FixedRate { pub const RATE: f64 = 0.01; pub fn value(&self) -> Rate { self.0 } } impl Default for FixedRate { fn default() -> Self { let ask = bitcoin::Amount::from_btc(Self::RATE).expect("Static value should never fail"); let spread = Decimal::from(0u64); Self(Rate::new(ask, spread)) } } impl LatestRate for FixedRate { type Error = Infallible; fn latest_rate(&mut self) -> Result { Ok(self.value()) } } /// Produces [`Rate`]s based on [`PriceUpdate`]s from kraken and a configured /// spread. #[derive(Debug, Clone)] pub struct KrakenRate { ask_spread: Decimal, price_updates: kraken::PriceUpdates, } impl KrakenRate { pub fn new(ask_spread: Decimal, price_updates: kraken::PriceUpdates) -> Self { Self { ask_spread, price_updates, } } } impl LatestRate for KrakenRate { type Error = kraken::Error; fn latest_rate(&mut self) -> Result { let update = self.price_updates.latest_update()?; let rate = Rate::new(update.ask, self.ask_spread); Ok(rate) } } #[derive(Debug)] pub struct EventLoopHandle { recv_encrypted_signature: Option>, send_transfer_proof: Option>, } impl EventLoopHandle { pub async fn recv_encrypted_signature(&mut self) -> Result { let (tx_redeem_encsig, responder) = self .recv_encrypted_signature .take() .context("Encrypted signature was already received")? .recv() .await?; responder .respond(()) .context("Failed to acknowledge receipt of encrypted signature")?; Ok(tx_redeem_encsig) } pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> { self.send_transfer_proof .take() .context("Transfer proof was already sent")? .send_receive(msg) .await .context("Failed to send transfer proof")?; Ok(()) } } #[allow(missing_debug_implementations)] struct MpscChannels { sender: mpsc::Sender, receiver: mpsc::Receiver, } impl Default for MpscChannels { fn default() -> Self { let (sender, receiver) = mpsc::channel(100); MpscChannels { sender, receiver } } }