@ -3,15 +3,14 @@ use crate::monero::{
} ;
use ::monero ::{ Address , Network , PrivateKey , PublicKey } ;
use anyhow ::{ Context , Result } ;
use backoff ::backoff ::Constant as ConstantBackoff ;
use backoff ::future ::retry ;
use bitcoin ::hashes ::core ::sync ::atomic ::AtomicU32 ;
use monero_rpc ::wallet ;
use monero_rpc ::wallet ::{ BlockHeight , Refreshed } ;
use monero_rpc ::wallet ::{ BlockHeight , CheckTxKey , Refreshed } ;
use std ::cmp ::min ;
use std ::future ::Future ;
use std ::str ::FromStr ;
use std ::sync ::atomic ::Ordering ;
use std ::time ::Duration ;
use tokio ::sync ::Mutex ;
use tokio ::time ::Interval ;
use tracing ::{ debug , info } ;
use url ::Url ;
@ -20,22 +19,30 @@ pub struct Wallet {
inner : Mutex < wallet ::Client > ,
network : Network ,
name : String ,
avg_block_time : Duration ,
}
impl Wallet {
pub fn new ( url : Url , network : Network , name : String ) -> Self {
pub fn new ( url : Url , network : Network , name : String , avg_block_time : Duration ) -> Self {
Self {
inner : Mutex ::new ( wallet ::Client ::new ( url ) ) ,
network ,
name ,
avg_block_time ,
}
}
pub fn new_with_client ( client : wallet ::Client , network : Network , name : String ) -> Self {
pub fn new_with_client (
client : wallet ::Client ,
network : Network ,
name : String ,
avg_block_time : Duration ,
) -> Self {
Self {
inner : Mutex ::new ( client ) ,
network ,
name ,
avg_block_time ,
}
}
@ -157,63 +164,33 @@ impl Wallet {
public_spend_key : PublicKey ,
public_view_key : PublicViewKey ,
transfer_proof : TransferProof ,
expected _amount : Amount ,
expected : Amount ,
conf_target : u32 ,
) -> Result < ( ) , InsufficientFunds > {
let txid = & transfer_proof . tx_hash ( ) ;
let txid = transfer_proof . tx_hash ( ) ;
tracing ::info ! ( % txid , "Waiting for {} confirmation{} of Monero transaction" , conf_target , if conf_target > 1 { "s" } else { "" } ) ;
enum Error {
TxNotFound ,
InsufficientConfirmations ,
InsufficientFunds { expected : Amount , actual : Amount } ,
}
let address = Address ::standard ( self . network , public_spend_key , public_view_key . into ( ) ) ;
let confirmations = AtomicU32 ::new ( 0 u32 ) ;
let res = retry ( ConstantBackoff ::new ( Duration ::from_secs ( 1 ) ) , | | async {
// NOTE: Currently, this is conflicting IO errors with the transaction not being
// in the blockchain yet, or not having enough confirmations on it. All these
// errors warrant a retry, but the strategy should probably differ per case
let proof = self
. inner
. lock ( )
. await
. check_tx_key (
& String ::from ( transfer_proof . tx_hash ( ) ) ,
& transfer_proof . tx_key ( ) . to_string ( ) ,
& address . to_string ( ) ,
)
. await
. map_err ( | _ | backoff ::Error ::Transient ( Error ::TxNotFound ) ) ? ;
if proof . received ! = expected_amount . as_piconero ( ) {
return Err ( backoff ::Error ::Permanent ( Error ::InsufficientFunds {
expected : expected_amount ,
actual : Amount ::from_piconero ( proof . received ) ,
} ) ) ;
}
if proof . confirmations > = confirmations . load ( Ordering ::SeqCst ) {
confirmations . store ( proof . confirmations , Ordering ::SeqCst ) ;
info ! ( % txid , "Monero lock tx has {} out of {} confirmations" , proof . confirmations , conf_target ) ;
}
if proof . confirmations < conf_target {
return Err ( backoff ::Error ::Transient ( Error ::InsufficientConfirmations ) ) ;
}
Ok ( proof )
} )
. await ;
if let Err ( Error ::InsufficientFunds { expected , actual } ) = res {
return Err ( InsufficientFunds { expected , actual } ) ;
} ;
let check_interval =
tokio ::time ::interval ( min ( self . avg_block_time / 10 , Duration ::from_secs ( 1 ) ) ) ;
let key = & transfer_proof . tx_key ( ) . to_string ( ) ;
wait_for_confirmations (
txid . 0 ,
| txid | async move {
self . inner
. lock ( )
. await
. check_tx_key ( & txid , & key , & address . to_string ( ) )
. await
} ,
check_interval ,
expected ,
conf_target ,
)
. await ? ;
Ok ( ( ) )
}
@ -255,3 +232,124 @@ impl Wallet {
Amount ::from_monero ( 0.000_03 f64 ) . expect ( "static fee to be convertible without problems" )
}
}
async fn wait_for_confirmations < Fut > (
txid : String ,
fetch_tx : impl Fn ( String ) -> Fut ,
mut check_interval : Interval ,
expected : Amount ,
conf_target : u32 ,
) -> Result < ( ) , InsufficientFunds >
where
Fut : Future < Output = Result < CheckTxKey > > ,
{
let mut seen_confirmations = 0 u32 ;
while seen_confirmations < conf_target {
let tx = match fetch_tx ( txid . clone ( ) ) . await {
Ok ( proof ) = > proof ,
Err ( error ) = > {
tracing ::debug ! ( % txid , "Failed to retrieve tx from blockchain: {:#}" , error ) ;
continue ; // treating every error as transient and retrying
// is obviously wrong but the jsonrpc client is
// too primitive to differentiate between all the
// cases
}
} ;
let received = Amount ::from_piconero ( tx . received ) ;
if received ! = expected {
return Err ( InsufficientFunds {
expected ,
actual : received ,
} ) ;
}
if tx . confirmations > seen_confirmations {
seen_confirmations = tx . confirmations ;
info ! ( % txid , "Monero lock tx has {} out of {} confirmations" , tx . confirmations , conf_target ) ;
}
check_interval . tick ( ) . await ;
}
Ok ( ( ) )
}
#[ cfg(test) ]
mod tests {
use super ::* ;
use monero_rpc ::wallet ::CheckTxKey ;
use std ::sync ::atomic ::{ AtomicU32 , Ordering } ;
use std ::sync ::Arc ;
#[ tokio::test ]
async fn given_exact_confirmations_does_not_fetch_tx_again ( ) {
let requests = Arc ::new ( AtomicU32 ::new ( 0 ) ) ;
let result = wait_for_confirmations (
String ::from ( "TXID" ) ,
move | _ | {
let requests = requests . clone ( ) ;
async move {
match requests . fetch_add ( 1 , Ordering ::SeqCst ) {
0 = > Ok ( CheckTxKey {
confirmations : 10 ,
received : 100 ,
} ) ,
_ = > panic! ( "should not be called more than once" ) ,
}
}
} ,
tokio ::time ::interval ( Duration ::from_millis ( 10 ) ) ,
Amount ::from_piconero ( 100 ) ,
10 ,
)
. await ;
assert! ( result . is_ok ( ) )
}
/// A test that allows us to easily, visually verify if the log output is as
/// we desire.
///
/// We want the following properties:
/// - Only print confirmations if they changed i.e. not every time we
/// request them
/// - Also print the last one, i.e. 10 / 10
#[ tokio::test ]
async fn visual_log_check ( ) {
let _ = tracing_subscriber ::fmt ( ) . with_test_writer ( ) . try_init ( ) ;
const MAX_REQUESTS : u32 = 20 ;
let requests = Arc ::new ( AtomicU32 ::new ( 0 ) ) ;
let result = wait_for_confirmations (
String ::from ( "TXID" ) ,
move | _ | {
let requests = requests . clone ( ) ;
async move {
match requests . fetch_add ( 1 , Ordering ::SeqCst ) {
requests if requests < = MAX_REQUESTS = > {
Ok ( CheckTxKey {
confirmations : requests / 2 , /* every 2nd request "yields" a
* confirmation * /
received : 100 ,
} )
}
_ = > panic! ( "should not be called more than {} times" , MAX_REQUESTS ) ,
}
}
} ,
tokio ::time ::interval ( Duration ::from_millis ( 10 ) ) ,
Amount ::from_piconero ( 100 ) ,
10 ,
)
. await ;
assert! ( result . is_ok ( ) )
}
}