Simplify error handling

swap-setup-proto
Thomas Eizinger 3 years ago
parent f9c03fa335
commit 63cfcf22e0
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96

46
Cargo.lock generated

@ -294,6 +294,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d73a8ae8ce52d09395e4cafc83b5b81c3deb70a97740e907669c8683c4dd50a" checksum = "4d73a8ae8ce52d09395e4cafc83b5b81c3deb70a97740e907669c8683c4dd50a"
[[package]]
name = "bimap"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07"
[[package]] [[package]]
name = "bincode" name = "bincode"
version = "1.3.1" version = "1.3.1"
@ -1832,9 +1838,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-dns" name = "libp2p-dns"
version = "0.28.1" version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "62e63dab8b5ff35e0c101a3e51e843ba782c07bbb1682f5fd827622e0d02b98b"
dependencies = [ dependencies = [
"futures", "futures",
"libp2p-core", "libp2p-core",
@ -1860,9 +1865,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-mplex" name = "libp2p-mplex"
version = "0.28.0" version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "85e9b544335d1ed30af71daa96edbefadef6f19c7a55f078b9fc92c87163105d"
dependencies = [ dependencies = [
"asynchronous-codec", "asynchronous-codec",
"bytes 1.0.1", "bytes 1.0.1",
@ -1878,9 +1882,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-noise" name = "libp2p-noise"
version = "0.31.0" version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "57a2aa6fc4e6855eaf9ea1941a14f7ec4df35636fb6b85951e17481df8dcecf6"
dependencies = [ dependencies = [
"bytes 1.0.1", "bytes 1.0.1",
"curve25519-dalek", "curve25519-dalek",
@ -1900,9 +1903,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-ping" name = "libp2p-ping"
version = "0.29.0" version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "bf4bfaffac63bf3c7ec11ed9d8879d455966ddea7e78ee14737f0b6dce0d1cd1"
dependencies = [ dependencies = [
"futures", "futures",
"libp2p-core", "libp2p-core",
@ -1937,9 +1939,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-request-response" name = "libp2p-request-response"
version = "0.11.0" version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "1cdbe172f08e6d0f95fa8634e273d4c4268c4063de2e33e7435194b0130c62e3"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes 1.0.1", "bytes 1.0.1",
@ -1981,9 +1982,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-tcp" name = "libp2p-tcp"
version = "0.28.0" version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "2b1a27d21c477951799e99d5c105d78868258502ce092988040a808d5a19bbd9"
dependencies = [ dependencies = [
"futures", "futures",
"futures-timer", "futures-timer",
@ -1998,9 +1998,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-websocket" name = "libp2p-websocket"
version = "0.29.0" version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "cace60995ef6f637e4752cccbb2590f6bc358e8741a0d066307636c69a4b3a74"
dependencies = [ dependencies = [
"either", "either",
"futures", "futures",
@ -2016,9 +2015,8 @@ dependencies = [
[[package]] [[package]]
name = "libp2p-yamux" name = "libp2p-yamux"
version = "0.32.0" version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/comit-network/rust-libp2p.git?rev=96002105b0a7019330413826a332b3a12a7e8a57#96002105b0a7019330413826a332b3a12a7e8a57"
checksum = "f35da42cfc6d5cb0dcf3ad6881bc68d146cdf38f98655e09e33fbba4d13eabc4"
dependencies = [ dependencies = [
"futures", "futures",
"libp2p-core", "libp2p-core",

@ -29,8 +29,8 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features =
ed25519-dalek = "1" ed25519-dalek = "1"
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
itertools = "0.10" itertools = "0.10"
libp2p = { git = "https://github.com/comit-network/rust-libp2p.git", rev = "96002105b0a7019330413826a332b3a12a7e8a57", default-features = false, features = ["rendezvous", "request-response", "websocket", "ping", "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "identify"] } libp2p = { git = "https://github.com/comit-network/rust-libp2p.git", rev = "96002105b0a7019330413826a332b3a12a7e8a57", default-features = false, features = [ "rendezvous", "request-response", "websocket", "ping", "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "identify" ] }
miniscript = { version = "5", features = ["serde"] } miniscript = { version = "5", features = [ "serde" ] }
monero = { version = "0.12", features = [ "serde_support" ] } monero = { version = "0.12", features = [ "serde_support" ] }
monero-rpc = { path = "../monero-rpc" } monero-rpc = { path = "../monero-rpc" }
pem = "0.8" pem = "0.8"

@ -1,4 +1,5 @@
use crate::monero; use crate::monero;
use anyhow::{Context, Result};
use libp2p::core::upgrade; use libp2p::core::upgrade;
use libp2p::swarm::NegotiatedSubstream; use libp2p::swarm::NegotiatedSubstream;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
@ -82,26 +83,29 @@ pub enum SpotPriceError {
Other, Other,
} }
pub async fn read_cbor_message<T>(substream: &mut NegotiatedSubstream) -> anyhow::Result<T> pub async fn read_cbor_message<T>(substream: &mut NegotiatedSubstream) -> Result<T>
where where
T: DeserializeOwned, T: DeserializeOwned,
{ {
let bytes = upgrade::read_one(substream, BUF_SIZE).await?; let bytes = upgrade::read_one(substream, BUF_SIZE)
.await
.context("Failed to read length-prefixed message from stream")?;
let mut de = serde_cbor::Deserializer::from_slice(&bytes); let mut de = serde_cbor::Deserializer::from_slice(&bytes);
let message = T::deserialize(&mut de)?; let message =
T::deserialize(&mut de).context("Failed to deserialize bytes into message using CBOR")?;
Ok(message) Ok(message)
} }
pub async fn write_cbor_message<T>( pub async fn write_cbor_message<T>(substream: &mut NegotiatedSubstream, message: T) -> Result<()>
substream: &mut NegotiatedSubstream,
message: T,
) -> anyhow::Result<()>
where where
T: Serialize, T: Serialize,
{ {
let bytes = serde_cbor::to_vec(&message)?; let bytes =
upgrade::write_with_len_prefix(substream, &bytes).await?; serde_cbor::to_vec(&message).context("Failed to serialize message as bytes using CBOR")?;
upgrade::write_with_len_prefix(substream, &bytes)
.await
.context("Failed to write bytes as length-prefixed message")?;
Ok(()) Ok(())
} }

@ -1,9 +1,12 @@
use std::collections::VecDeque; use crate::network::swap_setup;
use std::fmt::Debug; use crate::network::swap_setup::{
use std::task::{Context, Poll}; protocol, BlockchainNetwork, SpotPriceError, SpotPriceRequest, SpotPriceResponse,
use std::time::Duration; };
use crate::protocol::alice::event_loop::LatestRate;
use anyhow::{anyhow, Context as _, Result}; use crate::protocol::alice::{State0, State3};
use crate::protocol::{alice, Message0, Message2, Message4};
use crate::{bitcoin, env, monero};
use anyhow::{anyhow, Context, Result};
use futures::future::{BoxFuture, OptionFuture}; use futures::future::{BoxFuture, OptionFuture};
use futures::FutureExt; use futures::FutureExt;
use libp2p::core::connection::ConnectionId; use libp2p::core::connection::ConnectionId;
@ -13,19 +16,13 @@ use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
}; };
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
use std::time::Instant; use std::collections::VecDeque;
use std::fmt::Debug;
use std::task::Poll;
use std::time::{Duration, Instant};
use uuid::Uuid; use uuid::Uuid;
use void::Void; use void::Void;
use crate::network::swap_setup;
use crate::network::swap_setup::{
protocol, BlockchainNetwork, SpotPriceError, SpotPriceRequest, SpotPriceResponse,
};
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::alice::{State0, State3};
use crate::protocol::{alice, Message0, Message2, Message4};
use crate::{bitcoin, env, monero};
#[derive(Debug)] #[derive(Debug)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub enum OutEvent { pub enum OutEvent {
@ -39,7 +36,7 @@ pub enum OutEvent {
}, },
Error { Error {
peer_id: PeerId, peer_id: PeerId,
error: Error, error: anyhow::Error,
}, },
} }
@ -186,7 +183,7 @@ where
fn poll( fn poll(
&mut self, &mut self,
_cx: &mut Context<'_>, _cx: &mut std::task::Context<'_>,
_params: &mut impl PollParameters, _params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<(), Self::OutEvent>> { ) -> Poll<NetworkBehaviourAction<(), Self::OutEvent>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
@ -197,7 +194,7 @@ where
} }
} }
type InboundStream = BoxFuture<'static, anyhow::Result<(Uuid, alice::State3), Error>>; type InboundStream = BoxFuture<'static, Result<(Uuid, alice::State3)>>;
pub struct Handler<LR> { pub struct Handler<LR> {
inbound_stream: OptionFuture<InboundStream>, inbound_stream: OptionFuture<InboundStream>,
@ -239,7 +236,7 @@ impl<LR> Handler<LR> {
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub enum HandlerOutEvent { pub enum HandlerOutEvent {
Initiated(bmrng::RequestReceiver<bitcoin::Amount, WalletSnapshot>), Initiated(bmrng::RequestReceiver<bitcoin::Amount, WalletSnapshot>),
Completed(anyhow::Result<(Uuid, alice::State3), Error>), Completed(Result<(Uuid, alice::State3)>),
} }
impl<LR> ProtocolsHandler for Handler<LR> impl<LR> ProtocolsHandler for Handler<LR>
@ -278,11 +275,12 @@ where
let protocol = tokio::time::timeout(self.timeout, async move { let protocol = tokio::time::timeout(self.timeout, async move {
let request = swap_setup::read_cbor_message::<SpotPriceRequest>(&mut substream) let request = swap_setup::read_cbor_message::<SpotPriceRequest>(&mut substream)
.await .await
.map_err(Error::Io)?; .context("Failed to read spot price request")?;
let wallet_snapshot = sender let wallet_snapshot = sender
.send_receive(request.btc) .send_receive(request.btc)
.await .await
.map_err(|e| Error::WalletSnapshotFailed(anyhow!(e)))?; .context("Failed to receive wallet snapshot")?;
// wrap all of these into another future so we can `return` from all the // wrap all of these into another future so we can `return` from all the
// different blocks // different blocks
@ -334,24 +332,16 @@ where
Ok(xmr) Ok(xmr)
}; };
let xmr = match validate.await { let result = validate.await;
Ok(xmr) => {
swap_setup::write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr))
.await
.map_err(Error::Io)?;
xmr
}
Err(e) => {
swap_setup::write_cbor_message( swap_setup::write_cbor_message(
&mut substream, &mut substream,
SpotPriceResponse::Error(e.to_error_response()), SpotPriceResponse::from_result_ref(&result),
) )
.await .await
.map_err(Error::Io)?; .context("Failed to write spot price response")?;
return Err(e);
} let xmr = result?;
};
let state0 = State0::new( let state0 = State0::new(
request.btc, request.btc,
@ -366,35 +356,32 @@ where
let message0 = swap_setup::read_cbor_message::<Message0>(&mut substream) let message0 = swap_setup::read_cbor_message::<Message0>(&mut substream)
.await .await
.context("Failed to deserialize message0") .context("Failed to read message0")?;
.map_err(Error::Io)?; let (swap_id, state1) = state0
let (swap_id, state1) = state0.receive(message0).map_err(Error::Io)?; .receive(message0)
.context("Failed to transition state0 -> state1 using message0")?;
swap_setup::write_cbor_message(&mut substream, state1.next_message()) swap_setup::write_cbor_message(&mut substream, state1.next_message())
.await .await
.map_err(Error::Io)?; .context("Failed to send message1")?;
let message2 = swap_setup::read_cbor_message::<Message2>(&mut substream) let message2 = swap_setup::read_cbor_message::<Message2>(&mut substream)
.await .await
.context("Failed to deserialize message2") .context("Failed to read message2")?;
.map_err(Error::Io)?;
let state2 = state1 let state2 = state1
.receive(message2) .receive(message2)
.context("Failed to receive Message2") .context("Failed to transition state1 -> state2 using message2")?;
.map_err(Error::Io)?;
swap_setup::write_cbor_message(&mut substream, state2.next_message()) swap_setup::write_cbor_message(&mut substream, state2.next_message())
.await .await
.map_err(Error::Io)?; .context("Failed to send message3")?;
let message4 = swap_setup::read_cbor_message::<Message4>(&mut substream) let message4 = swap_setup::read_cbor_message::<Message4>(&mut substream)
.await .await
.context("Failed to deserialize message4") .context("Failed to read message4")?;
.map_err(Error::Io)?;
let state3 = state2 let state3 = state2
.receive(message4) .receive(message4)
.context("Failed to receive Message4") .context("Failed to transition state2 -> state3 using message4")?;
.map_err(Error::Io)?;
Ok((swap_id, state3)) Ok((swap_id, state3))
}); });
@ -402,8 +389,8 @@ where
let max_seconds = self.timeout.as_secs(); let max_seconds = self.timeout.as_secs();
self.inbound_stream = OptionFuture::from(Some( self.inbound_stream = OptionFuture::from(Some(
async move { async move {
protocol.await.map_err(|_| Error::Timeout { protocol.await.with_context(|| {
seconds: max_seconds, format!("Failed to complete execution setup within {}s", max_seconds)
})? })?
} }
.boxed(), .boxed(),
@ -413,7 +400,7 @@ where
} }
fn inject_fully_negotiated_outbound(&mut self, _: Void, _: Self::OutboundOpenInfo) { fn inject_fully_negotiated_outbound(&mut self, _: Void, _: Self::OutboundOpenInfo) {
unreachable!("Alice does not support outbound in the hanlder") unreachable!("Alice does not support outbound in the handler")
} }
fn inject_event(&mut self, _: Self::InEvent) { fn inject_event(&mut self, _: Self::InEvent) {
@ -435,7 +422,7 @@ where
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn poll( fn poll(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut std::task::Context<'_>,
) -> Poll< ) -> Poll<
ProtocolsHandlerEvent< ProtocolsHandlerEvent<
Self::OutboundProtocol, Self::OutboundProtocol,
@ -460,8 +447,15 @@ where
} }
} }
// TODO: Differentiate between errors that we send back and shit that happens on impl SpotPriceResponse {
// our side (IO, timeout) pub fn from_result_ref(result: &Result<monero::Amount, Error>) -> Self {
match result {
Ok(amount) => SpotPriceResponse::Xmr(*amount),
Err(error) => SpotPriceResponse::Error(error.to_error_response()),
}
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum Error {
#[error("ASB is running in resume-only mode")] #[error("ASB is running in resume-only mode")]
@ -490,12 +484,6 @@ pub enum Error {
cli: BlockchainNetwork, cli: BlockchainNetwork,
asb: BlockchainNetwork, asb: BlockchainNetwork,
}, },
#[error("Io Error")]
Io(#[source] anyhow::Error),
#[error("Failed to request wallet snapshot")]
WalletSnapshotFailed(#[source] anyhow::Error),
#[error("Failed to complete execution setup within {seconds}s")]
Timeout { seconds: u64 },
} }
impl Error { impl Error {
@ -517,11 +505,9 @@ impl Error {
asb: *asb, asb: *asb,
} }
} }
Error::LatestRateFetchFailed(_) Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => {
| Error::SellQuoteCalculationFailed(_) SpotPriceError::Other
| Error::WalletSnapshotFailed(_) }
| Error::Timeout { .. }
| Error::Io(_) => SpotPriceError::Other,
} }
} }
} }

Loading…
Cancel
Save