diff --git a/swap/src/protocol/alice/swap_setup.rs b/swap/src/protocol/alice/swap_setup.rs index 8699fd09..288ff16b 100644 --- a/swap/src/protocol/alice/swap_setup.rs +++ b/swap/src/protocol/alice/swap_setup.rs @@ -2,7 +2,7 @@ use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::alice::{State0, State3}; use crate::protocol::{alice, Message0, Message2, Message4}; use crate::{bitcoin, env, monero}; -use anyhow::{Context as _, Result}; +use anyhow::{anyhow, Context as _, Result}; use futures::future::{BoxFuture, OptionFuture}; use futures::FutureExt; use libp2p::core::connection::ConnectionId; @@ -28,11 +28,14 @@ pub enum OutEvent { send_wallet_snapshot: bmrng::RequestReceiver, }, Completed { - bob_peer_id: PeerId, + peer_id: PeerId, swap_id: Uuid, state3: State3, }, - Error, // TODO be more descriptive + Error { + peer_id: PeerId, + error: Error + } } #[derive(Debug)] @@ -85,7 +88,7 @@ impl From for alice::OutEvent { send_wallet_snapshot, }, OutEvent::Completed { - bob_peer_id, + peer_id: bob_peer_id, swap_id, state3, } => alice::OutEvent::SwapSetupCompleted { @@ -93,9 +96,9 @@ impl From for alice::OutEvent { swap_id, state3: Box::new(state3), }, - OutEvent::Error => alice::OutEvent::Failure { - peer: todo!(), - error: todo!(), + OutEvent::Error { peer_id, error} => alice::OutEvent::Failure { + peer: peer_id, + error: anyhow!(error), }, } } @@ -169,15 +172,41 @@ where } fn inject_event(&mut self, peer_id: PeerId, connection: ConnectionId, event: HandlerOutEvent) { - todo!("bubble up events to network behaviour") + match event { + HandlerOutEvent::Initiated(send_wallet_snapshot) => { + self.events.push_back(OutEvent::Initiated { send_wallet_snapshot }) + } + HandlerOutEvent::Completed(swap_setup_result) => { + match swap_setup_result { + Ok((swap_id, state3)) => { + self.events.push_back(OutEvent::Completed { + peer_id, + swap_id, + state3 + }) + } + Err(error) => { + self.events.push_back(OutEvent::Error { + peer_id, + error + }) + } + } + + } + } } fn poll( &mut self, - cx: &mut Context<'_>, - params: &mut impl PollParameters, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, ) -> Poll> { - todo!() + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending } } @@ -231,8 +260,7 @@ pub enum SpotPriceError { // Executing(BoxFuture<'static, anyhow::Result<(Uuid, bob::State3)>>) // } -// TODO: Don't just use anyhow::Error -type InboundStream = BoxFuture<'static, anyhow::Result<(Uuid, alice::State3)>>; +type InboundStream = BoxFuture<'static, anyhow::Result<(Uuid, alice::State3), Error>>; pub struct Handler { inbound_stream: OptionFuture, @@ -268,7 +296,7 @@ impl Handler { pub enum HandlerOutEvent { Initiated(bmrng::RequestReceiver), - Completed(anyhow::Result<(Uuid, alice::State3)>), + Completed(anyhow::Result<(Uuid, alice::State3), Error>), } pub enum HandlerInEvent {} @@ -281,7 +309,7 @@ where { type InEvent = HandlerInEvent; type OutEvent = HandlerOutEvent; - type Error = Void; + type Error = Error; type InboundProtocol = protocol::SwapSetup; type OutboundProtocol = upgrade::DeniedUpgrade; type InboundOpenInfo = (); @@ -309,8 +337,8 @@ where // TODO: Put a timeout on the whole future self.inbound_stream = OptionFuture::from(Some( async move { - let request = read_cbor_message::(&mut substream).await?; - let wallet_snapshot = sender.send_receive(request.btc).await?; + let request = read_cbor_message::(&mut substream).await.map_err(|e| Error::Io(e))?; + let wallet_snapshot = sender.send_receive(request.btc).await.map_err(|e| Error::WalletSnapshotFailed(anyhow!(e)))?; // wrap all of these into another future so we can `return` from all the // different blocks @@ -365,7 +393,7 @@ where let xmr = match validate.await { Ok(xmr) => { - write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)).await?; + write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)).await.map_err(|e| Error::Io(e))?; xmr } @@ -374,7 +402,8 @@ where &mut substream, SpotPriceResponse::Error(e.to_error_response()), ) - .await?; + .await + .map_err(|e| Error::Io(e))?; return Err(e.into()); } }; @@ -392,26 +421,31 @@ where let message0 = read_cbor_message::(&mut substream) .await - .context("Failed to deserialize message0")?; - let (swap_id, state1) = state0.receive(message0)?; + .context("Failed to deserialize message0") + .map_err(|e| Error::Io(e))?; + let (swap_id, state1) = state0.receive(message0).map_err(|e| Error::Io(e))?; - write_cbor_message(&mut substream, state1.next_message()).await?; + write_cbor_message(&mut substream, state1.next_message()).await.map_err(|e| Error::Io(e))?; let message2 = read_cbor_message::(&mut substream) .await - .context("Failed to deserialize message2")?; + .context("Failed to deserialize message2") + .map_err(|e| Error::Io(e))?; let state2 = state1 .receive(message2) - .context("Failed to receive Message2")?; + .context("Failed to receive Message2") + .map_err(|e| Error::Io(e))?; - write_cbor_message(&mut substream, state2.next_message()).await?; + write_cbor_message(&mut substream, state2.next_message()).await.map_err(|e| Error::Io(e))?; let message4 = read_cbor_message::(&mut substream) .await - .context("Failed to deserialize message4")?; + .context("Failed to deserialize message4") + .map_err(|e| Error::Io(e))?; let state3 = state2 .receive(message4) - .context("Failed to receive Message4")?; + .context("Failed to receive Message4") + .map_err(|e| Error::Io(e))?; Ok((swap_id, state3)) } @@ -539,6 +573,10 @@ pub enum Error { cli: BlockchainNetwork, asb: BlockchainNetwork, }, + #[error("Io Error: {0}")] + Io(anyhow::Error), + #[error("Failed to request wallet snapshot: {0}")] + WalletSnapshotFailed(anyhow::Error) } impl Error { @@ -560,7 +598,10 @@ impl Error { asb: *asb, } } - Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => { + Error::LatestRateFetchFailed(_) + | Error::SellQuoteCalculationFailed(_) + | Error::WalletSnapshotFailed(_) + | Error::Io(_) => { SpotPriceError::Other } }