|
|
|
@ -31,7 +31,7 @@ pub fn connect() -> Result<RateUpdateStream> {
|
|
|
|
|
let mut stream = connection::new().await?;
|
|
|
|
|
|
|
|
|
|
while let Some(update) = stream.try_next().await.map_err(to_backoff)? {
|
|
|
|
|
let send_result = rate_update.send(Ok(update));
|
|
|
|
|
let send_result = rate_update.send(Ok(Rate::new(update.ask)));
|
|
|
|
|
|
|
|
|
|
if send_result.is_err() {
|
|
|
|
|
return Err(backoff::Error::Permanent(anyhow!(
|
|
|
|
@ -120,7 +120,7 @@ mod connection {
|
|
|
|
|
use futures::stream::BoxStream;
|
|
|
|
|
use tokio_tungstenite::tungstenite;
|
|
|
|
|
|
|
|
|
|
pub async fn new() -> Result<BoxStream<'static, Result<Rate, Error>>> {
|
|
|
|
|
pub async fn new() -> Result<BoxStream<'static, Result<wire::PriceUpdate, Error>>> {
|
|
|
|
|
let (mut rate_stream, _) = tokio_tungstenite::connect_async("wss://ws.kraken.com")
|
|
|
|
|
.await
|
|
|
|
|
.context("Failed to connect to Kraken websocket API")?;
|
|
|
|
@ -134,12 +134,12 @@ mod connection {
|
|
|
|
|
Ok(stream)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Parse a websocket message into a [`Rate`].
|
|
|
|
|
/// Parse a websocket message into a [`wire::PriceUpdate`].
|
|
|
|
|
///
|
|
|
|
|
/// Messages which are not actually ticker updates are ignored and result in
|
|
|
|
|
/// `None` being returned. In the context of a [`TryStream`], these will
|
|
|
|
|
/// simply be filtered out.
|
|
|
|
|
async fn parse_message(msg: tungstenite::Message) -> Result<Option<Rate>, Error> {
|
|
|
|
|
async fn parse_message(msg: tungstenite::Message) -> Result<Option<wire::PriceUpdate>, Error> {
|
|
|
|
|
let msg = match msg {
|
|
|
|
|
tungstenite::Message::Text(msg) => msg,
|
|
|
|
|
tungstenite::Message::Close(close_frame) => {
|
|
|
|
@ -182,7 +182,7 @@ mod connection {
|
|
|
|
|
return Ok(None);
|
|
|
|
|
}
|
|
|
|
|
// if the message is not an event, it is a ticker update or an unknown event
|
|
|
|
|
Err(_) => match serde_json::from_str::<wire::TickerUpdate>(&msg) {
|
|
|
|
|
Err(_) => match serde_json::from_str::<wire::PriceUpdate>(&msg) {
|
|
|
|
|
Ok(ticker) => ticker,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg);
|
|
|
|
@ -192,8 +192,6 @@ mod connection {
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let update = Rate::try_from(update)?;
|
|
|
|
|
|
|
|
|
|
Ok(Some(update))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -247,6 +245,13 @@ mod wire {
|
|
|
|
|
BitcoinParseAmount(#[from] ParseAmountError),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Represents an update within the price ticker.
|
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
|
|
#[serde(try_from = "TickerUpdate")]
|
|
|
|
|
pub struct PriceUpdate {
|
|
|
|
|
pub ask: bitcoin::Amount,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
|
|
#[serde(transparent)]
|
|
|
|
|
pub struct TickerUpdate(Vec<TickerField>);
|
|
|
|
@ -273,7 +278,7 @@ mod wire {
|
|
|
|
|
Number(u64),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl TryFrom<TickerUpdate> for Rate {
|
|
|
|
|
impl TryFrom<TickerUpdate> for PriceUpdate {
|
|
|
|
|
type Error = Error;
|
|
|
|
|
|
|
|
|
|
fn try_from(value: TickerUpdate) -> Result<Self, Error> {
|
|
|
|
@ -293,7 +298,7 @@ mod wire {
|
|
|
|
|
_ => return Err(Error::UnexpectedAskRateElementType),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(Self { ask })
|
|
|
|
|
Ok(PriceUpdate { ask })
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|