diff --git a/swap/src/bitcoin/redeem.rs b/swap/src/bitcoin/redeem.rs index 6d14c201..da073693 100644 --- a/swap/src/bitcoin/redeem.rs +++ b/swap/src/bitcoin/redeem.rs @@ -15,7 +15,7 @@ use miniscript::{Descriptor, DescriptorTrait}; use sha2::Sha256; use std::collections::HashMap; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TxRedeem { inner: Transaction, digest: SigHash, diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index 97ae5424..e4ba57d4 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -11,14 +11,13 @@ use bdk::keys::DerivableKey; use bdk::{FeeRate, KeychainKind}; use bitcoin::Script; use reqwest::Url; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::fmt; -use std::future::Future; use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::Mutex; +use tokio::sync::{watch, Mutex}; const SLED_TREE_NAME: &str = "default_tree"; @@ -162,14 +161,13 @@ impl Wallet { &self, transaction: Transaction, kind: &str, - ) -> Result<(Txid, impl Future> + '_)> { + ) -> Result<(Txid, Subscription)> { let txid = transaction.txid(); // to watch for confirmations, watching a single output is enough - let watcher = self.wait_for_transaction_finality( - (txid, transaction.output[0].script_pubkey.clone()), - kind.to_owned(), - ); + let subscription = self + .subscribe_to((txid, transaction.output[0].script_pubkey.clone())) + .await; self.wallet .lock() @@ -181,7 +179,7 @@ impl Wallet { tracing::info!(%txid, "Published Bitcoin {} transaction", kind); - Ok((txid, watcher)) + Ok((txid, subscription)) } pub async fn sign_and_finalize(&self, psbt: PartiallySignedTransaction) -> Result { @@ -209,53 +207,91 @@ impl Wallet { self.client.lock().await.status_of_script(tx) } - pub async fn watch_until_status( - &self, - tx: &T, - mut status_fn: impl FnMut(ScriptStatus) -> bool, - ) -> Result<()> - where - T: Watchable, - { + pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription { let txid = tx.id(); + let script = tx.script(); - let mut last_status = None; - - loop { - let new_status = self.client.lock().await.status_of_script(tx)?; - - if Some(new_status) != last_status { - tracing::debug!(%txid, "Transaction is {}", new_status); - } - last_status = Some(new_status); - - if status_fn(new_status) { - break; - } + let sub = self + .client + .lock() + .await + .subscriptions + .entry((txid, script.clone())) + .or_insert_with(|| { + let (sender, receiver) = watch::channel(ScriptStatus::Unseen); + let client = self.client.clone(); + + tokio::spawn(async move { + let mut last_status = None; + + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + + let new_status = match client.lock().await.status_of_script(&tx) { + Ok(new_status) => new_status, + Err(e) => { + tracing::warn!(%txid, "Failed to get status of script: {:#}", e); + return; + } + }; + + if Some(new_status) != last_status { + tracing::debug!(%txid, "Transaction is {}", new_status); + } + last_status = Some(new_status); + + let all_receivers_gone = sender.send(new_status).is_err(); + + if all_receivers_gone { + tracing::debug!(%txid, "All receivers gone, removing subscription"); + client.lock().await.subscriptions.remove(&(txid, script)); + return; + } + } + }); + + Subscription { + receiver, + finality_confirmations: self.finality_confirmations, + txid, + } + }) + .clone(); - tokio::time::sleep(Duration::from_secs(5)).await; - } + sub + } - Ok(()) + /// Selects an appropriate [`FeeRate`] to be used for getting transactions + /// confirmed within a reasonable amount of time. + fn select_feerate(&self) -> FeeRate { + // TODO: This should obviously not be a const :) + FeeRate::from_sat_per_vb(5.0) } +} - async fn wait_for_transaction_finality(&self, tx: T, kind: String) -> Result<()> - where - T: Watchable, - { +/// Represents a subscription to the status of a given transaction. +#[derive(Debug, Clone)] +pub struct Subscription { + receiver: watch::Receiver, + finality_confirmations: u32, + txid: Txid, +} + +impl Subscription { + pub async fn wait_until_final(&self) -> Result<()> { let conf_target = self.finality_confirmations; - let txid = tx.id(); + let txid = self.txid; - tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin {} transaction", conf_target, if conf_target > 1 { "s" } else { "" }, kind); + tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin transaction", conf_target, if conf_target > 1 { "s" } else { "" }); let mut seen_confirmations = 0; - self.watch_until_status(&tx, |status| match status { + self.wait_until(|status| match status { ScriptStatus::Confirmed(inner) => { let confirmations = inner.confirmations(); if confirmations > seen_confirmations { - tracing::info!(%txid, "Bitcoin {} tx has {} out of {} confirmation{}", kind, confirmations, conf_target, if conf_target > 1 { "s" } else { "" }); + tracing::info!(%txid, "Bitcoin tx has {} out of {} confirmation{}", confirmations, conf_target, if conf_target > 1 { "s" } else { "" }); seen_confirmations = confirmations; } @@ -263,16 +299,33 @@ impl Wallet { }, _ => false }) - .await?; + .await + } - Ok(()) + pub async fn wait_until_seen(&self) -> Result<()> { + self.wait_until(ScriptStatus::has_been_seen).await } - /// Selects an appropriate [`FeeRate`] to be used for getting transactions - /// confirmed within a reasonable amount of time. - fn select_feerate(&self) -> FeeRate { - // TODO: This should obviously not be a const :) - FeeRate::from_sat_per_vb(5.0) + pub async fn wait_until_confirmed_with(&self, target: T) -> Result<()> + where + u32: PartialOrd, + T: Copy, + { + self.wait_until(|status| status.is_confirmed_with(target)) + .await + } + + async fn wait_until(&self, mut predicate: impl FnMut(&ScriptStatus) -> bool) -> Result<()> { + let mut receiver = self.receiver.clone(); + + while !predicate(&receiver.borrow()) { + receiver + .changed() + .await + .context("Failed while waiting for next status update")?; + } + + Ok(()) } } @@ -303,6 +356,7 @@ struct Client { last_ping: Instant, interval: Duration, script_history: BTreeMap>, + subscriptions: HashMap<(Txid, Script), Subscription>, } impl Client { @@ -317,6 +371,7 @@ impl Client { last_ping: Instant::now(), interval, script_history: Default::default(), + subscriptions: Default::default(), }) } diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index e0ff69df..ae9e58dd 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -315,19 +315,6 @@ pub struct State3 { } impl State3 { - pub async fn wait_for_cancel_timelock_to_expire( - &self, - bitcoin_wallet: &bitcoin::Wallet, - ) -> Result<()> { - bitcoin_wallet - .watch_until_status(&self.tx_lock, |status| { - status.is_confirmed_with(self.cancel_timelock) - }) - .await?; - - Ok(()) - } - pub async fn expired_timelocks( &self, bitcoin_wallet: &bitcoin::Wallet, diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index bf84bbea..b07b92ae 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -68,18 +68,12 @@ async fn next_state( Ok(match state { AliceState::Started { state3 } => { - timeout( - env_config.bob_time_to_act, - bitcoin_wallet.watch_until_status(&state3.tx_lock, |status| status.has_been_seen()), - ) - .await - .context("Failed to find lock Bitcoin tx")??; - - bitcoin_wallet - .watch_until_status(&state3.tx_lock, |status| { - status.is_confirmed_with(env_config.bitcoin_finality_confirmations) - }) - .await?; + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; + timeout(env_config.bob_time_to_act, tx_lock_status.wait_until_seen()) + .await + .context("Failed to find lock Bitcoin tx")??; + + tx_lock_status.wait_until_final().await?; AliceState::BtcLocked { state3 } } @@ -116,37 +110,42 @@ async fn next_state( AliceState::XmrLocked { state3, monero_wallet_restore_blockheight, - } => match state3.expired_timelocks(bitcoin_wallet).await? { - ExpiredTimelocks::None => { - select! { - _ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { - AliceState::CancelTimelockExpired { - state3, - monero_wallet_restore_blockheight, + } => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; + + match state3.expired_timelocks(bitcoin_wallet).await? { + ExpiredTimelocks::None => { + select! { + _ = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => { + AliceState::CancelTimelockExpired { + state3, + monero_wallet_restore_blockheight, + } } - } - enc_sig = event_loop_handle.recv_encrypted_signature() => { - tracing::info!("Received encrypted signature"); + enc_sig = event_loop_handle.recv_encrypted_signature() => { + tracing::info!("Received encrypted signature"); - AliceState::EncSigLearned { - state3, - encrypted_signature: Box::new(enc_sig?), - monero_wallet_restore_blockheight, + AliceState::EncSigLearned { + state3, + encrypted_signature: Box::new(enc_sig?), + monero_wallet_restore_blockheight, + } } } } + _ => AliceState::CancelTimelockExpired { + state3, + monero_wallet_restore_blockheight, + }, } - _ => AliceState::CancelTimelockExpired { - state3, - monero_wallet_restore_blockheight, - }, - }, + } AliceState::EncSigLearned { state3, encrypted_signature, monero_wallet_restore_blockheight, } => match state3.expired_timelocks(bitcoin_wallet).await? { ExpiredTimelocks::None => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; match TxRedeem::new(&state3.tx_lock, &state3.redeem_address).complete( *encrypted_signature, state3.a.clone(), @@ -154,7 +153,7 @@ async fn next_state( state3.B, ) { Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await { - Ok((_, finality)) => match finality.await { + Ok((_, subscription)) => match subscription.wait_until_final().await { Ok(_) => AliceState::BtcRedeemed, Err(e) => { bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e) @@ -162,8 +161,8 @@ async fn next_state( }, Err(e) => { error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e); - state3 - .wait_for_cancel_timelock_to_expire(bitcoin_wallet) + tx_lock_status + .wait_until_confirmed_with(state3.cancel_timelock) .await?; AliceState::CancelTimelockExpired { @@ -174,8 +173,8 @@ async fn next_state( }, Err(e) => { error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e); - state3 - .wait_for_cancel_timelock_to_expire(bitcoin_wallet) + tx_lock_status + .wait_until_confirmed_with(state3.cancel_timelock) .await?; AliceState::CancelTimelockExpired { @@ -226,22 +225,15 @@ async fn next_state( state3, monero_wallet_restore_blockheight, } => { - let tx_refund = state3.tx_refund(); - let tx_cancel = state3.tx_cancel(); - - let seen_refund_tx = - bitcoin_wallet.watch_until_status(&tx_refund, |status| status.has_been_seen()); - - let punish_timelock_expired = bitcoin_wallet.watch_until_status(&tx_cancel, |status| { - status.is_confirmed_with(state3.punish_timelock) - }); + let tx_refund_status = bitcoin_wallet.subscribe_to(state3.tx_refund()).await; + let tx_cancel_status = bitcoin_wallet.subscribe_to(state3.tx_cancel()).await; select! { - seen_refund = seen_refund_tx => { + seen_refund = tx_refund_status.wait_until_seen() => { seen_refund.context("Failed to monitor refund transaction")?; - let published_refund_tx = bitcoin_wallet.get_raw_transaction(tx_refund.txid()).await?; + let published_refund_tx = bitcoin_wallet.get_raw_transaction(state3.tx_refund().txid()).await?; - let spend_key = tx_refund.extract_monero_private_key( + let spend_key = state3.tx_refund().extract_monero_private_key( published_refund_tx, state3.s_a, state3.a.clone(), @@ -254,7 +246,7 @@ async fn next_state( monero_wallet_restore_blockheight, } } - _ = punish_timelock_expired => { + _ = tx_cancel_status.wait_until_confirmed_with(state3.punish_timelock) => { AliceState::BtcPunishable { state3, monero_wallet_restore_blockheight, @@ -286,8 +278,9 @@ async fn next_state( )?; let punish = async { - let (txid, finality) = bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?; - finality.await?; + let (txid, subscription) = + bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?; + subscription.wait_until_final().await?; Result::<_, anyhow::Error>::Ok(txid) } diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 1ccfc476..dd3a3bea 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -295,11 +295,11 @@ pub struct State3 { S_a_bitcoin: bitcoin::PublicKey, v: monero::PrivateViewKey, xmr: monero::Amount, - cancel_timelock: CancelTimelock, + pub cancel_timelock: CancelTimelock, punish_timelock: PunishTimelock, refund_address: bitcoin::Address, redeem_address: bitcoin::Address, - tx_lock: bitcoin::TxLock, + pub tx_lock: bitcoin::TxLock, tx_cancel_sig_a: Signature, tx_refund_encsig: bitcoin::EncryptedSignature, min_monero_confirmations: u64, @@ -338,18 +338,6 @@ impl State3 { } } - pub async fn wait_for_cancel_timelock_to_expire( - &self, - bitcoin_wallet: &bitcoin::Wallet, - ) -> Result<()> { - bitcoin_wallet - .watch_until_status(&self.tx_lock, |status| { - status.is_confirmed_with(self.cancel_timelock) - }) - .await?; - Ok(()) - } - pub fn cancel(&self) -> State6 { State6 { A: self.A, @@ -393,11 +381,11 @@ pub struct State4 { s_b: monero::Scalar, S_a_bitcoin: bitcoin::PublicKey, v: monero::PrivateViewKey, - cancel_timelock: CancelTimelock, + pub cancel_timelock: CancelTimelock, punish_timelock: PunishTimelock, refund_address: bitcoin::Address, redeem_address: bitcoin::Address, - tx_lock: bitcoin::TxLock, + pub tx_lock: bitcoin::TxLock, tx_cancel_sig_a: Signature, tx_refund_encsig: bitcoin::EncryptedSignature, monero_wallet_restore_blockheight: BlockHeight, @@ -414,7 +402,9 @@ impl State4 { let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()); bitcoin_wallet - .watch_until_status(&tx_redeem, |status| status.has_been_seen()) + .subscribe_to(tx_redeem.clone()) + .await + .wait_until_seen() .await?; let tx_redeem_candidate = bitcoin_wallet.get_raw_transaction(tx_redeem.txid()).await?; @@ -433,19 +423,6 @@ impl State4 { }) } - pub async fn wait_for_cancel_timelock_to_expire( - &self, - bitcoin_wallet: &bitcoin::Wallet, - ) -> Result<()> { - bitcoin_wallet - .watch_until_status(&self.tx_lock, |status| { - status.is_confirmed_with(self.cancel_timelock) - }) - .await?; - - Ok(()) - } - pub async fn expired_timelock( &self, bitcoin_wallet: &bitcoin::Wallet, @@ -569,9 +546,9 @@ impl State6 { let signed_tx_refund = tx_refund.add_signatures((self.A, sig_a), (self.b.public(), sig_b))?; - let (_, finality) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?; + let (_, subscription) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?; - finality.await?; + subscription.wait_until_final().await?; Ok(()) } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index ac0ef989..2df76c08 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -8,7 +8,7 @@ use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; use rand::rngs::OsRng; use tokio::select; -use tracing::trace; +use tracing::{info, trace}; pub fn is_complete(state: &BobState) -> bool { matches!( @@ -89,10 +89,12 @@ async fn next_state( // Bob has locked Btc // Watch for Alice to Lock Xmr or for cancel timelock to elapse BobState::BtcLocked(state3) => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; + if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? { let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = - state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet); + tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock); // Record the current monero wallet block height so we don't have to scan from // block 0 once we create the redeem wallet. @@ -129,6 +131,8 @@ async fn next_state( lock_transfer_proof, monero_wallet_restore_blockheight, } => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; + if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? { let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); @@ -138,13 +142,13 @@ async fn next_state( Ok(()) => BobState::XmrLocked(state.xmr_locked(monero_wallet_restore_blockheight)), Err(e) => { tracing::warn!("Waiting for refund because insufficient Monero have been locked! {}", e); - state.wait_for_cancel_timelock_to_expire(bitcoin_wallet).await?; + tx_lock_status.wait_until_confirmed_with(state.cancel_timelock).await?; BobState::CancelTimelockExpired(state.cancel()) }, } } - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { + _ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => { BobState::CancelTimelockExpired(state.cancel()) } } @@ -153,6 +157,10 @@ async fn next_state( } } BobState::XmrLocked(state) => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; + + info!("{:?}", tx_lock_status); + if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { // Alice has locked Xmr // Bob sends Alice his key @@ -161,7 +169,7 @@ async fn next_state( _ = event_loop_handle.send_encrypted_signature(state.tx_redeem_encsig()) => { BobState::EncSigSent(state) }, - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { + _ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => { BobState::CancelTimelockExpired(state.cancel()) } } @@ -170,12 +178,14 @@ async fn next_state( } } BobState::EncSigSent(state) => { + let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; + if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { select! { state5 = state.watch_for_redeem_btc(bitcoin_wallet) => { BobState::BtcRedeemed(state5?) }, - _ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => { + _ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => { BobState::CancelTimelockExpired(state.cancel()) } } diff --git a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs index dc2ea482..e04eab58 100644 --- a/swap/tests/bob_refunds_using_cancel_and_refund_command.rs +++ b/swap/tests/bob_refunds_using_cancel_and_refund_command.rs @@ -21,8 +21,11 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { // Ensure Bob's timelock is expired if let BobState::BtcLocked(state3) = bob_swap.state.clone() { - state3 - .wait_for_cancel_timelock_to_expire(bob_swap.bitcoin_wallet.as_ref()) + bob_swap + .bitcoin_wallet + .subscribe_to(state3.tx_lock) + .await + .wait_until_confirmed_with(state3.cancel_timelock) .await?; } else { panic!("Bob in unexpected state {}", bob_swap.state);