From 1822886cd06a5bca918d17a6d5fae52e55969637 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 5 Mar 2021 13:52:24 +1100 Subject: [PATCH] Provide stronger isolation of kraken module Instead of leaking the tokio::sync::watch::Receiver type in our return value, we create a newtype that implements the desired interface. This allows us to get rid of the `RateService` structs and instead implement `LatestRate` directly on top of this struct. Given that `LatestRate` is only used within the event_loop module, we move the definition of this type into there. --- swap/src/asb.rs | 14 +++------ swap/src/asb/fixed_rate.rs | 21 ++++++-------- swap/src/asb/kraken.rs | 25 ---------------- swap/src/asb/{amounts.rs => rate.rs} | 0 swap/src/bin/asb.rs | 7 ++--- swap/src/bin/kraken_ticker.rs | 4 +-- swap/src/kraken.rs | 23 +++++++++++++-- swap/src/protocol/alice/event_loop.rs | 41 +++++++++++++++++++++------ swap/tests/testutils/mod.rs | 7 ++--- 9 files changed, 73 insertions(+), 69 deletions(-) delete mode 100644 swap/src/asb/kraken.rs rename swap/src/asb/{amounts.rs => rate.rs} (100%) diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 78ac0dde..f755822e 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -1,13 +1,7 @@ -mod amounts; pub mod command; pub mod config; -pub mod fixed_rate; -pub mod kraken; +mod fixed_rate; +mod rate; -pub use amounts::Rate; - -pub trait LatestRate { - type Error: std::error::Error + Send + Sync + 'static; - - fn latest_rate(&mut self) -> Result; -} +pub use self::fixed_rate::FixedRate; +pub use self::rate::Rate; diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs index 1bf56364..b78da380 100644 --- a/swap/src/asb/fixed_rate.rs +++ b/swap/src/asb/fixed_rate.rs @@ -1,23 +1,20 @@ -use crate::asb::{LatestRate, Rate}; -use std::convert::Infallible; +use crate::asb::Rate; -pub const RATE: f64 = 0.01; +#[derive(Clone, Debug)] +pub struct FixedRate(Rate); -#[derive(Clone)] -pub struct RateService(Rate); +impl FixedRate { + pub const RATE: f64 = 0.01; -impl LatestRate for RateService { - type Error = Infallible; - - fn latest_rate(&mut self) -> Result { - Ok(self.0) + pub fn value(&self) -> Rate { + self.0 } } -impl Default for RateService { +impl Default for FixedRate { fn default() -> Self { Self(Rate { - ask: bitcoin::Amount::from_btc(RATE).expect("Static value should never fail"), + ask: bitcoin::Amount::from_btc(Self::RATE).expect("Static value should never fail"), }) } } diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs deleted file mode 100644 index b2460956..00000000 --- a/swap/src/asb/kraken.rs +++ /dev/null @@ -1,25 +0,0 @@ -use crate::asb::{LatestRate, Rate}; -use crate::kraken; -use anyhow::Result; -use tokio::sync::watch::Receiver; - -#[derive(Clone)] -pub struct RateService { - receiver: Receiver>, -} - -impl LatestRate for RateService { - type Error = kraken::Error; - - fn latest_rate(&mut self) -> Result { - (*self.receiver.borrow()).clone() - } -} - -impl RateService { - pub async fn new() -> Result { - Ok(Self { - receiver: kraken::connect().await?, - }) - } -} diff --git a/swap/src/asb/amounts.rs b/swap/src/asb/rate.rs similarity index 100% rename from swap/src/asb/amounts.rs rename to swap/src/asb/rate.rs diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 8ffd3a93..94f144eb 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -23,7 +23,6 @@ use swap::asb::command::{Arguments, Command}; use swap::asb::config::{ initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized, }; -use swap::asb::kraken; use swap::database::Database; use swap::execution_params::GetExecutionParams; use swap::fs::default_config_path; @@ -31,7 +30,7 @@ use swap::monero::Amount; use swap::protocol::alice::EventLoop; use swap::seed::Seed; use swap::trace::init_tracing; -use swap::{bitcoin, execution_params, monero}; +use swap::{bitcoin, execution_params, kraken, monero}; use tracing::{info, warn}; use tracing_subscriber::filter::LevelFilter; @@ -93,7 +92,7 @@ async fn main() -> Result<()> { bitcoin_wallet.new_address().await? ); - let rate_service = kraken::RateService::new().await?; + let kraken_rate_updates = kraken::connect().await?; let (event_loop, _) = EventLoop::new( config.network.listen, @@ -102,7 +101,7 @@ async fn main() -> Result<()> { Arc::new(bitcoin_wallet), Arc::new(monero_wallet), Arc::new(db), - rate_service, + kraken_rate_updates, max_buy, ) .unwrap(); diff --git a/swap/src/bin/kraken_ticker.rs b/swap/src/bin/kraken_ticker.rs index ea04779b..2dca382a 100644 --- a/swap/src/bin/kraken_ticker.rs +++ b/swap/src/bin/kraken_ticker.rs @@ -11,9 +11,7 @@ async fn main() -> Result<()> { .context("Failed to connect to kraken")?; loop { - ticker.changed().await?; - - match &*ticker.borrow() { + match ticker.wait_for_update().await? { Ok(rate) => println!("Rate update: {}", rate), Err(e) => println!("Error: {:#}", e), } diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index ad66d2c6..e15c2f9f 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -10,7 +10,7 @@ use tokio::sync::watch; use tokio_tungstenite::tungstenite; use tracing::{error, trace}; -pub async fn connect() -> Result>> { +pub async fn connect() -> Result { let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved)); let (rate_stream, _response) = @@ -88,7 +88,26 @@ pub async fn connect() -> Result>> { .send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into()) .await?; - Ok(rate_update_receiver) + Ok(RateUpdateStream { + inner: rate_update_receiver, + }) +} + +#[derive(Clone, Debug)] +pub struct RateUpdateStream { + inner: watch::Receiver>, +} + +impl RateUpdateStream { + pub async fn wait_for_update(&mut self) -> Result> { + self.inner.changed().await?; + + Ok(self.inner.borrow().clone()) + } + + pub fn latest_update(&mut self) -> Result { + self.inner.borrow().clone() + } } const KRAKEN_WS_URL: &str = "wss://ws.kraken.com"; diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 7df924ff..6838f914 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,4 +1,4 @@ -use crate::asb::LatestRate; +use crate::asb::{FixedRate, Rate}; use crate::database::Database; use crate::execution_params::ExecutionParams; use crate::monero::BalanceTooLow; @@ -8,13 +8,14 @@ use crate::protocol::alice; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof}; use crate::protocol::bob::EncryptedSignature; use crate::seed::Seed; -use crate::{bitcoin, monero}; +use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; use futures::future::RemoteHandle; use libp2p::core::Multiaddr; use libp2p::futures::FutureExt; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; +use std::convert::Infallible; use std::sync::Arc; use tokio::sync::mpsc::error::SendError; use tokio::sync::{broadcast, mpsc}; @@ -80,7 +81,7 @@ pub struct EventLoop { bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - rate_service: RS, + latest_rate: RS, max_buy: bitcoin::Amount, recv_encrypted_signature: broadcast::Sender, @@ -92,9 +93,31 @@ pub struct EventLoop { swap_handle_sender: mpsc::Sender>>, } -impl EventLoop +pub trait LatestRate { + type Error: std::error::Error + Send + Sync + 'static; + + fn latest_rate(&mut self) -> Result; +} + +impl LatestRate for FixedRate { + type Error = Infallible; + + fn latest_rate(&mut self) -> Result { + Ok(self.value()) + } +} + +impl LatestRate for kraken::RateUpdateStream { + type Error = kraken::Error; + + fn latest_rate(&mut self) -> Result { + self.latest_update() + } +} + +impl EventLoop where - RS: LatestRate, + LR: LatestRate, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -104,7 +127,7 @@ where bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, - rate_service: RS, + latest_rate: LR, max_buy: bitcoin::Amount, ) -> Result<(Self, mpsc::Receiver>>)> { let identity = seed.derive_libp2p_identity(); @@ -132,7 +155,7 @@ where bitcoin_wallet, monero_wallet, db, - rate_service, + latest_rate, recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof_sender: send_transfer_proof.sender, @@ -239,7 +262,7 @@ where monero_wallet: Arc, ) -> Result { let rate = self - .rate_service + .latest_rate .latest_rate() .context("Failed to get latest rate")?; @@ -265,7 +288,7 @@ where async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result { let rate = self - .rate_service + .latest_rate .latest_rate() .context("Failed to get latest rate")?; diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index ad16dbb8..c1b3d3bc 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -14,8 +14,7 @@ use std::convert::Infallible; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use swap::asb::fixed_rate; -use swap::asb::fixed_rate::RATE; +use swap::asb::FixedRate; use swap::bitcoin::{CancelTimelock, PunishTimelock}; use swap::database::Database; use swap::execution_params::{ExecutionParams, GetExecutionParams}; @@ -344,7 +343,7 @@ where let (monero, containers) = testutils::init_containers(&cli).await; let btc_amount = bitcoin::Amount::from_sat(1_000_000); - let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / RATE).unwrap(); + let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap(); let alice_starting_balances = StartingBalances { xmr: xmr_amount * 10, @@ -410,7 +409,7 @@ where alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), alice_db, - fixed_rate::RateService::default(), + FixedRate::default(), bitcoin::Amount::ONE_BTC, ) .unwrap();