@ -1,7 +1,6 @@
use crate ::asb ::Rate ;
use anyhow ::{ anyhow , Context , Result } ;
use futures ::{ SinkExt , StreamExt , TryStreamExt } ;
use serde ::{ Deserialize , Serialize } ;
use serde ::Deserialize ;
use std ::convert ::{ Infallible , TryFrom } ;
use std ::sync ::Arc ;
use std ::time ::Duration ;
@ -10,9 +9,9 @@ use tokio::sync::watch;
/// Connect to Kraken websocket API for a constant stream of rate updates.
///
/// If the connection fails, it will automatically be re-established.
pub fn connect ( ) -> Result < RateUpdateStream > {
let ( rate_update, rat e_update_receiver) = watch ::channel ( Err ( Error ::NotYetAvailable ) ) ;
let rate_update = Arc ::new ( rat e_update) ;
pub fn connect ( ) -> Result < PriceUpdates > {
let ( price_update, pric e_update_receiver) = watch ::channel ( Err ( Error ::NotYetAvailable ) ) ;
let price_update = Arc ::new ( pric e_update) ;
tokio ::spawn ( async move {
// The default backoff config is fine for us apart from one thing:
@ -26,12 +25,12 @@ pub fn connect() -> Result<RateUpdateStream> {
let result = backoff ::future ::retry_notify ::< Infallible , _ , _ , _ , _ , _ > (
backoff ,
| | {
let rate_update = rat e_update. clone ( ) ;
let price_update = pric e_update. clone ( ) ;
async move {
let mut stream = connection ::new ( ) . await ? ;
while let Some ( update ) = stream . try_next ( ) . await . map_err ( to_backoff ) ? {
let send_result = rat e_update. send ( Ok ( update ) ) ;
let send_result = pric e_update. send ( Ok ( update ) ) ;
if send_result . is_err ( ) {
return Err ( backoff ::Error ::Permanent ( anyhow ! (
@ -54,30 +53,30 @@ pub fn connect() -> Result<RateUpdateStream> {
tracing ::warn ! ( "Rate updates incurred an unrecoverable error: {:#}" , e ) ;
// in case the retries fail permanently, let the subscribers know
rat e_update. send ( Err ( Error ::PermanentFailure ) )
pric e_update. send ( Err ( Error ::PermanentFailure ) )
}
Ok ( never ) = > match never { } ,
}
} ) ;
Ok ( RateUpdateStream {
inner : rat e_update_receiver,
Ok ( PriceUpdates {
inner : pric e_update_receiver,
} )
}
#[ derive(Clone, Debug) ]
pub struct RateUpdateStream {
inner : watch ::Receiver < Rat eUpdate> ,
pub struct PriceUpdates {
inner : watch ::Receiver < Pric eUpdate> ,
}
impl RateUpdateStream {
pub async fn wait_for_ update( & mut self ) -> Result < Rat eUpdate> {
impl PriceUpdates {
pub async fn wait_for_ next_ update( & mut self ) -> Result < Pric eUpdate> {
self . inner . changed ( ) . await ? ;
Ok ( self . inner . borrow ( ) . clone ( ) )
}
pub fn latest_update ( & mut self ) -> Rat eUpdate {
pub fn latest_update ( & mut self ) -> Pric eUpdate {
self . inner . borrow ( ) . clone ( )
}
}
@ -90,7 +89,7 @@ pub enum Error {
PermanentFailure ,
}
type RateUpdate = Result < R ate, Error > ;
type PriceUpdate = Result < wire ::PriceUpd ate, Error > ;
/// Maps a [`connection::Error`] to a backoff error, effectively defining our
/// retry strategy.
@ -120,7 +119,7 @@ mod connection {
use futures ::stream ::BoxStream ;
use tokio_tungstenite ::tungstenite ;
pub async fn new ( ) -> Result < BoxStream < ' static , Result < R ate, Error > > > {
pub async fn new ( ) -> Result < BoxStream < ' static , Result < wire::PriceUpd ate, Error > > > {
let ( mut rate_stream , _ ) = tokio_tungstenite ::connect_async ( "wss://ws.kraken.com" )
. await
. context ( "Failed to connect to Kraken websocket API" ) ? ;
@ -134,12 +133,12 @@ mod connection {
Ok ( stream )
}
/// Parse a websocket message into a [` R ate`].
/// Parse a websocket message into a [` wire::PriceUpd ate`].
///
/// Messages which are not actually ticker updates are ignored and result in
/// `None` being returned. In the context of a [`TryStream`], these will
/// simply be filtered out.
async fn parse_message ( msg : tungstenite ::Message ) -> Result < Option < R ate> , Error > {
async fn parse_message ( msg : tungstenite ::Message ) -> Result < Option < wire::PriceUpd ate> , Error > {
let msg = match msg {
tungstenite ::Message ::Text ( msg ) = > msg ,
tungstenite ::Message ::Close ( close_frame ) = > {
@ -182,7 +181,7 @@ mod connection {
return Ok ( None ) ;
}
// if the message is not an event, it is a ticker update or an unknown event
Err ( _ ) = > match serde_json ::from_str ::< wire ::Ticker Update> ( & msg ) {
Err ( _ ) = > match serde_json ::from_str ::< wire ::Price Update> ( & msg ) {
Ok ( ticker ) = > ticker ,
Err ( e ) = > {
tracing ::warn ! ( % e , "Failed to deserialize message '{}' as ticker update" , msg ) ;
@ -192,8 +191,6 @@ mod connection {
} ,
} ;
let update = Rate ::try_from ( update ) ? ;
Ok ( Some ( update ) )
}
@ -224,7 +221,7 @@ mod wire {
use bitcoin ::util ::amount ::ParseAmountError ;
use serde_json ::Value ;
#[ derive(Debug, Serialize, Deserialize, PartialEq)]
#[ derive(Debug, Deserialize, PartialEq)]
#[ serde(tag = " event " ) ]
pub enum Event {
#[ serde(rename = " systemStatus " ) ]
@ -247,18 +244,25 @@ mod wire {
BitcoinParseAmount ( #[ from ] ParseAmountError ) ,
}
#[ derive(Debug, Serialize, Deserialize) ]
/// Represents an update within the price ticker.
#[ derive(Clone, Debug, Deserialize) ]
#[ serde(try_from = " TickerUpdate " ) ]
pub struct PriceUpdate {
pub ask : bitcoin ::Amount ,
}
#[ derive(Debug, Deserialize) ]
#[ serde(transparent) ]
pub struct TickerUpdate ( Vec < TickerField > ) ;
#[ derive(Debug, Serialize, Deserialize) ]
#[ derive(Debug, Deserialize)]
#[ serde(untagged) ]
pub enum TickerField {
Data ( TickerData ) ,
Metadata ( Value ) ,
}
#[ derive(Debug, Serialize, Deserialize)]
#[ derive(Debug, Deserialize)]
pub struct TickerData {
#[ serde(rename = " a " ) ]
ask : Vec < RateElement > ,
@ -266,14 +270,14 @@ mod wire {
bid : Vec < RateElement > ,
}
#[ derive(Debug, Serialize, Deserialize)]
#[ derive(Debug, Deserialize)]
#[ serde(untagged) ]
pub enum RateElement {
Text ( String ) ,
Number ( u64 ) ,
}
impl TryFrom < TickerUpdate > for R ate {
impl TryFrom < TickerUpdate > for PriceUpd ate {
type Error = Error ;
fn try_from ( value : TickerUpdate ) -> Result < Self , Error > {
@ -293,7 +297,7 @@ mod wire {
_ = > return Err ( Error ::UnexpectedAskRateElementType ) ,
} ;
Ok ( Self { ask } )
Ok ( PriceUpdate { ask } )
}
}