@ -1,5 +1,5 @@
use crate ::asb ::{ LatestRate , Rate } ;
use anyhow::{ anyhow , bail , Result } ;
use bitcoin::util ::amount ::ParseAmountError ;
use futures ::{ SinkExt , StreamExt } ;
use reqwest ::Url ;
use serde ::{ Deserialize , Serialize } ;
@ -20,49 +20,89 @@ const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#"
#[ derive(Clone) ]
pub struct RateService {
receiver : Receiver < Rate > ,
receiver : Receiver < Result < Rate , Error > > ,
}
impl LatestRate for RateService {
fn latest_rate ( & mut self ) -> Rate {
* self . receiver . borrow ( )
type Error = Error ;
fn latest_rate ( & mut self ) -> Result < Rate , Self ::Error > {
( * self . receiver . borrow ( ) ) . clone ( )
}
}
#[ derive(Clone, Debug, thiserror::Error) ]
pub enum Error {
#[ error( " Rate has not yet been retrieved from Kraken websocket API " ) ]
NotYetRetrieved ,
#[ error( " Message is not text " ) ]
NonTextMessage ,
#[ error( " Websocket: " ) ]
WebSocket ( String ) ,
#[ error( " Serde: " ) ]
Serde ( String ) ,
#[ error( " Data field is missing " ) ]
DataFieldMissing ,
#[ error( " Ask Rate Element is of unexpected type " ) ]
UnexpectedAskRateElementType ,
#[ error( " Ask Rate Element is missing " ) ]
MissingAskRateElementType ,
#[ error( " Bitcoin amount parse error: " ) ]
BitcoinParseAmount ( #[ from ] ParseAmountError ) ,
}
impl From < tokio_tungstenite ::tungstenite ::Error > for Error {
fn from ( err : tokio_tungstenite ::tungstenite ::Error ) -> Self {
Error ::WebSocket ( format! ( "{:#}" , err ) )
}
}
impl From < serde_json ::Error > for Error {
fn from ( err : serde_json ::Error ) -> Self {
Error ::Serde ( format! ( "{:#}" , err ) )
}
}
impl RateService {
pub async fn new ( ) -> Result < Self > {
let ( tx , rx ) = watch ::channel ( Rate ::ZERO ) ;
pub async fn new ( ) -> anyhow ::Result < Self > {
let ( tx , rx ) = watch ::channel ( Err ( Error ::NotYetRetrieved ) ) ;
let ( ws , _response ) =
tokio_tungstenite ::connect_async ( Url ::parse ( KRAKEN_WS_URL ) . expect ( "valid url" ) ) . await ? ;
let ( mut write , mut read ) = ws . split ( ) ;
// TODO: Handle the possibility of losing the connection
// to the Kraken WS. Currently the stream would produce no
// further items, and consumers would assume that the rate
// is up to date
tokio ::spawn ( async move {
while let Some ( msg ) = read . next ( ) . await {
let msg = match msg {
Ok ( Message ::Text ( msg ) ) = > msg ,
_ = > continue ,
Ok ( _ ) = > {
let _ = tx . send ( Err ( Error ::NonTextMessage ) ) ;
continue ;
}
Err ( e ) = > {
let _ = tx . send ( Err ( e . into ( ) ) ) ;
continue ;
}
} ;
let ticker = match serde_json ::from_str ::< TickerUpdate > ( & msg ) {
Ok ( ticker ) = > ticker ,
_ = > continue ,
Err ( e ) = > {
let _ = tx . send ( Err ( e . into ( ) ) ) ;
continue ;
}
} ;
let rate = match Rate ::try_from ( ticker ) {
Ok ( rate ) = > rate ,
Err ( e ) = > {
log ::error ! ( "could not get rate from ticker update: {}" , e ) ;
let _ = tx . send ( Err ( e ) ) ;
continue ;
}
} ;
let _ = tx . send ( rate ) ;
let _ = tx . send ( Ok ( rate ) ) ;
}
} ) ;
@ -99,9 +139,9 @@ enum RateElement {
}
impl TryFrom < TickerUpdate > for Rate {
type Error = anyhow:: Error;
type Error = Error;
fn try_from ( value : TickerUpdate ) -> Result < Self > {
fn try_from ( value : TickerUpdate ) -> Result < Self , Error > {
let data = value
. 0
. iter ( )
@ -109,14 +149,14 @@ impl TryFrom<TickerUpdate> for Rate {
TickerField ::Data ( data ) = > Some ( data ) ,
TickerField ::Metadata ( _ ) = > None ,
} )
. ok_or _else( | | anyhow ! ( "ticker update does not contain data" ) ) ? ;
let ask = data . ask . first ( ) . ok_or _else( | | anyhow ! ( "no ask price" ) ) ? ;
. ok_or ( Error ::DataFieldMissing ) ? ;
// TODO: Ensure whether heartbeats returned by the api are being filtered.
let ask = data . ask . first ( ) . ok_or ( Error ::MissingAskRateElementType ) ? ;
let ask = match ask {
RateElement ::Text ( ask ) = > {
bitcoin ::Amount ::from_str_in ( ask , ::bitcoin ::Denomination ::Bitcoin ) ?
}
_ = > bail ! ( "unexpected ask rate element" ) ,
_ = > return Err ( Error ::UnexpectedAskRateElementType ) ,
} ;
Ok ( Self { ask } )
@ -129,8 +169,7 @@ mod tests {
#[ tokio::test ]
async fn deserialize_ticker_update ( ) {
let sample_response = r #"
[ 2308 , { "a" :[ "18215.60000" , 0 , "0.27454523" ] , "b" :[ "18197.50000" , 0 , "0.63711255" ] , "c" :[ "18197.50000" , "0.00413060" ] , "v" :[ "2.78915585" , "156.15766485" ] , "p" :[ "18200.94036" , "18275.19149" ] , "t" :[ 22 , 1561 ] , "l" :[ "18162.40000" , "17944.90000" ] , "h" :[ "18220.90000" , "18482.60000" ] , "o" :[ "18220.90000" , "18478.90000" ] } , "ticker" , "XBT/USDT" ] " #;
let sample_response = r#"[980,{"a":["0.00521900",4,"4.84775132"],"b":["0.00520600",70,"70.35668921"],"c":["0.00520700","0.00000186"],"v":["18530.40510860","18531.94887860"],"p":["0.00489493","0.00489490"],"t":[5017,5018],"l":["0.00448300","0.00448300"],"h":["0.00525000","0.00525000"],"o":["0.00450000","0.00451000"]},"ticker","XMR/XBT"]"# ;
let _ = serde_json ::from_str ::< TickerUpdate > ( sample_response ) . unwrap ( ) ;
}