Bob refunds swap after restart that requires communication

As Bob is dialing Alice, we now ensure that we are connected to Alice
at each step that needs communication.
If we are not connected, we proceed with dialing.

In an attempt to improve libp2p usage, we also add known address of
Alice first and only use peer_id to dial.
This ensures that we use the expected peer id.
pull/100/head
Franck Royer 4 years ago
parent d9ea7ab605
commit 1a4bd0e2b4
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4

@ -242,4 +242,8 @@ impl EventLoop {
} }
} }
} }
pub fn peer_id(&self) -> PeerId {
self.swarm.peer_id()
}
} }

@ -13,7 +13,7 @@
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use libp2p::core::Multiaddr; use libp2p::{core::Multiaddr, PeerId};
use prettytable::{row, Table}; use prettytable::{row, Table};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{convert::TryFrom, sync::Arc}; use std::{convert::TryFrom, sync::Arc};
@ -111,6 +111,7 @@ async fn main() -> Result<()> {
.await?; .await?;
} }
Command::BuyXmr { Command::BuyXmr {
alice_peer_id,
alice_addr, alice_addr,
bitcoind_url, bitcoind_url,
bitcoin_wallet_name, bitcoin_wallet_name,
@ -144,7 +145,7 @@ async fn main() -> Result<()> {
let bob_state = BobState::Started { let bob_state = BobState::Started {
state0, state0,
amounts, amounts,
addr: alice_addr, alice_peer_id: alice_peer_id.clone(),
}; };
let swap_id = Uuid::new_v4(); let swap_id = Uuid::new_v4();
@ -153,7 +154,16 @@ async fn main() -> Result<()> {
send_bitcoin, receive_monero, swap_id send_bitcoin, receive_monero, swap_id
); );
bob_swap(swap_id, bob_state, bitcoin_wallet, monero_wallet, db).await?; bob_swap(
swap_id,
bob_state,
bitcoin_wallet,
monero_wallet,
db,
alice_peer_id,
alice_addr,
)
.await?;
} }
Command::History => { Command::History => {
let mut table = Table::new(); let mut table = Table::new();
@ -173,6 +183,8 @@ async fn main() -> Result<()> {
bitcoin_wallet_name, bitcoin_wallet_name,
monero_wallet_rpc_url, monero_wallet_rpc_url,
listen_addr, listen_addr,
alice_peer_id,
alice_addr,
} => { } => {
let db_swap = db.get_state(swap_id)?; let db_swap = db.get_state(swap_id)?;
@ -202,7 +214,16 @@ async fn main() -> Result<()> {
config, config,
) )
.await?; .await?;
bob_swap(swap_id, bob_state, bitcoin_wallet, monero_wallet, db).await?; bob_swap(
swap_id,
bob_state,
bitcoin_wallet,
monero_wallet,
db,
alice_peer_id,
alice_addr,
)
.await?;
} else { } else {
anyhow::bail!("Unable to construct swap state for swap with id {}") anyhow::bail!("Unable to construct swap state for swap with id {}")
} }
@ -277,6 +298,8 @@ async fn bob_swap(
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
db: Database, db: Database,
alice_peer_id: PeerId,
alice_addr: Multiaddr,
) -> Result<BobState> { ) -> Result<BobState> {
let bob_behaviour = bob::Behaviour::default(); let bob_behaviour = bob::Behaviour::default();
let bob_transport = build(bob_behaviour.identity())?; let bob_transport = build(bob_behaviour.identity())?;
@ -291,6 +314,8 @@ async fn bob_swap(
monero_wallet.clone(), monero_wallet.clone(),
OsRng, OsRng,
swap_id, swap_id,
alice_peer_id,
alice_addr,
); );
tokio::spawn(async move { event_loop.run().await }); tokio::spawn(async move { event_loop.run().await });

@ -10,7 +10,10 @@ use crate::{
SwapAmounts, SwapAmounts,
}; };
use anyhow::Result; use anyhow::Result;
use libp2p::{core::identity::Keypair, NetworkBehaviour, PeerId}; use libp2p::{
core::{identity::Keypair, Multiaddr},
NetworkBehaviour, PeerId,
};
use tracing::{debug, info}; use tracing::{debug, info};
use xmr_btc::{ use xmr_btc::{
alice, alice,
@ -159,9 +162,9 @@ impl Behaviour {
debug!("Sent Message3"); debug!("Sent Message3");
} }
/// Returns Alice's peer id if we are connected. /// Add a known address for the given peer
pub fn peer_id_of_alice(&self) -> Option<PeerId> { pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.pt.counterparty_peer_id() self.pt.add_address(peer_id, address)
} }
} }

