Introduce a more flexible transaction subscription system

Instead of watching for status changes directly on bitcoin::Wallet,
we return a Subscription object back to the caller. This subscription
object can be re-used multiple times.

Among other things, this now allows callers of `broadcast` to decide
on what to wait for given the returned Subscription object.

The new API is also more concise which allows us to remove some of
the functions on the actor states in favor of simple inline calls.

Co-authored-by: rishflab <rishflab@hotmail.com>
pull/361/head
Thomas Eizinger 3 years ago
parent 6fb495b6ab
commit 01739eddb1
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96

@ -15,7 +15,7 @@ use miniscript::{Descriptor, DescriptorTrait};
use sha2::Sha256;
use std::collections::HashMap;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TxRedeem {
inner: Transaction,
digest: SigHash,

@ -11,14 +11,13 @@ use bdk::keys::DerivableKey;
use bdk::{FeeRate, KeychainKind};
use bitcoin::Script;
use reqwest::Url;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use std::fmt;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::sync::{watch, Mutex};
const SLED_TREE_NAME: &str = "default_tree";
@ -162,14 +161,13 @@ impl Wallet {
&self,
transaction: Transaction,
kind: &str,
) -> Result<(Txid, impl Future<Output = Result<()>> + '_)> {
) -> Result<(Txid, Subscription)> {
let txid = transaction.txid();
// to watch for confirmations, watching a single output is enough
let watcher = self.wait_for_transaction_finality(
(txid, transaction.output[0].script_pubkey.clone()),
kind.to_owned(),
);
let subscription = self
.subscribe_to((txid, transaction.output[0].script_pubkey.clone()))
.await;
self.wallet
.lock()
@ -181,7 +179,7 @@ impl Wallet {
tracing::info!(%txid, "Published Bitcoin {} transaction", kind);
Ok((txid, watcher))
Ok((txid, subscription))
}
pub async fn sign_and_finalize(&self, psbt: PartiallySignedTransaction) -> Result<Transaction> {
@ -209,53 +207,91 @@ impl Wallet {
self.client.lock().await.status_of_script(tx)
}
pub async fn watch_until_status<T>(
&self,
tx: &T,
mut status_fn: impl FnMut(ScriptStatus) -> bool,
) -> Result<()>
where
T: Watchable,
{
pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription {
let txid = tx.id();
let script = tx.script();
let mut last_status = None;
loop {
let new_status = self.client.lock().await.status_of_script(tx)?;
if Some(new_status) != last_status {
tracing::debug!(%txid, "Transaction is {}", new_status);
}
last_status = Some(new_status);
if status_fn(new_status) {
break;
}
let sub = self
.client
.lock()
.await
.subscriptions
.entry((txid, script.clone()))
.or_insert_with(|| {
let (sender, receiver) = watch::channel(ScriptStatus::Unseen);
let client = self.client.clone();
tokio::spawn(async move {
let mut last_status = None;
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
let new_status = match client.lock().await.status_of_script(&tx) {
Ok(new_status) => new_status,
Err(e) => {
tracing::warn!(%txid, "Failed to get status of script: {:#}", e);
return;
}
};
if Some(new_status) != last_status {
tracing::debug!(%txid, "Transaction is {}", new_status);
}
last_status = Some(new_status);
let all_receivers_gone = sender.send(new_status).is_err();
if all_receivers_gone {
tracing::debug!(%txid, "All receivers gone, removing subscription");
client.lock().await.subscriptions.remove(&(txid, script));
return;
}
}
});
Subscription {
receiver,
finality_confirmations: self.finality_confirmations,
txid,
}
})
.clone();
tokio::time::sleep(Duration::from_secs(5)).await;
}
sub
}
Ok(())
/// Selects an appropriate [`FeeRate`] to be used for getting transactions
/// confirmed within a reasonable amount of time.
fn select_feerate(&self) -> FeeRate {
// TODO: This should obviously not be a const :)
FeeRate::from_sat_per_vb(5.0)
}
}
async fn wait_for_transaction_finality<T>(&self, tx: T, kind: String) -> Result<()>
where
T: Watchable,
{
/// Represents a subscription to the status of a given transaction.
#[derive(Debug, Clone)]
pub struct Subscription {
receiver: watch::Receiver<ScriptStatus>,
finality_confirmations: u32,
txid: Txid,
}
impl Subscription {
pub async fn wait_until_final(&self) -> Result<()> {
let conf_target = self.finality_confirmations;
let txid = tx.id();
let txid = self.txid;
tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin {} transaction", conf_target, if conf_target > 1 { "s" } else { "" }, kind);
tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin transaction", conf_target, if conf_target > 1 { "s" } else { "" });
let mut seen_confirmations = 0;
self.watch_until_status(&tx, |status| match status {
self.wait_until(|status| match status {
ScriptStatus::Confirmed(inner) => {
let confirmations = inner.confirmations();
if confirmations > seen_confirmations {
tracing::info!(%txid, "Bitcoin {} tx has {} out of {} confirmation{}", kind, confirmations, conf_target, if conf_target > 1 { "s" } else { "" });
tracing::info!(%txid, "Bitcoin tx has {} out of {} confirmation{}", confirmations, conf_target, if conf_target > 1 { "s" } else { "" });
seen_confirmations = confirmations;
}
@ -263,16 +299,33 @@ impl Wallet {
},
_ => false
})
.await?;
.await
}
Ok(())
pub async fn wait_until_seen(&self) -> Result<()> {
self.wait_until(ScriptStatus::has_been_seen).await
}
/// Selects an appropriate [`FeeRate`] to be used for getting transactions
/// confirmed within a reasonable amount of time.
fn select_feerate(&self) -> FeeRate {
// TODO: This should obviously not be a const :)
FeeRate::from_sat_per_vb(5.0)
pub async fn wait_until_confirmed_with<T>(&self, target: T) -> Result<()>
where
u32: PartialOrd<T>,
T: Copy,
{
self.wait_until(|status| status.is_confirmed_with(target))
.await
}
async fn wait_until(&self, mut predicate: impl FnMut(&ScriptStatus) -> bool) -> Result<()> {
let mut receiver = self.receiver.clone();
while !predicate(&receiver.borrow()) {
receiver
.changed()
.await
.context("Failed while waiting for next status update")?;
}
Ok(())
}
}
@ -303,6 +356,7 @@ struct Client {
last_ping: Instant,
interval: Duration,
script_history: BTreeMap<Script, Vec<GetHistoryRes>>,
subscriptions: HashMap<(Txid, Script), Subscription>,
}
impl Client {
@ -317,6 +371,7 @@ impl Client {
last_ping: Instant::now(),
interval,
script_history: Default::default(),
subscriptions: Default::default(),
})
}

@ -315,19 +315,6 @@ pub struct State3 {
}
impl State3 {
pub async fn wait_for_cancel_timelock_to_expire(
&self,
bitcoin_wallet: &bitcoin::Wallet,
) -> Result<()> {
bitcoin_wallet
.watch_until_status(&self.tx_lock, |status| {
status.is_confirmed_with(self.cancel_timelock)
})
.await?;
Ok(())
}
pub async fn expired_timelocks(
&self,
bitcoin_wallet: &bitcoin::Wallet,

@ -68,18 +68,12 @@ async fn next_state(
Ok(match state {
AliceState::Started { state3 } => {
timeout(
env_config.bob_time_to_act,
bitcoin_wallet.watch_until_status(&state3.tx_lock, |status| status.has_been_seen()),
)
.await
.context("Failed to find lock Bitcoin tx")??;
bitcoin_wallet
.watch_until_status(&state3.tx_lock, |status| {
status.is_confirmed_with(env_config.bitcoin_finality_confirmations)
})
.await?;
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
timeout(env_config.bob_time_to_act, tx_lock_status.wait_until_seen())
.await
.context("Failed to find lock Bitcoin tx")??;
tx_lock_status.wait_until_final().await?;
AliceState::BtcLocked { state3 }
}
@ -116,37 +110,42 @@ async fn next_state(
AliceState::XmrLocked {
state3,
monero_wallet_restore_blockheight,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
select! {
_ = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
} => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
select! {
_ = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => {
AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
}
}
}
enc_sig = event_loop_handle.recv_encrypted_signature() => {
tracing::info!("Received encrypted signature");
enc_sig = event_loop_handle.recv_encrypted_signature() => {
tracing::info!("Received encrypted signature");
AliceState::EncSigLearned {
state3,
encrypted_signature: Box::new(enc_sig?),
monero_wallet_restore_blockheight,
AliceState::EncSigLearned {
state3,
encrypted_signature: Box::new(enc_sig?),
monero_wallet_restore_blockheight,
}
}
}
}
_ => AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
},
}
_ => AliceState::CancelTimelockExpired {
state3,
monero_wallet_restore_blockheight,
},
},
}
AliceState::EncSigLearned {
state3,
encrypted_signature,
monero_wallet_restore_blockheight,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
match TxRedeem::new(&state3.tx_lock, &state3.redeem_address).complete(
*encrypted_signature,
state3.a.clone(),
@ -154,7 +153,7 @@ async fn next_state(
state3.B,
) {
Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await {
Ok((_, finality)) => match finality.await {
Ok((_, subscription)) => match subscription.wait_until_final().await {
Ok(_) => AliceState::BtcRedeemed,
Err(e) => {
bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e)
@ -162,8 +161,8 @@ async fn next_state(
},
Err(e) => {
error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e);
state3
.wait_for_cancel_timelock_to_expire(bitcoin_wallet)
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
AliceState::CancelTimelockExpired {
@ -174,8 +173,8 @@ async fn next_state(
},
Err(e) => {
error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e);
state3
.wait_for_cancel_timelock_to_expire(bitcoin_wallet)
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
AliceState::CancelTimelockExpired {
@ -226,22 +225,15 @@ async fn next_state(
state3,
monero_wallet_restore_blockheight,
} => {
let tx_refund = state3.tx_refund();
let tx_cancel = state3.tx_cancel();
let seen_refund_tx =
bitcoin_wallet.watch_until_status(&tx_refund, |status| status.has_been_seen());
let punish_timelock_expired = bitcoin_wallet.watch_until_status(&tx_cancel, |status| {
status.is_confirmed_with(state3.punish_timelock)
});
let tx_refund_status = bitcoin_wallet.subscribe_to(state3.tx_refund()).await;
let tx_cancel_status = bitcoin_wallet.subscribe_to(state3.tx_cancel()).await;
select! {
seen_refund = seen_refund_tx => {
seen_refund = tx_refund_status.wait_until_seen() => {
seen_refund.context("Failed to monitor refund transaction")?;
let published_refund_tx = bitcoin_wallet.get_raw_transaction(tx_refund.txid()).await?;
let published_refund_tx = bitcoin_wallet.get_raw_transaction(state3.tx_refund().txid()).await?;
let spend_key = tx_refund.extract_monero_private_key(
let spend_key = state3.tx_refund().extract_monero_private_key(
published_refund_tx,
state3.s_a,
state3.a.clone(),
@ -254,7 +246,7 @@ async fn next_state(
monero_wallet_restore_blockheight,
}
}
_ = punish_timelock_expired => {
_ = tx_cancel_status.wait_until_confirmed_with(state3.punish_timelock) => {
AliceState::BtcPunishable {
state3,
monero_wallet_restore_blockheight,
@ -286,8 +278,9 @@ async fn next_state(
)?;
let punish = async {
let (txid, finality) = bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?;
finality.await?;
let (txid, subscription) =
bitcoin_wallet.broadcast(signed_tx_punish, "punish").await?;
subscription.wait_until_final().await?;
Result::<_, anyhow::Error>::Ok(txid)
}

@ -295,11 +295,11 @@ pub struct State3 {
S_a_bitcoin: bitcoin::PublicKey,
v: monero::PrivateViewKey,
xmr: monero::Amount,
cancel_timelock: CancelTimelock,
pub cancel_timelock: CancelTimelock,
punish_timelock: PunishTimelock,
refund_address: bitcoin::Address,
redeem_address: bitcoin::Address,
tx_lock: bitcoin::TxLock,
pub tx_lock: bitcoin::TxLock,
tx_cancel_sig_a: Signature,
tx_refund_encsig: bitcoin::EncryptedSignature,
min_monero_confirmations: u64,
@ -338,18 +338,6 @@ impl State3 {
}
}
pub async fn wait_for_cancel_timelock_to_expire(
&self,
bitcoin_wallet: &bitcoin::Wallet,
) -> Result<()> {
bitcoin_wallet
.watch_until_status(&self.tx_lock, |status| {
status.is_confirmed_with(self.cancel_timelock)
})
.await?;
Ok(())
}
pub fn cancel(&self) -> State6 {
State6 {
A: self.A,
@ -393,11 +381,11 @@ pub struct State4 {
s_b: monero::Scalar,
S_a_bitcoin: bitcoin::PublicKey,
v: monero::PrivateViewKey,
cancel_timelock: CancelTimelock,
pub cancel_timelock: CancelTimelock,
punish_timelock: PunishTimelock,
refund_address: bitcoin::Address,
redeem_address: bitcoin::Address,
tx_lock: bitcoin::TxLock,
pub tx_lock: bitcoin::TxLock,
tx_cancel_sig_a: Signature,
tx_refund_encsig: bitcoin::EncryptedSignature,
monero_wallet_restore_blockheight: BlockHeight,
@ -414,7 +402,9 @@ impl State4 {
let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest());
bitcoin_wallet
.watch_until_status(&tx_redeem, |status| status.has_been_seen())
.subscribe_to(tx_redeem.clone())
.await
.wait_until_seen()
.await?;
let tx_redeem_candidate = bitcoin_wallet.get_raw_transaction(tx_redeem.txid()).await?;
@ -433,19 +423,6 @@ impl State4 {
})
}
pub async fn wait_for_cancel_timelock_to_expire(
&self,
bitcoin_wallet: &bitcoin::Wallet,
) -> Result<()> {
bitcoin_wallet
.watch_until_status(&self.tx_lock, |status| {
status.is_confirmed_with(self.cancel_timelock)
})
.await?;
Ok(())
}
pub async fn expired_timelock(
&self,
bitcoin_wallet: &bitcoin::Wallet,
@ -569,9 +546,9 @@ impl State6 {
let signed_tx_refund =
tx_refund.add_signatures((self.A, sig_a), (self.b.public(), sig_b))?;
let (_, finality) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?;
let (_, subscription) = bitcoin_wallet.broadcast(signed_tx_refund, "refund").await?;
finality.await?;
subscription.wait_until_final().await?;
Ok(())
}

@ -8,7 +8,7 @@ use crate::{bitcoin, monero};
use anyhow::{bail, Context, Result};
use rand::rngs::OsRng;
use tokio::select;
use tracing::trace;
use tracing::{info, trace};
pub fn is_complete(state: &BobState) -> bool {
matches!(
@ -89,10 +89,12 @@ async fn next_state(
// Bob has locked Btc
// Watch for Alice to Lock Xmr or for cancel timelock to elapse
BobState::BtcLocked(state3) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? {
let transfer_proof_watcher = event_loop_handle.recv_transfer_proof();
let cancel_timelock_expires =
state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet);
tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock);
// Record the current monero wallet block height so we don't have to scan from
// block 0 once we create the redeem wallet.
@ -129,6 +131,8 @@ async fn next_state(
lock_transfer_proof,
monero_wallet_restore_blockheight,
} => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? {
let watch_request = state.lock_xmr_watch_request(lock_transfer_proof);
@ -138,13 +142,13 @@ async fn next_state(
Ok(()) => BobState::XmrLocked(state.xmr_locked(monero_wallet_restore_blockheight)),
Err(e) => {
tracing::warn!("Waiting for refund because insufficient Monero have been locked! {}", e);
state.wait_for_cancel_timelock_to_expire(bitcoin_wallet).await?;
tx_lock_status.wait_until_confirmed_with(state.cancel_timelock).await?;
BobState::CancelTimelockExpired(state.cancel())
},
}
}
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
_ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => {
BobState::CancelTimelockExpired(state.cancel())
}
}
@ -153,6 +157,10 @@ async fn next_state(
}
}
BobState::XmrLocked(state) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
info!("{:?}", tx_lock_status);
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
// Alice has locked Xmr
// Bob sends Alice his key
@ -161,7 +169,7 @@ async fn next_state(
_ = event_loop_handle.send_encrypted_signature(state.tx_redeem_encsig()) => {
BobState::EncSigSent(state)
},
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
_ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => {
BobState::CancelTimelockExpired(state.cancel())
}
}
@ -170,12 +178,14 @@ async fn next_state(
}
}
BobState::EncSigSent(state) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
select! {
state5 = state.watch_for_redeem_btc(bitcoin_wallet) => {
BobState::BtcRedeemed(state5?)
},
_ = state.wait_for_cancel_timelock_to_expire(bitcoin_wallet) => {
_ = tx_lock_status.wait_until_confirmed_with(state.cancel_timelock) => {
BobState::CancelTimelockExpired(state.cancel())
}
}

@ -21,8 +21,11 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
// Ensure Bob's timelock is expired
if let BobState::BtcLocked(state3) = bob_swap.state.clone() {
state3
.wait_for_cancel_timelock_to_expire(bob_swap.bitcoin_wallet.as_ref())
bob_swap
.bitcoin_wallet
.subscribe_to(state3.tx_lock)
.await
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
} else {
panic!("Bob in unexpected state {}", bob_swap.state);

Loading…
Cancel
Save