diff --git a/swap/src/alice/event_loop.rs b/swap/src/alice/event_loop.rs index f52f4ec1..b9cd08f4 100644 --- a/swap/src/alice/event_loop.rs +++ b/swap/src/alice/event_loop.rs @@ -242,4 +242,8 @@ impl EventLoop { } } } + + pub fn peer_id(&self) -> PeerId { + self.swarm.peer_id() + } } diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 534cadcd..1af2b124 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -13,7 +13,7 @@ #![forbid(unsafe_code)] use anyhow::{Context, Result}; -use libp2p::core::Multiaddr; +use libp2p::{core::Multiaddr, PeerId}; use prettytable::{row, Table}; use rand::rngs::OsRng; use std::{convert::TryFrom, sync::Arc}; @@ -111,6 +111,7 @@ async fn main() -> Result<()> { .await?; } Command::BuyXmr { + alice_peer_id, alice_addr, bitcoind_url, bitcoin_wallet_name, @@ -144,7 +145,7 @@ async fn main() -> Result<()> { let bob_state = BobState::Started { state0, amounts, - addr: alice_addr, + alice_peer_id: alice_peer_id.clone(), }; let swap_id = Uuid::new_v4(); @@ -153,7 +154,16 @@ async fn main() -> Result<()> { send_bitcoin, receive_monero, swap_id ); - bob_swap(swap_id, bob_state, bitcoin_wallet, monero_wallet, db).await?; + bob_swap( + swap_id, + bob_state, + bitcoin_wallet, + monero_wallet, + db, + alice_peer_id, + alice_addr, + ) + .await?; } Command::History => { let mut table = Table::new(); @@ -173,6 +183,8 @@ async fn main() -> Result<()> { bitcoin_wallet_name, monero_wallet_rpc_url, listen_addr, + alice_peer_id, + alice_addr, } => { let db_swap = db.get_state(swap_id)?; @@ -202,7 +214,16 @@ async fn main() -> Result<()> { config, ) .await?; - bob_swap(swap_id, bob_state, bitcoin_wallet, monero_wallet, db).await?; + bob_swap( + swap_id, + bob_state, + bitcoin_wallet, + monero_wallet, + db, + alice_peer_id, + alice_addr, + ) + .await?; } else { anyhow::bail!("Unable to construct swap state for swap with id {}") } @@ -277,6 +298,8 @@ async fn bob_swap( bitcoin_wallet: Arc, monero_wallet: Arc, db: Database, + alice_peer_id: PeerId, + alice_addr: Multiaddr, ) -> Result { let bob_behaviour = bob::Behaviour::default(); let bob_transport = build(bob_behaviour.identity())?; @@ -291,6 +314,8 @@ async fn bob_swap( monero_wallet.clone(), OsRng, swap_id, + alice_peer_id, + alice_addr, ); tokio::spawn(async move { event_loop.run().await }); diff --git a/swap/src/bob.rs b/swap/src/bob.rs index 453e9fa1..852e0f0f 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -10,7 +10,10 @@ use crate::{ SwapAmounts, }; use anyhow::Result; -use libp2p::{core::identity::Keypair, NetworkBehaviour, PeerId}; +use libp2p::{ + core::{identity::Keypair, Multiaddr}, + NetworkBehaviour, PeerId, +}; use tracing::{debug, info}; use xmr_btc::{ alice, @@ -159,9 +162,9 @@ impl Behaviour { debug!("Sent Message3"); } - /// Returns Alice's peer id if we are connected. - pub fn peer_id_of_alice(&self) -> Option { - self.pt.counterparty_peer_id() + /// Add a known address for the given peer + pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { + self.pt.add_address(peer_id, address) } } diff --git a/swap/src/bob/event_loop.rs b/swap/src/bob/event_loop.rs index 28f3b833..0b1ff9c2 100644 --- a/swap/src/bob/event_loop.rs +++ b/swap/src/bob/event_loop.rs @@ -9,7 +9,7 @@ use tokio::{ stream::StreamExt, sync::mpsc::{Receiver, Sender}, }; -use tracing::info; +use tracing::{debug, error, info}; use xmr_btc::{alice, bitcoin::EncryptedSignature, bob}; pub struct Channels { @@ -36,7 +36,8 @@ pub struct EventLoopHandle { msg2: Receiver, request_amounts: Sender<(PeerId, ::bitcoin::Amount)>, conn_established: Receiver, - dial_alice: Sender, + dial_alice: Sender, + add_address: Sender<(PeerId, Multiaddr)>, send_msg0: Sender<(PeerId, bob::Message0)>, send_msg1: Sender<(PeerId, bob::Message1)>, send_msg2: Sender<(PeerId, bob::Message2)>, @@ -44,13 +45,6 @@ pub struct EventLoopHandle { } impl EventLoopHandle { - pub async fn recv_conn_established(&mut self) -> Result { - self.conn_established - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive connection established from Bob")) - } - pub async fn recv_message0(&mut self) -> Result { self.msg0 .recv() @@ -72,9 +66,24 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed o receive message 2 from Bob")) } - pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { - info!("sending msg to ourselves to dial alice: {}", addr); - let _ = self.dial_alice.send(addr).await?; + /// Dials other party and wait for the connection to be established. + /// Do nothing if we are already connected + pub async fn dial(&mut self, peer_id: PeerId) -> Result<()> { + let _ = self.dial_alice.send(peer_id).await?; + + std::thread::sleep(std::time::Duration::from_millis(100)); + + self.conn_established + .recv() + .await + .ok_or_else(|| anyhow!("Failed to receive connection established from Alice"))?; + + Ok(()) + } + + pub async fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<()> { + debug!("Add address {} for peer id {}", addr, peer_id); + self.add_address.send((peer_id, addr)).await?; Ok(()) } @@ -119,7 +128,8 @@ pub struct EventLoop { msg2: Sender, conn_established: Sender, request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>, - dial_alice: Receiver, + dial_alice: Receiver, + add_address: Receiver<(PeerId, Multiaddr)>, send_msg0: Receiver<(PeerId, bob::Message0)>, send_msg1: Receiver<(PeerId, bob::Message1)>, send_msg2: Receiver<(PeerId, bob::Message2)>, @@ -142,6 +152,7 @@ impl EventLoop { let msg2 = Channels::new(); let conn_established = Channels::new(); let dial_alice = Channels::new(); + let add_address = Channels::new(); let send_msg0 = Channels::new(); let send_msg1 = Channels::new(); let send_msg2 = Channels::new(); @@ -155,6 +166,7 @@ impl EventLoop { msg2: msg2.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, + add_address: add_address.receiver, send_msg0: send_msg0.receiver, send_msg1: send_msg1.receiver, send_msg2: send_msg2.receiver, @@ -168,6 +180,7 @@ impl EventLoop { msg2: msg2.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, + add_address: add_address.sender, send_msg0: send_msg0.sender, send_msg1: send_msg1.sender, send_msg2: send_msg2.sender, @@ -182,8 +195,8 @@ impl EventLoop { tokio::select! { swarm_event = self.swarm.next().fuse() => { match swarm_event { - OutEvent::ConnectionEstablished(alice) => { - let _ = self.conn_established.send(alice).await; + OutEvent::ConnectionEstablished(peer_id) => { + let _ = self.conn_established.send(peer_id).await; } OutEvent::Amounts(_amounts) => info!("Amounts received from Alice"), OutEvent::Message0(msg) => { @@ -198,10 +211,25 @@ impl EventLoop { OutEvent::Message3 => info!("Alice acknowledged message 3 received"), } }, - addr = self.dial_alice.next().fuse() => { - if let Some(addr) = addr { - info!("dialing alice: {}", addr); - libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice"); + peer_id_addr = self.add_address.next().fuse() => { + if let Some((peer_id, addr)) = peer_id_addr { + debug!("Add address for {}: {}", peer_id, addr); + self.swarm.add_address(peer_id, addr); + } + }, + peer_id = self.dial_alice.next().fuse() => { + if let Some(peer_id) = peer_id { + if self.swarm.pt.is_connected(&peer_id) { + debug!("Already connected to Alice: {}", peer_id); + let _ = self.conn_established.send(peer_id).await; + } else { + info!("dialing alice: {}", peer_id); + if let Err(err) = libp2p::Swarm::dial(&mut self.swarm, &peer_id) { + error!("Could not dial alice: {}", err); + // TODO(Franck): If Dial fails then we should report it. + } + + } } }, amounts = self.request_amounts.next().fuse() => { diff --git a/swap/src/bob/swap.rs b/swap/src/bob/swap.rs index 30350010..1a56d38c 100644 --- a/swap/src/bob/swap.rs +++ b/swap/src/bob/swap.rs @@ -23,12 +23,12 @@ pub enum BobState { Started { state0: bob::State0, amounts: SwapAmounts, - addr: Multiaddr, + alice_peer_id: PeerId, }, Negotiated(bob::State2, PeerId), BtcLocked(bob::State3, PeerId), XmrLocked(bob::State4, PeerId), - EncSigSent(bob::State4, PeerId), + EncSigSent(bob::State4), BtcRedeemed(bob::State5), T1Expired(bob::State4), Cancelled(bob::State4), @@ -67,7 +67,7 @@ impl From for state::Bob { BobState::Negotiated(state2, peer_id) => Bob::Negotiated { state2, peer_id }, BobState::BtcLocked(state3, peer_id) => Bob::BtcLocked { state3, peer_id }, BobState::XmrLocked(state4, peer_id) => Bob::XmrLocked { state4, peer_id }, - BobState::EncSigSent(state4, peer_id) => Bob::EncSigSent { state4, peer_id }, + BobState::EncSigSent(state4) => Bob::EncSigSent { state4 }, BobState::BtcRedeemed(state5) => Bob::BtcRedeemed(state5), BobState::T1Expired(state4) => Bob::T1Expired(state4), BobState::Cancelled(state4) => Bob::BtcCancelled(state4), @@ -88,7 +88,7 @@ impl TryFrom for BobState { Bob::Negotiated { state2, peer_id } => BobState::Negotiated(state2, peer_id), Bob::BtcLocked { state3, peer_id } => BobState::BtcLocked(state3, peer_id), Bob::XmrLocked { state4, peer_id } => BobState::XmrLocked(state4, peer_id), - Bob::EncSigSent { state4, peer_id } => BobState::EncSigSent(state4, peer_id), + Bob::EncSigSent { state4 } => BobState::EncSigSent(state4), Bob::BtcRedeemed(state5) => BobState::BtcRedeemed(state5), Bob::T1Expired(state4) => BobState::T1Expired(state4), Bob::BtcCancelled(state4) => BobState::Cancelled(state4), @@ -102,18 +102,26 @@ impl TryFrom for BobState { } } +// TODO(Franck): Make this a method on a struct +#[allow(clippy::too_many_arguments)] pub async fn swap( state: BobState, - event_loop_handle: EventLoopHandle, + mut event_loop_handle: EventLoopHandle, db: Database, bitcoin_wallet: Arc, monero_wallet: Arc, rng: R, swap_id: Uuid, + alice_peer_id: PeerId, + alice_addr: Multiaddr, ) -> Result where R: RngCore + CryptoRng + Send, { + event_loop_handle + .add_address(alice_peer_id, alice_addr) + .await?; + run_until( state, is_complete, @@ -173,13 +181,15 @@ where BobState::Started { state0, amounts, - addr, + alice_peer_id, } => { + event_loop_handle.dial(alice_peer_id.clone()).await?; + let (state2, alice_peer_id) = negotiate( state0, amounts, &mut event_loop_handle, - addr, + alice_peer_id.clone(), &mut rng, bitcoin_wallet.clone(), ) @@ -202,6 +212,8 @@ where .await } BobState::Negotiated(state2, alice_peer_id) => { + // Do not lock Bitcoin if not connected to Alice. + event_loop_handle.dial(alice_peer_id.clone()).await?; // Alice and Bob have exchanged info let state3 = state2.lock_btc(bitcoin_wallet.as_ref()).await?; @@ -224,7 +236,10 @@ where // Bob has locked Btc // Watch for Alice to Lock Xmr or for t1 to elapse BobState::BtcLocked(state3, alice_peer_id) => { - // todo: watch until t1, not indefinetely + // TODO(Franck): Refund if cannot connect to Alice. + event_loop_handle.dial(alice_peer_id.clone()).await?; + + // todo: watch until t1, not indefinitely let msg2 = event_loop_handle.recv_message2().await?; let state4 = state3 .watch_for_lock_xmr(monero_wallet.as_ref(), msg2) @@ -247,12 +262,16 @@ where .await } BobState::XmrLocked(state, alice_peer_id) => { + // TODO(Franck): Refund if cannot connect to Alice. + event_loop_handle.dial(alice_peer_id.clone()).await?; + let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? { // Alice has locked Xmr // Bob sends Alice his key let tx_redeem_encsig = state.tx_redeem_encsig(); let state4_clone = state.clone(); + // TODO(Franck): Refund if message cannot be sent. let enc_sig_sent_watcher = event_loop_handle.send_message3(alice_peer_id.clone(), tx_redeem_encsig); let bitcoin_wallet = bitcoin_wallet.clone(); @@ -260,7 +279,7 @@ where select! { _ = enc_sig_sent_watcher => { - BobState::EncSigSent(state, alice_peer_id) + BobState::EncSigSent(state) }, _ = t1_timeout => { BobState::T1Expired(state) @@ -284,7 +303,7 @@ where ) .await } - BobState::EncSigSent(state, ..) => { + BobState::EncSigSent(state) => { let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? { let state_clone = state.clone(); let redeem_watcher = state_clone.watch_for_redeem_btc(bitcoin_wallet.as_ref()); @@ -401,7 +420,7 @@ pub async fn negotiate( state0: xmr_btc::bob::State0, amounts: SwapAmounts, swarm: &mut EventLoopHandle, - addr: Multiaddr, + alice_peer_id: PeerId, mut rng: R, bitcoin_wallet: Arc, ) -> Result<(State2, PeerId)> @@ -409,10 +428,6 @@ where R: RngCore + CryptoRng + Send, { tracing::trace!("Starting negotiate"); - swarm.dial_alice(addr).await?; - - let alice_peer_id = swarm.recv_conn_established().await?; - swarm .request_amounts(alice_peer_id.clone(), amounts.btc) .await?; diff --git a/swap/src/cli.rs b/swap/src/cli.rs index 6724ffde..fe0a8121 100644 --- a/swap/src/cli.rs +++ b/swap/src/cli.rs @@ -1,4 +1,4 @@ -use libp2p::core::Multiaddr; +use libp2p::{core::Multiaddr, PeerId}; use url::Url; use uuid::Uuid; @@ -47,6 +47,9 @@ pub enum Command { receive_bitcoin: bitcoin::Amount, }, BuyXmr { + #[structopt(short = "p", long = "connect-peer-id")] + alice_peer_id: PeerId, + #[structopt(short = "a", long = "connect-addr")] alice_addr: Multiaddr, @@ -78,6 +81,12 @@ pub enum Command { #[structopt(short = "id", long = "swap-id")] swap_id: Uuid, + #[structopt(short = "p", long = "connect-peer-id")] + alice_peer_id: PeerId, + + #[structopt(short = "a", long = "connect-addr")] + alice_addr: Multiaddr, + #[structopt( short = "b", long = "bitcoind-rpc", diff --git a/swap/src/network/peer_tracker.rs b/swap/src/network/peer_tracker.rs index 08e97077..1401fdf6 100644 --- a/swap/src/network/peer_tracker.rs +++ b/swap/src/network/peer_tracker.rs @@ -7,7 +7,10 @@ use libp2p::{ }, Multiaddr, PeerId, }; -use std::{collections::VecDeque, task::Poll}; +use std::{ + collections::{HashMap, VecDeque}, + task::Poll, +}; #[derive(Debug)] pub enum OutEvent { @@ -21,10 +24,21 @@ pub enum OutEvent { #[derive(Default, Debug)] pub struct PeerTracker { connected: Option<(PeerId, Multiaddr)>, + address_of_peer: HashMap, events: VecDeque, } impl PeerTracker { + /// Return whether we are connected to the given peer. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + if let Some((connected_peer_id, _)) = &self.connected { + if connected_peer_id == peer_id { + return true; + } + } + false + } + /// Returns the peer id of counterparty if we are connected. pub fn counterparty_peer_id(&self) -> Option { if let Some((id, _)) = &self.connected { @@ -33,13 +47,18 @@ impl PeerTracker { None } - /// Returns the multiaddr of counterparty if we are connected. - pub fn counterparty_addr(&self) -> Option { - if let Some((_, addr)) = &self.connected { - return Some(addr.clone()); + /// Returns the peer_id and multiaddr of counterparty if we are connected. + pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> { + if let Some((peer_id, addr)) = &self.connected { + return Some((peer_id.clone(), addr.clone())); } None } + + /// Add an address for a given peer. We only store one address per peer. + pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { + self.address_of_peer.insert(peer_id, address); + } } impl NetworkBehaviour for PeerTracker { @@ -50,11 +69,17 @@ impl NetworkBehaviour for PeerTracker { DummyProtocolsHandler::default() } - fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { let mut addresses: Vec = vec![]; - if let Some(addr) = self.counterparty_addr() { - addresses.push(addr) + if let Some((counterparty_peer_id, addr)) = self.counterparty() { + if counterparty_peer_id == *peer_id { + addresses.push(addr) + } + } + + if let Some(addr) = self.address_of_peer.get(peer_id) { + addresses.push(addr.clone()); } addresses diff --git a/swap/src/state.rs b/swap/src/state.rs index 8fb04539..3165b8b6 100644 --- a/swap/src/state.rs +++ b/swap/src/state.rs @@ -56,8 +56,6 @@ pub enum Bob { }, EncSigSent { state4: bob::State4, - #[serde(with = "crate::serde::peer_id")] - peer_id: PeerId, }, BtcRedeemed(bob::State5), T1Expired(bob::State4), diff --git a/swap/tests/happy_path.rs b/swap/tests/happy_path.rs index daabc75e..e4571f5a 100644 --- a/swap/tests/happy_path.rs +++ b/swap/tests/happy_path.rs @@ -63,7 +63,8 @@ async fn happy_path() { let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = init_bob( - alice_multiaddr, + alice_multiaddr.clone(), + alice_event_loop.peer_id(), &bitcoind, &monero, btc_to_swap, @@ -83,6 +84,8 @@ async fn happy_path() { alice_db, ); + let alice_peer_id = alice_event_loop.peer_id(); + let _alice_swarm_fut = tokio::spawn(async move { alice_event_loop.run().await }); let bob_swap_fut = bob::swap::swap( @@ -93,6 +96,8 @@ async fn happy_path() { bob_xmr_wallet.clone(), OsRng, Uuid::new_v4(), + alice_peer_id, + alice_multiaddr, ); let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await }); diff --git a/swap/tests/happy_path_restart_alice.rs b/swap/tests/happy_path_restart_alice.rs index 75bf8a1a..e32035ce 100644 --- a/swap/tests/happy_path_restart_alice.rs +++ b/swap/tests/happy_path_restart_alice.rs @@ -56,9 +56,12 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() { ) .await; + let alice_peer_id = alice_event_loop.peer_id(); + let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = init_bob( alice_multiaddr.clone(), + alice_peer_id.clone(), &bitcoind, &monero, btc_to_swap, @@ -73,18 +76,17 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() { let bob_btc_wallet_clone = bob_btc_wallet.clone(); let bob_xmr_wallet_clone = bob_xmr_wallet.clone(); - let _ = tokio::spawn(async move { - bob::swap::swap( - bob_state, - bob_event_loop_handle, - bob_db, - bob_btc_wallet.clone(), - bob_xmr_wallet.clone(), - OsRng, - Uuid::new_v4(), - ) - .await - }); + let _ = tokio::spawn(bob::swap::swap( + bob_state, + bob_event_loop_handle, + bob_db, + bob_btc_wallet.clone(), + bob_xmr_wallet.clone(), + OsRng, + Uuid::new_v4(), + alice_peer_id, + alice_multiaddr.clone(), + )); let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await }); diff --git a/swap/tests/happy_path_restart_bob.rs b/swap/tests/happy_path_restart_bob.rs index 98d19aa2..69f1b079 100644 --- a/swap/tests/happy_path_restart_bob.rs +++ b/swap/tests/happy_path_restart_bob.rs @@ -3,10 +3,11 @@ use get_port::get_port; use libp2p::Multiaddr; use rand::rngs::OsRng; use std::convert::TryFrom; -use swap::{alice, bitcoin, bob, bob::swap::BobState, storage::Database}; +use swap::{alice, alice::swap::AliceState, bitcoin, bob, bob::swap::BobState, storage::Database}; use tempfile::tempdir; use testcontainers::clients::Cli; use testutils::init_tracing; +use tokio::select; use uuid::Uuid; use xmr_btc::config::Config; @@ -56,9 +57,11 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() { ) .await; + let alice_peer_id = alice_event_loop.peer_id(); let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, _) = init_bob( alice_multiaddr.clone(), + alice_peer_id.clone(), &bitcoind, &monero, btc_to_swap, @@ -136,6 +139,8 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() { bob_xmr_wallet, OsRng, bob_swap_id, + alice_peer_id, + alice_multiaddr, ) .await .unwrap(); @@ -158,3 +163,149 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() { assert!(xmr_alice_final <= alice_xmr_starting_balance - xmr_to_swap); assert_eq!(xmr_bob_final, xmr_to_swap); } + +#[tokio::test] +async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { + let _guard = init_tracing(); + + let cli = Cli::default(); + let ( + monero, + testutils::Containers { + bitcoind, + monerods: _monerods, + }, + ) = testutils::init_containers(&cli).await; + + let btc_to_swap = bitcoin::Amount::from_sat(1_000_000); + let xmr_to_swap = xmr_btc::monero::Amount::from_piconero(1_000_000_000_000); + + let bob_btc_starting_balance = btc_to_swap * 10; + let bob_xmr_starting_balance = xmr_btc::monero::Amount::from_piconero(0); + + let alice_btc_starting_balance = bitcoin::Amount::ZERO; + let alice_xmr_starting_balance = xmr_to_swap * 10; + + let port = get_port().expect("Failed to find a free port"); + let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port) + .parse() + .expect("failed to parse Alice's address"); + + let ( + alice_state, + mut alice_event_loop, + alice_event_loop_handle, + alice_btc_wallet, + alice_xmr_wallet, + alice_db, + ) = init_alice( + &bitcoind, + &monero, + btc_to_swap, + xmr_to_swap, + alice_xmr_starting_balance, + alice_multiaddr.clone(), + Config::regtest(), + ) + .await; + + let alice_peer_id = alice_event_loop.peer_id(); + let (bob_state, bob_event_loop_1, bob_event_loop_handle_1, bob_btc_wallet, bob_xmr_wallet, _) = + init_bob( + alice_multiaddr.clone(), + alice_peer_id.clone(), + &bitcoind, + &monero, + btc_to_swap, + bob_btc_starting_balance, + xmr_to_swap, + Config::regtest(), + ) + .await; + + let alice_fut = alice::swap::swap( + alice_state, + alice_event_loop_handle, + alice_btc_wallet.clone(), + alice_xmr_wallet.clone(), + Config::regtest(), + Uuid::new_v4(), + alice_db, + ); + + let bob_swap_id = Uuid::new_v4(); + let bob_db_datadir = tempdir().unwrap(); + + let bob_xmr_locked_fut = { + let bob_db = Database::open(bob_db_datadir.path()).unwrap(); + bob::swap::run_until( + bob_state, + bob::swap::is_xmr_locked, + bob_event_loop_handle_1, + bob_db, + bob_btc_wallet.clone(), + bob_xmr_wallet.clone(), + OsRng, + bob_swap_id, + ) + }; + + tokio::spawn(async move { alice_event_loop.run().await }); + + let alice_fut_handle = tokio::spawn(alice_fut); + + // We are selecting with bob_event_loop_1 so that we stop polling on it once + // bob reaches `xmr locked` state. + let bob_restart_state = select! { + res = bob_xmr_locked_fut => res.unwrap(), + _ = bob_event_loop_1.run() => panic!("The event loop should never finish") + }; + + // let tx_lock_id = if let AliceState::BtcRefunded { state3, .. } = alice_state + // { state3.tx_lock.txid() + // } else { + // panic!(format!("Alice in unexpected state: {}", alice_state)); + // }; + + let (bob_event_loop_2, bob_event_loop_handle_2) = testutils::init_bob_event_loop(); + + let bob_fut = bob::swap::swap( + bob_restart_state, + bob_event_loop_handle_2, + Database::open(bob_db_datadir.path()).unwrap(), + bob_btc_wallet.clone(), + bob_xmr_wallet.clone(), + OsRng, + bob_swap_id, + alice_peer_id, + alice_multiaddr, + ); + + let bob_final_state = select! { + bob_final_state = bob_fut => bob_final_state.unwrap(), + _ = bob_event_loop_2.run() => panic!("Event loop is not expected to stop") + }; + + assert!(matches!(bob_final_state, BobState::XmrRedeemed)); + + // Wait for Alice to finish too. + let alice_final_state = alice_fut_handle.await.unwrap().unwrap(); + assert!(matches!(alice_final_state, AliceState::BtcRedeemed)); + + let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap(); + let btc_bob_final = bob_btc_wallet.as_ref().balance().await.unwrap(); + + let xmr_alice_final = alice_xmr_wallet.as_ref().get_balance().await.unwrap(); + + bob_xmr_wallet.as_ref().0.refresh().await.unwrap(); + let xmr_bob_final = bob_xmr_wallet.as_ref().get_balance().await.unwrap(); + + assert_eq!( + btc_alice_final, + alice_btc_starting_balance + btc_to_swap - bitcoin::Amount::from_sat(bitcoin::TX_FEE) + ); + assert!(btc_bob_final <= bob_btc_starting_balance - btc_to_swap); + + assert!(xmr_alice_final <= alice_xmr_starting_balance - xmr_to_swap); + assert_eq!(xmr_bob_final, bob_xmr_starting_balance + xmr_to_swap); +} diff --git a/swap/tests/punish.rs b/swap/tests/punish.rs index 717983ef..91eb3164 100644 --- a/swap/tests/punish.rs +++ b/swap/tests/punish.rs @@ -62,6 +62,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = init_bob( alice_multiaddr, + alice_event_loop.peer_id(), &bitcoind, &monero, btc_to_swap, diff --git a/swap/tests/refund_restart_alice.rs b/swap/tests/refund_restart_alice.rs index abe9266e..6dfbf225 100644 --- a/swap/tests/refund_restart_alice.rs +++ b/swap/tests/refund_restart_alice.rs @@ -63,6 +63,7 @@ async fn given_alice_restarts_after_xmr_is_locked_abort_swap() { let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = init_bob( alice_multiaddr.clone(), + alice_event_loop_1.peer_id(), &bitcoind, &monero, btc_to_swap, @@ -80,6 +81,8 @@ async fn given_alice_restarts_after_xmr_is_locked_abort_swap() { bob_xmr_wallet.clone(), OsRng, Uuid::new_v4(), + alice_event_loop_1.peer_id(), + alice_multiaddr.clone(), ); let alice_swap_id = Uuid::new_v4(); diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 64a64146..186a28e6 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -1,5 +1,5 @@ use bitcoin_harness::Bitcoind; -use libp2p::core::Multiaddr; +use libp2p::{core::Multiaddr, PeerId}; use monero_harness::{image, Monero}; use rand::rngs::OsRng; use std::sync::Arc; @@ -155,7 +155,7 @@ pub async fn init_alice( } pub async fn init_bob_state( - alice_multiaddr: Multiaddr, + alice_peer_id: PeerId, btc_to_swap: bitcoin::Amount, xmr_to_swap: xmr_btc::monero::Amount, bob_btc_wallet: Arc, @@ -179,7 +179,7 @@ pub async fn init_bob_state( BobState::Started { state0, amounts, - addr: alice_multiaddr, + alice_peer_id, } } @@ -192,6 +192,7 @@ pub fn init_bob_event_loop() -> (bob::event_loop::EventLoop, bob::event_loop::Ev #[allow(clippy::too_many_arguments)] pub async fn init_bob( alice_multiaddr: Multiaddr, + alice_peer_id: PeerId, bitcoind: &Bitcoind<'_>, monero: &Monero, btc_to_swap: bitcoin::Amount, @@ -217,7 +218,7 @@ pub async fn init_bob( .await; let bob_state = init_bob_state( - alice_multiaddr, + alice_peer_id.clone(), btc_to_swap, xmr_to_swap, bob_btc_wallet.clone(), @@ -225,7 +226,12 @@ pub async fn init_bob( ) .await; - let (event_loop, event_loop_handle) = init_bob_event_loop(); + let (event_loop, mut event_loop_handle) = init_bob_event_loop(); + + event_loop_handle + .add_address(alice_peer_id, alice_multiaddr) + .await + .unwrap(); let bob_db_dir = tempdir().unwrap(); let bob_db = Database::open(bob_db_dir.path()).unwrap();