@ -9,7 +9,7 @@ use tokio::{
stream::StreamExt, stream::StreamExt,
sync::mpsc::{Receiver, Sender}, sync::mpsc::{Receiver, Sender},
}; };
use tracing::info; use tracing::{debug, error, info};
use xmr_btc::{alice, bitcoin::EncryptedSignature, bob}; use xmr_btc::{alice, bitcoin::EncryptedSignature, bob};
pub struct Channels<T> { pub struct Channels<T> {
@ -36,7 +36,8 @@ pub struct EventLoopHandle {
msg2: Receiver<alice::Message2>, msg2: Receiver<alice::Message2>,
request_amounts: Sender<(PeerId, ::bitcoin::Amount)>, request_amounts: Sender<(PeerId, ::bitcoin::Amount)>,
conn_established: Receiver<PeerId>, conn_established: Receiver<PeerId>,
dial_alice: Sender<Multiaddr>, dial_alice: Sender<PeerId>,
add_address: Sender<(PeerId, Multiaddr)>,
send_msg0: Sender<(PeerId, bob::Message0)>, send_msg0: Sender<(PeerId, bob::Message0)>,
send_msg1: Sender<(PeerId, bob::Message1)>, send_msg1: Sender<(PeerId, bob::Message1)>,
send_msg2: Sender<(PeerId, bob::Message2)>, send_msg2: Sender<(PeerId, bob::Message2)>,
@ -44,13 +45,6 @@ pub struct EventLoopHandle {
} }
impl EventLoopHandle { impl EventLoopHandle {
pub async fn recv_conn_established(&mut self) -> Result<PeerId> {
self.conn_established
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive connection established from Bob"))
}
pub async fn recv_message0(&mut self) -> Result<alice::Message0> { pub async fn recv_message0(&mut self) -> Result<alice::Message0> {
self.msg0 self.msg0
.recv() .recv()
@ -72,9 +66,24 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed o receive message 2 from Bob")) .ok_or_else(|| anyhow!("Failed o receive message 2 from Bob"))
} }
pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> { /// Dials other party and wait for the connection to be established.
info!("sending msg to ourselves to dial alice: {}", addr); /// Do nothing if we are already connected
let _ = self.dial_alice.send(addr).await?; pub async fn dial(&mut self, peer_id: PeerId) -> Result<()> {
let _ = self.dial_alice.send(peer_id).await?;
std::thread::sleep(std::time::Duration::from_millis(100));
self.conn_established
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive connection established from Alice"))?;
Ok(())
}
pub async fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<()> {
debug!("Add address {} for peer id {}", addr, peer_id);
self.add_address.send((peer_id, addr)).await?;
Ok(()) Ok(())
} }
@ -119,7 +128,8 @@ pub struct EventLoop {
msg2: Sender<alice::Message2>, msg2: Sender<alice::Message2>,
conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>, request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>,
dial_alice: Receiver<Multiaddr>, dial_alice: Receiver<PeerId>,
add_address: Receiver<(PeerId, Multiaddr)>,
send_msg0: Receiver<(PeerId, bob::Message0)>, send_msg0: Receiver<(PeerId, bob::Message0)>,
send_msg1: Receiver<(PeerId, bob::Message1)>, send_msg1: Receiver<(PeerId, bob::Message1)>,
send_msg2: Receiver<(PeerId, bob::Message2)>, send_msg2: Receiver<(PeerId, bob::Message2)>,
@ -142,6 +152,7 @@ impl EventLoop {
let msg2 = Channels::new(); let msg2 = Channels::new();
let conn_established = Channels::new(); let conn_established = Channels::new();
let dial_alice = Channels::new(); let dial_alice = Channels::new();
let add_address = Channels::new();
let send_msg0 = Channels::new(); let send_msg0 = Channels::new();
let send_msg1 = Channels::new(); let send_msg1 = Channels::new();
let send_msg2 = Channels::new(); let send_msg2 = Channels::new();
@ -155,6 +166,7 @@ impl EventLoop {
msg2: msg2.sender, msg2: msg2.sender,
conn_established: conn_established.sender, conn_established: conn_established.sender,
dial_alice: dial_alice.receiver, dial_alice: dial_alice.receiver,
add_address: add_address.receiver,
send_msg0: send_msg0.receiver, send_msg0: send_msg0.receiver,
send_msg1: send_msg1.receiver, send_msg1: send_msg1.receiver,
send_msg2: send_msg2.receiver, send_msg2: send_msg2.receiver,
@ -168,6 +180,7 @@ impl EventLoop {
msg2: msg2.receiver, msg2: msg2.receiver,
conn_established: conn_established.receiver, conn_established: conn_established.receiver,
dial_alice: dial_alice.sender, dial_alice: dial_alice.sender,
add_address: add_address.sender,
send_msg0: send_msg0.sender, send_msg0: send_msg0.sender,
send_msg1: send_msg1.sender, send_msg1: send_msg1.sender,
send_msg2: send_msg2.sender, send_msg2: send_msg2.sender,
@ -182,8 +195,8 @@ impl EventLoop {
tokio::select! { tokio::select! {
swarm_event = self.swarm.next().fuse() => { swarm_event = self.swarm.next().fuse() => {
match swarm_event { match swarm_event {
OutEvent::ConnectionEstablished(alice) => { OutEvent::ConnectionEstablished(peer_id) => {
let _ = self.conn_established.send(alice).await; let _ = self.conn_established.send(peer_id).await;
} }
OutEvent::Amounts(_amounts) => info!("Amounts received from Alice"), OutEvent::Amounts(_amounts) => info!("Amounts received from Alice"),
OutEvent::Message0(msg) => { OutEvent::Message0(msg) => {
@ -198,10 +211,25 @@ impl EventLoop {
OutEvent::Message3 => info!("Alice acknowledged message 3 received"), OutEvent::Message3 => info!("Alice acknowledged message 3 received"),
} }
}, },
addr = self.dial_alice.next().fuse() => { peer_id_addr = self.add_address.next().fuse() => {
if let Some(addr) = addr { if let Some((peer_id, addr)) = peer_id_addr {
info!("dialing alice: {}", addr); debug!("Add address for {}: {}", peer_id, addr);
libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice"); self.swarm.add_address(peer_id, addr);
}
},
peer_id = self.dial_alice.next().fuse() => {
if let Some(peer_id) = peer_id {
if self.swarm.pt.is_connected(&peer_id) {
debug!("Already connected to Alice: {}", peer_id);
let _ = self.conn_established.send(peer_id).await;
} else {
info!("dialing alice: {}", peer_id);
if let Err(err) = libp2p::Swarm::dial(&mut self.swarm, &peer_id) {
error!("Could not dial alice: {}", err);
// TODO(Franck): If Dial fails then we should report it.
}
}
} }
}, },
amounts = self.request_amounts.next().fuse() => { amounts = self.request_amounts.next().fuse() => {

@ -23,12 +23,12 @@ pub enum BobState {
Started { Started {
state0: bob::State0, state0: bob::State0,
amounts: SwapAmounts, amounts: SwapAmounts,
addr: Multiaddr, alice_peer_id: PeerId,
}, },
Negotiated(bob::State2, PeerId), Negotiated(bob::State2, PeerId),
BtcLocked(bob::State3, PeerId), BtcLocked(bob::State3, PeerId),
XmrLocked(bob::State4, PeerId), XmrLocked(bob::State4, PeerId),
EncSigSent(bob::State4, PeerId), EncSigSent(bob::State4),
BtcRedeemed(bob::State5), BtcRedeemed(bob::State5),
T1Expired(bob::State4), T1Expired(bob::State4),
Cancelled(bob::State4), Cancelled(bob::State4),
@ -67,7 +67,7 @@ impl From<BobState> for state::Bob {
BobState::Negotiated(state2, peer_id) => Bob::Negotiated { state2, peer_id }, BobState::Negotiated(state2, peer_id) => Bob::Negotiated { state2, peer_id },
BobState::BtcLocked(state3, peer_id) => Bob::BtcLocked { state3, peer_id }, BobState::BtcLocked(state3, peer_id) => Bob::BtcLocked { state3, peer_id },
BobState::XmrLocked(state4, peer_id) => Bob::XmrLocked { state4, peer_id }, BobState::XmrLocked(state4, peer_id) => Bob::XmrLocked { state4, peer_id },
BobState::EncSigSent(state4, peer_id) => Bob::EncSigSent { state4, peer_id }, BobState::EncSigSent(state4) => Bob::EncSigSent { state4 },
BobState::BtcRedeemed(state5) => Bob::BtcRedeemed(state5), BobState::BtcRedeemed(state5) => Bob::BtcRedeemed(state5),
BobState::T1Expired(state4) => Bob::T1Expired(state4), BobState::T1Expired(state4) => Bob::T1Expired(state4),
BobState::Cancelled(state4) => Bob::BtcCancelled(state4), BobState::Cancelled(state4) => Bob::BtcCancelled(state4),
@ -88,7 +88,7 @@ impl TryFrom<state::Swap> for BobState {
Bob::Negotiated { state2, peer_id } => BobState::Negotiated(state2, peer_id), Bob::Negotiated { state2, peer_id } => BobState::Negotiated(state2, peer_id),
Bob::BtcLocked { state3, peer_id } => BobState::BtcLocked(state3, peer_id), Bob::BtcLocked { state3, peer_id } => BobState::BtcLocked(state3, peer_id),
Bob::XmrLocked { state4, peer_id } => BobState::XmrLocked(state4, peer_id), Bob::XmrLocked { state4, peer_id } => BobState::XmrLocked(state4, peer_id),
Bob::EncSigSent { state4, peer_id } => BobState::EncSigSent(state4, peer_id), Bob::EncSigSent { state4 } => BobState::EncSigSent(state4),
Bob::BtcRedeemed(state5) => BobState::BtcRedeemed(state5), Bob::BtcRedeemed(state5) => BobState::BtcRedeemed(state5),
Bob::T1Expired(state4) => BobState::T1Expired(state4), Bob::T1Expired(state4) => BobState::T1Expired(state4),
Bob::BtcCancelled(state4) => BobState::Cancelled(state4), Bob::BtcCancelled(state4) => BobState::Cancelled(state4),
@ -102,18 +102,26 @@ impl TryFrom<state::Swap> for BobState {
} }
} }
// TODO(Franck): Make this a method on a struct
#[allow(clippy::too_many_arguments)]
pub async fn swap<R>( pub async fn swap<R>(
state: BobState, state: BobState,
event_loop_handle: EventLoopHandle, mut event_loop_handle: EventLoopHandle,
db: Database, db: Database,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>, bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>, monero_wallet: Arc<crate::monero::Wallet>,
rng: R, rng: R,
swap_id: Uuid, swap_id: Uuid,
alice_peer_id: PeerId,
alice_addr: Multiaddr,
) -> Result<BobState> ) -> Result<BobState>
where where
R: RngCore + CryptoRng + Send, R: RngCore + CryptoRng + Send,
{ {
event_loop_handle
.add_address(alice_peer_id, alice_addr)
.await?;
run_until( run_until(
state, state,
is_complete, is_complete,
@ -173,13 +181,15 @@ where
BobState::Started { BobState::Started {
state0, state0,
amounts, amounts,
addr, alice_peer_id,
} => { } => {
event_loop_handle.dial(alice_peer_id.clone()).await?;
let (state2, alice_peer_id) = negotiate( let (state2, alice_peer_id) = negotiate(
state0, state0,
amounts, amounts,
&mut event_loop_handle, &mut event_loop_handle,
addr, alice_peer_id.clone(),
&mut rng, &mut rng,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
) )
@ -202,6 +212,8 @@ where
.await .await
} }
BobState::Negotiated(state2, alice_peer_id) => { BobState::Negotiated(state2, alice_peer_id) => {
// Do not lock Bitcoin if not connected to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
// Alice and Bob have exchanged info // Alice and Bob have exchanged info
let state3 = state2.lock_btc(bitcoin_wallet.as_ref()).await?; let state3 = state2.lock_btc(bitcoin_wallet.as_ref()).await?;
@ -224,7 +236,10 @@ where
// Bob has locked Btc // Bob has locked Btc
// Watch for Alice to Lock Xmr or for t1 to elapse // Watch for Alice to Lock Xmr or for t1 to elapse
BobState::BtcLocked(state3, alice_peer_id) => { BobState::BtcLocked(state3, alice_peer_id) => {
// todo: watch until t1, not indefinetely // TODO(Franck): Refund if cannot connect to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
// todo: watch until t1, not indefinitely
let msg2 = event_loop_handle.recv_message2().await?; let msg2 = event_loop_handle.recv_message2().await?;
let state4 = state3 let state4 = state3
.watch_for_lock_xmr(monero_wallet.as_ref(), msg2) .watch_for_lock_xmr(monero_wallet.as_ref(), msg2)
@ -247,12 +262,16 @@ where
.await .await
} }
BobState::XmrLocked(state, alice_peer_id) => { BobState::XmrLocked(state, alice_peer_id) => {
// TODO(Franck): Refund if cannot connect to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? { let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? {
// Alice has locked Xmr // Alice has locked Xmr
// Bob sends Alice his key // Bob sends Alice his key
let tx_redeem_encsig = state.tx_redeem_encsig(); let tx_redeem_encsig = state.tx_redeem_encsig();
let state4_clone = state.clone(); let state4_clone = state.clone();
// TODO(Franck): Refund if message cannot be sent.
let enc_sig_sent_watcher = let enc_sig_sent_watcher =
event_loop_handle.send_message3(alice_peer_id.clone(), tx_redeem_encsig); event_loop_handle.send_message3(alice_peer_id.clone(), tx_redeem_encsig);
let bitcoin_wallet = bitcoin_wallet.clone(); let bitcoin_wallet = bitcoin_wallet.clone();
@ -260,7 +279,7 @@ where
select! { select! {
_ = enc_sig_sent_watcher => { _ = enc_sig_sent_watcher => {
BobState::EncSigSent(state, alice_peer_id) BobState::EncSigSent(state)
}, },
_ = t1_timeout => { _ = t1_timeout => {
BobState::T1Expired(state) BobState::T1Expired(state)
@ -284,7 +303,7 @@ where
) )
.await .await
} }
BobState::EncSigSent(state, ..) => { BobState::EncSigSent(state) => {
let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? { let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? {
let state_clone = state.clone(); let state_clone = state.clone();
let redeem_watcher = state_clone.watch_for_redeem_btc(bitcoin_wallet.as_ref()); let redeem_watcher = state_clone.watch_for_redeem_btc(bitcoin_wallet.as_ref());
@ -401,7 +420,7 @@ pub async fn negotiate<R>(
state0: xmr_btc::bob::State0, state0: xmr_btc::bob::State0,
amounts: SwapAmounts, amounts: SwapAmounts,
swarm: &mut EventLoopHandle, swarm: &mut EventLoopHandle,
addr: Multiaddr, alice_peer_id: PeerId,
mut rng: R, mut rng: R,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>, bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
) -> Result<(State2, PeerId)> ) -> Result<(State2, PeerId)>
@ -409,10 +428,6 @@ where
R: RngCore + CryptoRng + Send, R: RngCore + CryptoRng + Send,
{ {
tracing::trace!("Starting negotiate"); tracing::trace!("Starting negotiate");
swarm.dial_alice(addr).await?;
let alice_peer_id = swarm.recv_conn_established().await?;
swarm swarm
.request_amounts(alice_peer_id.clone(), amounts.btc) .request_amounts(alice_peer_id.clone(), amounts.btc)
.await?; .await?;

@ -1,4 +1,4 @@
use libp2p::core::Multiaddr; use libp2p::{core::Multiaddr, PeerId};
use url::Url; use url::Url;
use uuid::Uuid; use uuid::Uuid;
@ -47,6 +47,9 @@ pub enum Command {
receive_bitcoin: bitcoin::Amount, receive_bitcoin: bitcoin::Amount,
}, },
BuyXmr { BuyXmr {
#[structopt(short = "p", long = "connect-peer-id")]
alice_peer_id: PeerId,
#[structopt(short = "a", long = "connect-addr")] #[structopt(short = "a", long = "connect-addr")]
alice_addr: Multiaddr, alice_addr: Multiaddr,
@ -78,6 +81,12 @@ pub enum Command {
#[structopt(short = "id", long = "swap-id")] #[structopt(short = "id", long = "swap-id")]
swap_id: Uuid, swap_id: Uuid,
#[structopt(short = "p", long = "connect-peer-id")]
alice_peer_id: PeerId,
#[structopt(short = "a", long = "connect-addr")]
alice_addr: Multiaddr,
#[structopt( #[structopt(
short = "b", short = "b",
long = "bitcoind-rpc", long = "bitcoind-rpc",

@ -7,7 +7,10 @@ use libp2p::{
}, },
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
use std::{collections::VecDeque, task::Poll}; use std::{
collections::{HashMap, VecDeque},
task::Poll,
};
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
@ -21,10 +24,21 @@ pub enum OutEvent {
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct PeerTracker { pub struct PeerTracker {
connected: Option<(PeerId, Multiaddr)>, connected: Option<(PeerId, Multiaddr)>,
address_of_peer: HashMap<PeerId, Multiaddr>,
events: VecDeque<OutEvent>, events: VecDeque<OutEvent>,
} }
impl PeerTracker { impl PeerTracker {
/// Return whether we are connected to the given peer.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
if let Some((connected_peer_id, _)) = &self.connected {
if connected_peer_id == peer_id {
return true;
}
}
false
}
/// Returns the peer id of counterparty if we are connected. /// Returns the peer id of counterparty if we are connected.
pub fn counterparty_peer_id(&self) -> Option<PeerId> { pub fn counterparty_peer_id(&self) -> Option<PeerId> {
if let Some((id, _)) = &self.connected { if let Some((id, _)) = &self.connected {
@ -33,13 +47,18 @@ impl PeerTracker {
None None
} }
/// Returns the multiaddr of counterparty if we are connected. /// Returns the peer_id and multiaddr of counterparty if we are connected.
pub fn counterparty_addr(&self) -> Option<Multiaddr> { pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> {
if let Some((_, addr)) = &self.connected { if let Some((peer_id, addr)) = &self.connected {
return Some(addr.clone()); return Some((peer_id.clone(), addr.clone()));
} }
None None
} }
/// Add an address for a given peer. We only store one address per peer.
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.address_of_peer.insert(peer_id, address);
}
} }
impl NetworkBehaviour for PeerTracker { impl NetworkBehaviour for PeerTracker {
@ -50,11 +69,17 @@ impl NetworkBehaviour for PeerTracker {
DummyProtocolsHandler::default() DummyProtocolsHandler::default()
} }
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> { fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let mut addresses: Vec<Multiaddr> = vec![]; let mut addresses: Vec<Multiaddr> = vec![];
if let Some(addr) = self.counterparty_addr() { if let Some((counterparty_peer_id, addr)) = self.counterparty() {
addresses.push(addr) if counterparty_peer_id == *peer_id {
addresses.push(addr)
}
}
if let Some(addr) = self.address_of_peer.get(peer_id) {
addresses.push(addr.clone());
} }
addresses addresses

@ -56,8 +56,6 @@ pub enum Bob {
}, },
EncSigSent { EncSigSent {
state4: bob::State4, state4: bob::State4,
#[serde(with = "crate::serde::peer_id")]
peer_id: PeerId,
}, },
BtcRedeemed(bob::State5), BtcRedeemed(bob::State5),
T1Expired(bob::State4), T1Expired(bob::State4),

@ -63,7 +63,8 @@ async fn happy_path() {
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob( init_bob(
alice_multiaddr, alice_multiaddr.clone(),
alice_event_loop.peer_id(),
&bitcoind, &bitcoind,
&monero, &monero,
btc_to_swap, btc_to_swap,
@ -83,6 +84,8 @@ async fn happy_path() {
alice_db, alice_db,
); );
let alice_peer_id = alice_event_loop.peer_id();
let _alice_swarm_fut = tokio::spawn(async move { alice_event_loop.run().await }); let _alice_swarm_fut = tokio::spawn(async move { alice_event_loop.run().await });
let bob_swap_fut = bob::swap::swap( let bob_swap_fut = bob::swap::swap(
@ -93,6 +96,8 @@ async fn happy_path() {
bob_xmr_wallet.clone(), bob_xmr_wallet.clone(),
OsRng, OsRng,
Uuid::new_v4(), Uuid::new_v4(),
alice_peer_id,
alice_multiaddr,
); );
let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await }); let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await });

@ -56,9 +56,12 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
) )
.await; .await;
let alice_peer_id = alice_event_loop.peer_id();
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob( init_bob(
alice_multiaddr.clone(), alice_multiaddr.clone(),
alice_peer_id.clone(),
&bitcoind, &bitcoind,
&monero, &monero,
btc_to_swap, btc_to_swap,
@ -73,18 +76,17 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
let bob_btc_wallet_clone = bob_btc_wallet.clone(); let bob_btc_wallet_clone = bob_btc_wallet.clone();
let bob_xmr_wallet_clone = bob_xmr_wallet.clone(); let bob_xmr_wallet_clone = bob_xmr_wallet.clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(bob::swap::swap(
bob::swap::swap( bob_state,
bob_state, bob_event_loop_handle,
bob_event_loop_handle, bob_db,
bob_db, bob_btc_wallet.clone(),
bob_btc_wallet.clone(), bob_xmr_wallet.clone(),
bob_xmr_wallet.clone(), OsRng,
OsRng, Uuid::new_v4(),
Uuid::new_v4(), alice_peer_id,
) alice_multiaddr.clone(),
.await ));
});
let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await }); let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await });

@ -3,10 +3,11 @@ use get_port::get_port;
use libp2p::Multiaddr; use libp2p::Multiaddr;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::convert::TryFrom; use std::convert::TryFrom;
use swap::{alice, bitcoin, bob, bob::swap::BobState, storage::Database}; use swap::{alice, alice::swap::AliceState, bitcoin, bob, bob::swap::BobState, storage::Database};
use tempfile::tempdir; use tempfile::tempdir;
use testcontainers::clients::Cli; use testcontainers::clients::Cli;
use testutils::init_tracing; use testutils::init_tracing;
use tokio::select;
use uuid::Uuid; use uuid::Uuid;
use xmr_btc::config::Config; use xmr_btc::config::Config;
@ -56,9 +57,11 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
) )
.await; .await;
let alice_peer_id = alice_event_loop.peer_id();
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, _) = let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, _) =
init_bob( init_bob(
alice_multiaddr.clone(), alice_multiaddr.clone(),
alice_peer_id.clone(),
&bitcoind, &bitcoind,
&monero, &monero,
btc_to_swap, btc_to_swap,
@ -136,6 +139,8 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
bob_xmr_wallet, bob_xmr_wallet,
OsRng, OsRng,
bob_swap_id, bob_swap_id,
alice_peer_id,
alice_multiaddr,
) )
.await .await
.unwrap(); .unwrap();
@ -158,3 +163,149 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
assert!(xmr_alice_final <= alice_xmr_starting_balance - xmr_to_swap); assert!(xmr_alice_final <= alice_xmr_starting_balance - xmr_to_swap);
assert_eq!(xmr_bob_final, xmr_to_swap); assert_eq!(xmr_bob_final, xmr_to_swap);
} }
#[tokio::test]
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
let _guard = init_tracing();
let cli = Cli::default();
let (
monero,
testutils::Containers {
bitcoind,
monerods: _monerods,
},
) = testutils::init_containers(&cli).await;
let btc_to_swap = bitcoin::Amount::from_sat(1_000_000);
let xmr_to_swap = xmr_btc::monero::Amount::from_piconero(1_000_000_000_000);
let bob_btc_starting_balance = btc_to_swap * 10;
let bob_xmr_starting_balance = xmr_btc::monero::Amount::from_piconero(0);
let alice_btc_starting_balance = bitcoin::Amount::ZERO;
let alice_xmr_starting_balance = xmr_to_swap * 10;
let port = get_port().expect("Failed to find a free port");
let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
let (
alice_state,
mut alice_event_loop,
alice_event_loop_handle,
alice_btc_wallet,
alice_xmr_wallet,
alice_db,
) = init_alice(
&bitcoind,
&monero,
btc_to_swap,
xmr_to_swap,
alice_xmr_starting_balance,
alice_multiaddr.clone(),
Config::regtest(),
)
.await;
let alice_peer_id = alice_event_loop.peer_id();
let (bob_state, bob_event_loop_1, bob_event_loop_handle_1, bob_btc_wallet, bob_xmr_wallet, _) =
init_bob(
alice_multiaddr.clone(),
alice_peer_id.clone(),
&bitcoind,
&monero,
btc_to_swap,
bob_btc_starting_balance,
xmr_to_swap,
Config::regtest(),
)
.await;
let alice_fut = alice::swap::swap(
alice_state,
alice_event_loop_handle,
alice_btc_wallet.clone(),
alice_xmr_wallet.clone(),
Config::regtest(),
Uuid::new_v4(),
alice_db,
);
let bob_swap_id = Uuid::new_v4();
let bob_db_datadir = tempdir().unwrap();
let bob_xmr_locked_fut = {
let bob_db = Database::open(bob_db_datadir.path()).unwrap();
bob::swap::run_until(
bob_state,
bob::swap::is_xmr_locked,
bob_event_loop_handle_1,
bob_db,
bob_btc_wallet.clone(),
bob_xmr_wallet.clone(),
OsRng,
bob_swap_id,
)
};
tokio::spawn(async move { alice_event_loop.run().await });
let alice_fut_handle = tokio::spawn(alice_fut);
// We are selecting with bob_event_loop_1 so that we stop polling on it once
// bob reaches `xmr locked` state.
let bob_restart_state = select! {
res = bob_xmr_locked_fut => res.unwrap(),
_ = bob_event_loop_1.run() => panic!("The event loop should never finish")
};
// let tx_lock_id = if let AliceState::BtcRefunded { state3, .. } = alice_state
// { state3.tx_lock.txid()
// } else {
// panic!(format!("Alice in unexpected state: {}", alice_state));
// };
let (bob_event_loop_2, bob_event_loop_handle_2) = testutils::init_bob_event_loop();
let bob_fut = bob::swap::swap(
bob_restart_state,
bob_event_loop_handle_2,
Database::open(bob_db_datadir.path()).unwrap(),
bob_btc_wallet.clone(),
bob_xmr_wallet.clone(),
OsRng,
bob_swap_id,
alice_peer_id,
alice_multiaddr,
);
let bob_final_state = select! {
bob_final_state = bob_fut => bob_final_state.unwrap(),
_ = bob_event_loop_2.run() => panic!("Event loop is not expected to stop")
};
assert!(matches!(bob_final_state, BobState::XmrRedeemed));
// Wait for Alice to finish too.
let alice_final_state = alice_fut_handle.await.unwrap().unwrap();
assert!(matches!(alice_final_state, AliceState::BtcRedeemed));
let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap();
let btc_bob_final = bob_btc_wallet.as_ref().balance().await.unwrap();
let xmr_alice_final = alice_xmr_wallet.as_ref().get_balance().await.unwrap();
bob_xmr_wallet.as_ref().0.refresh().await.unwrap();
let xmr_bob_final = bob_xmr_wallet.as_ref().get_balance().await.unwrap();
assert_eq!(
btc_alice_final,
alice_btc_starting_balance + btc_to_swap - bitcoin::Amount::from_sat(bitcoin::TX_FEE)
);
assert!(btc_bob_final <= bob_btc_starting_balance - btc_to_swap);
assert!(xmr_alice_final <= alice_xmr_starting_balance - xmr_to_swap);
assert_eq!(xmr_bob_final, bob_xmr_starting_balance + xmr_to_swap);
}

@ -62,6 +62,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob( init_bob(
alice_multiaddr, alice_multiaddr,
alice_event_loop.peer_id(),
&bitcoind, &bitcoind,
&monero, &monero,
btc_to_swap, btc_to_swap,

@ -63,6 +63,7 @@ async fn given_alice_restarts_after_xmr_is_locked_abort_swap() {
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) = let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob( init_bob(
alice_multiaddr.clone(), alice_multiaddr.clone(),
alice_event_loop_1.peer_id(),
&bitcoind, &bitcoind,
&monero, &monero,
btc_to_swap, btc_to_swap,
@ -80,6 +81,8 @@ async fn given_alice_restarts_after_xmr_is_locked_abort_swap() {
bob_xmr_wallet.clone(), bob_xmr_wallet.clone(),
OsRng, OsRng,
Uuid::new_v4(), Uuid::new_v4(),
alice_event_loop_1.peer_id(),
alice_multiaddr.clone(),
); );
let alice_swap_id = Uuid::new_v4(); let alice_swap_id = Uuid::new_v4();

@ -1,5 +1,5 @@
use bitcoin_harness::Bitcoind; use bitcoin_harness::Bitcoind;
use libp2p::core::Multiaddr; use libp2p::{core::Multiaddr, PeerId};
use monero_harness::{image, Monero}; use monero_harness::{image, Monero};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::sync::Arc; use std::sync::Arc;
@ -155,7 +155,7 @@ pub async fn init_alice(
} }
pub async fn init_bob_state( pub async fn init_bob_state(
alice_multiaddr: Multiaddr, alice_peer_id: PeerId,
btc_to_swap: bitcoin::Amount, btc_to_swap: bitcoin::Amount,
xmr_to_swap: xmr_btc::monero::Amount, xmr_to_swap: xmr_btc::monero::Amount,
bob_btc_wallet: Arc<bitcoin::Wallet>, bob_btc_wallet: Arc<bitcoin::Wallet>,
@ -179,7 +179,7 @@ pub async fn init_bob_state(
BobState::Started { BobState::Started {
state0, state0,
amounts, amounts,
addr: alice_multiaddr, alice_peer_id,
} }
} }
@ -192,6 +192,7 @@ pub fn init_bob_event_loop() -> (bob::event_loop::EventLoop, bob::event_loop::Ev
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn init_bob( pub async fn init_bob(
alice_multiaddr: Multiaddr, alice_multiaddr: Multiaddr,
alice_peer_id: PeerId,
bitcoind: &Bitcoind<'_>, bitcoind: &Bitcoind<'_>,
monero: &Monero, monero: &Monero,
btc_to_swap: bitcoin::Amount, btc_to_swap: bitcoin::Amount,
@ -217,7 +218,7 @@ pub async fn init_bob(
.await; .await;
let bob_state = init_bob_state( let bob_state = init_bob_state(
alice_multiaddr, alice_peer_id.clone(),
btc_to_swap, btc_to_swap,
xmr_to_swap, xmr_to_swap,
bob_btc_wallet.clone(), bob_btc_wallet.clone(),
@ -225,7 +226,12 @@ pub async fn init_bob(
) )
.await; .await;
let (event_loop, event_loop_handle) = init_bob_event_loop(); let (event_loop, mut event_loop_handle) = init_bob_event_loop();
event_loop_handle
.add_address(alice_peer_id, alice_multiaddr)
.await
.unwrap();
let bob_db_dir = tempdir().unwrap(); let bob_db_dir = tempdir().unwrap();
let bob_db = Database::open(bob_db_dir.path()).unwrap(); let bob_db = Database::open(bob_db_dir.path()).unwrap();

Loading…
Cancel
Save