Refactor the kraken module to automatically re-connect on errors

In order to be able to re-connect on certain errors, we model
connection errors separately from parsing errors. We also change
the API of the whole module to no longer forward all errors to
the subscribers but instead, only update the subscribers with
either a latest rate or a permanent failure in case we exhausted
all our options to re-connect the websocket.

To model all of this properly, we introduce to sub-modules so that
each submodule can have their own `Error` type.

Resolves #297.
pull/301/head
Thomas Eizinger 3 years ago
parent c560b3b21a
commit 9ad2160c69
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96

@ -92,7 +92,7 @@ async fn main() -> Result<()> {
bitcoin_wallet.new_address().await?
);
let kraken_rate_updates = kraken::connect().await?;
let kraken_rate_updates = kraken::connect()?;
let (event_loop, _) = EventLoop::new(
config.network.listen,

@ -3,12 +3,10 @@ use anyhow::{Context, Result};
#[tokio::main]
async fn main() -> Result<()> {
tracing::subscriber::set_global_default(
tracing_subscriber::fmt().with_env_filter("trace").finish(),
tracing_subscriber::fmt().with_env_filter("debug").finish(),
)?;
let mut ticker = swap::kraken::connect()
.await
.context("Failed to connect to kraken")?;
let mut ticker = swap::kraken::connect().context("Failed to connect to kraken")?;
loop {
match ticker.wait_for_update().await? {

@ -1,189 +1,271 @@
use crate::asb::Rate;
use anyhow::{Context, Result};
use bitcoin::util::amount::ParseAmountError;
use futures::{SinkExt, StreamExt};
use anyhow::{anyhow, Context, Result};
use futures::{SinkExt, StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::convert::TryFrom;
use std::convert::{Infallible, TryFrom};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio_tungstenite::tungstenite;
/// Connect to Kraken websocket API for a constant stream of rate updates.
///
/// If the connection fails, it will automatically be re-established.
pub fn connect() -> Result<RateUpdateStream> {
let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetAvailable));
let rate_update = Arc::new(rate_update);
tokio::spawn(async move {
let result = backoff::future::retry_notify::<Infallible, _, _, _, _, _>(
backoff::ExponentialBackoff::default(),
|| {
let rate_update = rate_update.clone();
async move {
let mut stream = connection::new().await?;
while let Some(update) = stream.try_next().await.map_err(to_backoff)? {
let send_result = rate_update.send(Ok(update));
if send_result.is_err() {
return Err(backoff::Error::Permanent(anyhow!(
"receiver disconnected"
)));
}
}
Err(backoff::Error::Transient(anyhow!("stream ended")))
}
},
|error, next: Duration| {
tracing::info!(%error, "Kraken websocket connection failed, retrying in {}ms", next.as_millis());
}
)
.await;
match result {
Err(e) => {
tracing::warn!("Rate updates incurred an unrecoverable error: {:#}", e);
// in case the retries fail permanently, let the subscribers know
rate_update.send(Err(Error::PermanentFailure))
}
Ok(never) => match never {},
}
});
Ok(RateUpdateStream {
inner: rate_update_receiver,
})
}
#[derive(Clone, Debug)]
pub struct RateUpdateStream {
inner: watch::Receiver<RateUpdate>,
}
impl RateUpdateStream {
pub async fn wait_for_update(&mut self) -> Result<RateUpdate> {
self.inner.changed().await?;
Ok(self.inner.borrow().clone())
}
pub fn latest_update(&mut self) -> RateUpdate {
self.inner.borrow().clone()
}
}
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
#[error("Rate is not yet available")]
NotYetAvailable,
#[error("Permanently failed to retrieve rate from Kraken")]
PermanentFailure,
}
type RateUpdate = Result<Rate, Error>;
pub async fn connect() -> Result<RateUpdateStream> {
let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved));
/// Maps a [`connection::Error`] to a backoff error, effectively defining our
/// retry strategy.
fn to_backoff(e: connection::Error) -> backoff::Error<anyhow::Error> {
use backoff::Error::*;
match e {
// Connection closures and websocket errors will be retried
connection::Error::ConnectionClosed => Transient(anyhow::Error::from(e)),
connection::Error::WebSocket(_) => Transient(anyhow::Error::from(e)),
// Failures while parsing a message are permanent because they most likely present a
// programmer error
connection::Error::Parse(_) => Permanent(anyhow::Error::from(e)),
}
}
/// Kraken websocket connection module.
///
/// Responsible for establishing a connection to the Kraken websocket API and
/// transforming the received websocket frames into a stream of rate updates.
/// The connection may fail in which case it is simply terminated and the stream
/// ends.
mod connection {
use super::*;
use crate::kraken::wire;
use futures::stream::BoxStream;
use tokio_tungstenite::tungstenite;
let (rate_stream, _) = tokio_tungstenite::connect_async("wss://ws.kraken.com")
pub async fn new() -> Result<BoxStream<'static, Result<Rate, Error>>> {
let (mut rate_stream, _) = tokio_tungstenite::connect_async("wss://ws.kraken.com")
.await
.context("Failed to connect to Kraken websocket API")?;
let (mut rate_stream_sink, mut rate_stream) = rate_stream.split();
rate_stream
.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into())
.await?;
tokio::spawn(async move {
while let Some(msg) = rate_stream.next().await {
let stream = rate_stream.err_into().try_filter_map(parse_message).boxed();
Ok(stream)
}
/// Parse a websocket message into a [`Rate`].
///
/// Messages which are not actually ticker updates are ignored and result in
/// `None` being returned. In the context of a [`TryStream`], these will
/// simply be filtered out.
async fn parse_message(msg: tungstenite::Message) -> Result<Option<Rate>, Error> {
let msg = match msg {
Ok(tungstenite::Message::Text(msg)) => msg,
Ok(tungstenite::Message::Close(close_frame)) => {
tungstenite::Message::Text(msg) => msg,
tungstenite::Message::Close(close_frame) => {
if let Some(tungstenite::protocol::CloseFrame { code, reason }) = close_frame {
tracing::error!(
tracing::debug!(
"Kraken rate stream was closed with code {} and reason: {}",
code,
reason
);
} else {
tracing::error!("Kraken rate stream was closed without code and reason");
tracing::debug!("Kraken rate stream was closed without code and reason");
}
let _ = rate_update.send(Err(Error::ConnectionClosed));
continue;
return Err(Error::ConnectionClosed);
}
Ok(msg) => {
msg => {
tracing::trace!(
"Kraken rate stream returned non text message that will be ignored: {}",
msg
);
continue;
}
Err(e) => {
tracing::error!(%e, "Error when reading from Kraken rate stream");
let _ = rate_update.send(Err(e.into()));
continue;
return Ok(None);
}
};
let update = match serde_json::from_str::<Event>(&msg) {
Ok(Event::SystemStatus) => {
let update = match serde_json::from_str::<wire::Event>(&msg) {
Ok(wire::Event::SystemStatus) => {
tracing::debug!("Connected to Kraken websocket API");
continue;
return Ok(None);
}
Ok(Event::SubscriptionStatus) => {
Ok(wire::Event::SubscriptionStatus) => {
tracing::debug!("Subscribed to updates for ticker");
continue;
return Ok(None);
}
Ok(Event::Heartbeat) => {
Ok(wire::Event::Heartbeat) => {
tracing::trace!("Received heartbeat message");
continue;
return Ok(None);
}
// if the message is not an event, it is a ticker update or an unknown event
Err(_) => match serde_json::from_str::<TickerUpdate>(&msg) {
Err(_) => match serde_json::from_str::<wire::TickerUpdate>(&msg) {
Ok(ticker) => ticker,
Err(e) => {
tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg);
let _ = rate_update.send(Err(Error::UnknownMessage { msg }));
continue;
}
},
};
let rate = match Rate::try_from(update) {
Ok(rate) => rate,
Err(e) => {
let _ = rate_update.send(Err(e));
continue;
return Ok(None);
}
},
};
let _ = rate_update.send(Ok(rate));
}
});
rate_stream_sink
.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into())
.await?;
Ok(RateUpdateStream {
inner: rate_update_receiver,
})
}
let update = Rate::try_from(update)?;
#[derive(Clone, Debug)]
pub struct RateUpdateStream {
inner: watch::Receiver<RateUpdate>,
}
impl RateUpdateStream {
pub async fn wait_for_update(&mut self) -> Result<RateUpdate> {
self.inner.changed().await?;
Ok(self.inner.borrow().clone())
Ok(Some(update))
}
pub fn latest_update(&mut self) -> RateUpdate {
self.inner.borrow().clone()
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("The Kraken server closed the websocket connection")]
ConnectionClosed,
#[error("Failed to read message from websocket stream")]
WebSocket(#[from] tungstenite::Error),
#[error("Failed to parse rate from websocket message")]
Parse(#[from] wire::Error),
}
}
const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#"
{ "event": "subscribe",
const SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD: &str = r#"
{ "event": "subscribe",
"pair": [ "XMR/XBT" ],
"subscription": {
"name": "ticker"
}
}"#;
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
#[error("Rate has not yet been retrieved from Kraken websocket API")]
NotYetRetrieved,
#[error("The Kraken server closed the websocket connection")]
ConnectionClosed,
#[error("Websocket: {0}")]
WebSocket(String),
#[error("Received unknown message from Kraken: {msg}")]
UnknownMessage { msg: String },
#[error("Data field is missing")]
DataFieldMissing,
#[error("Ask Rate Element is of unexpected type")]
UnexpectedAskRateElementType,
#[error("Ask Rate Element is missing")]
MissingAskRateElementType,
#[error("Bitcoin amount parse error: ")]
BitcoinParseAmount(#[from] ParseAmountError),
}"#;
}
impl From<tungstenite::Error> for Error {
fn from(err: tungstenite::Error) -> Self {
Error::WebSocket(format!("{:#}", err))
}
}
/// Kraken websocket API wire module.
///
/// Responsible for parsing websocket text messages to events and rate updates.
mod wire {
use super::*;
use bitcoin::util::amount::ParseAmountError;
use serde_json::Value;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "event")]
enum Event {
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "event")]
pub enum Event {
#[serde(rename = "systemStatus")]
SystemStatus,
#[serde(rename = "heartbeat")]
Heartbeat,
#[serde(rename = "subscriptionStatus")]
SubscriptionStatus,
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
struct TickerUpdate(Vec<TickerField>);
#[derive(Clone, Debug, thiserror::Error)]
pub enum Error {
#[error("Data field is missing")]
DataFieldMissing,
#[error("Ask Rate Element is of unexpected type")]
UnexpectedAskRateElementType,
#[error("Ask Rate Element is missing")]
MissingAskRateElementType,
#[error("Failed to parse Bitcoin amount")]
BitcoinParseAmount(#[from] ParseAmountError),
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TickerUpdate(Vec<TickerField>);
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum TickerField {
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TickerField {
Data(TickerData),
Metadata(Value),
}
}
#[derive(Debug, Serialize, Deserialize)]
struct TickerData {
#[derive(Debug, Serialize, Deserialize)]
pub struct TickerData {
#[serde(rename = "a")]
ask: Vec<RateElement>,
#[serde(rename = "b")]
bid: Vec<RateElement>,
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum RateElement {
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RateElement {
Text(String),
Number(u64),
}
}
impl TryFrom<TickerUpdate> for Rate {
impl TryFrom<TickerUpdate> for Rate {
type Error = Error;
fn try_from(value: TickerUpdate) -> Result<Self, Error> {
@ -205,10 +287,10 @@ impl TryFrom<TickerUpdate> for Rate {
Ok(Self { ask })
}
}
}
#[cfg(test)]
mod tests {
#[cfg(test)]
mod tests {
use super::*;
#[test]
@ -235,4 +317,5 @@ mod tests {
let _ = serde_json::from_str::<TickerUpdate>(message).unwrap();
}
}
}

Loading…
Cancel
Save