From 644f4c1732fd260225d51d52efbead2b0b15c344 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 18 Feb 2021 12:27:15 +1100 Subject: [PATCH] Bubble up ws error to consumer Note that because we are using `watch` channel, only a reference to the channel value can be returned. Hence, using custom Error that can be cloned to be able to pass `Result` through the channel. --- swap/src/asb.rs | 4 +- swap/src/asb/fixed_rate.rs | 6 +- swap/src/asb/kraken.rs | 83 ++++++++++++++++++++------- swap/src/protocol/alice/event_loop.rs | 11 +++- 4 files changed, 76 insertions(+), 28 deletions(-) diff --git a/swap/src/asb.rs b/swap/src/asb.rs index d788b27a..2fda0610 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -8,5 +8,7 @@ mod amounts; pub use amounts::Rate; pub trait LatestRate { - fn latest_rate(&mut self) -> Rate; + type Error: std::fmt::Debug; + + fn latest_rate(&mut self) -> Result; } diff --git a/swap/src/asb/fixed_rate.rs b/swap/src/asb/fixed_rate.rs index 2dc28a2c..31ce83bf 100644 --- a/swap/src/asb/fixed_rate.rs +++ b/swap/src/asb/fixed_rate.rs @@ -6,8 +6,10 @@ pub const RATE: f64 = 0.01; pub struct RateService(Rate); impl LatestRate for RateService { - fn latest_rate(&mut self) -> Rate { - self.0 + type Error = anyhow::Error; + + fn latest_rate(&mut self) -> anyhow::Result { + Ok(self.0) } } diff --git a/swap/src/asb/kraken.rs b/swap/src/asb/kraken.rs index 35da8e29..6113fabb 100644 --- a/swap/src/asb/kraken.rs +++ b/swap/src/asb/kraken.rs @@ -1,5 +1,5 @@ use crate::asb::{LatestRate, Rate}; -use anyhow::{anyhow, bail, Result}; +use bitcoin::util::amount::ParseAmountError; use futures::{SinkExt, StreamExt}; use reqwest::Url; use serde::{Deserialize, Serialize}; @@ -20,49 +20,89 @@ const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#" #[derive(Clone)] pub struct RateService { - receiver: Receiver, + receiver: Receiver>, } impl LatestRate for RateService { - fn latest_rate(&mut self) -> Rate { - *self.receiver.borrow() + type Error = Error; + + fn latest_rate(&mut self) -> Result { + (*self.receiver.borrow()).clone() + } +} + +#[derive(Clone, Debug, thiserror::Error)] +pub enum Error { + #[error("Rate has not yet been retrieved from Kraken websocket API")] + NotYetRetrieved, + #[error("Message is not text")] + NonTextMessage, + #[error("Websocket: ")] + WebSocket(String), + #[error("Serde: ")] + Serde(String), + #[error("Data field is missing")] + DataFieldMissing, + #[error("Ask Rate Element is of unexpected type")] + UnexpectedAskRateElementType, + #[error("Ask Rate Element is missing")] + MissingAskRateElementType, + #[error("Bitcoin amount parse error: ")] + BitcoinParseAmount(#[from] ParseAmountError), +} + +impl From for Error { + fn from(err: tokio_tungstenite::tungstenite::Error) -> Self { + Error::WebSocket(format!("{:#}", err)) + } +} + +impl From for Error { + fn from(err: serde_json::Error) -> Self { + Error::Serde(format!("{:#}", err)) } } impl RateService { - pub async fn new() -> Result { - let (tx, rx) = watch::channel(Rate::ZERO); + pub async fn new() -> anyhow::Result { + let (tx, rx) = watch::channel(Err(Error::NotYetRetrieved)); let (ws, _response) = tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?; let (mut write, mut read) = ws.split(); - // TODO: Handle the possibility of losing the connection - // to the Kraken WS. Currently the stream would produce no - // further items, and consumers would assume that the rate - // is up to date tokio::spawn(async move { while let Some(msg) = read.next().await { let msg = match msg { Ok(Message::Text(msg)) => msg, - _ => continue, + Ok(_) => { + let _ = tx.send(Err(Error::NonTextMessage)); + continue; + } + Err(e) => { + let _ = tx.send(Err(e.into())); + continue; + } }; let ticker = match serde_json::from_str::(&msg) { Ok(ticker) => ticker, - _ => continue, + Err(e) => { + let _ = tx.send(Err(e.into())); + continue; + } }; let rate = match Rate::try_from(ticker) { Ok(rate) => rate, Err(e) => { - log::error!("could not get rate from ticker update: {}", e); + let _ = tx.send(Err(e)); continue; } }; - let _ = tx.send(rate); + let _ = tx.send(Ok(rate)); } }); @@ -99,9 +139,9 @@ enum RateElement { } impl TryFrom for Rate { - type Error = anyhow::Error; + type Error = Error; - fn try_from(value: TickerUpdate) -> Result { + fn try_from(value: TickerUpdate) -> Result { let data = value .0 .iter() @@ -109,14 +149,14 @@ impl TryFrom for Rate { TickerField::Data(data) => Some(data), TickerField::Metadata(_) => None, }) - .ok_or_else(|| anyhow!("ticker update does not contain data"))?; - - let ask = data.ask.first().ok_or_else(|| anyhow!("no ask price"))?; + .ok_or(Error::DataFieldMissing)?; + // TODO: Ensure whether heartbeats returned by the api are being filtered. + let ask = data.ask.first().ok_or(Error::MissingAskRateElementType)?; let ask = match ask { RateElement::Text(ask) => { bitcoin::Amount::from_str_in(ask, ::bitcoin::Denomination::Bitcoin)? } - _ => bail!("unexpected ask rate element"), + _ => return Err(Error::UnexpectedAskRateElementType), }; Ok(Self { ask }) @@ -129,8 +169,7 @@ mod tests { #[tokio::test] async fn deserialize_ticker_update() { - let sample_response = r#" -[2308,{"a":["18215.60000",0,"0.27454523"],"b":["18197.50000",0,"0.63711255"],"c":["18197.50000","0.00413060"],"v":["2.78915585","156.15766485"],"p":["18200.94036","18275.19149"],"t":[22,1561],"l":["18162.40000","17944.90000"],"h":["18220.90000","18482.60000"],"o":["18220.90000","18478.90000"]},"ticker","XBT/USDT"]"#; + let sample_response = r#"[980,{"a":["0.00521900",4,"4.84775132"],"b":["0.00520600",70,"70.35668921"],"c":["0.00520700","0.00000186"],"v":["18530.40510860","18531.94887860"],"p":["0.00489493","0.00489490"],"t":[5017,5018],"l":["0.00448300","0.00448300"],"h":["0.00525000","0.00525000"],"o":["0.00450000","0.00451000"]},"ticker","XMR/XBT"]"#; let _ = serde_json::from_str::(sample_response).unwrap(); } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 0e836451..2cc498bf 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -14,7 +14,7 @@ use crate::{ }, seed::Seed, }; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use futures::future::RemoteHandle; use libp2p::{ core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, @@ -164,7 +164,9 @@ where debug!("Connection Established with {}", alice); } OutEvent::QuoteRequest { msg, channel, bob_peer_id } => { - let _ = self.handle_quote_request(msg, channel, bob_peer_id).await; + if let Err(error) = self.handle_quote_request(msg, channel, bob_peer_id).await { + error!("Failed to handle quote request: {:#}", error); + } } OutEvent::ExecutionSetupDone{bob_peer_id, state3} => { let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; @@ -203,7 +205,10 @@ where // 1. Check if acceptable request // 2. Send response - let rate = self.rate_service.latest_rate(); + let rate = self + .rate_service + .latest_rate() + .map_err(|e| anyhow!("Failed to get latest rate: {:?}", e))?; let btc_amount = quote_request.btc_amount; let xmr_amount = rate.sell_quote(btc_amount)?;