321: Properly handle concurrent messages to and from peers r=thomaseizinger a=thomaseizinger

Previously, we were forwarding incoming messages from peers to all
swaps that were currently running. That is obviously wrong. The new
design scopes an `EventLoopHandle` to a specific PeerId to avoid
this problem.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
pull/331/head
bors[bot] 3 years ago committed by GitHub
commit 2c385ee7d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,7 +4,6 @@ use crate::monero::monero_private_key;
use crate::protocol::alice;
use crate::protocol::alice::AliceState;
use ::bitcoin::hashes::core::fmt::Display;
use libp2p::PeerId;
use monero_rpc::wallet::BlockHeight;
use serde::{Deserialize, Serialize};
@ -15,13 +14,9 @@ use serde::{Deserialize, Serialize};
pub enum Alice {
Started {
state3: alice::State3,
#[serde(with = "crate::serde_peer_id")]
bob_peer_id: PeerId,
},
BtcLocked {
state3: alice::State3,
#[serde(with = "crate::serde_peer_id")]
bob_peer_id: PeerId,
},
XmrLocked {
monero_wallet_restore_blockheight: BlockHeight,
@ -64,19 +59,11 @@ pub enum AliceEndState {
impl From<&AliceState> for Alice {
fn from(alice_state: &AliceState) -> Self {
match alice_state {
AliceState::Started {
state3,
bob_peer_id,
} => Alice::Started {
AliceState::Started { state3 } => Alice::Started {
state3: state3.as_ref().clone(),
bob_peer_id: *bob_peer_id,
},
AliceState::BtcLocked {
state3,
bob_peer_id,
} => Alice::BtcLocked {
AliceState::BtcLocked { state3 } => Alice::BtcLocked {
state3: state3.as_ref().clone(),
bob_peer_id: *bob_peer_id,
},
AliceState::XmrLocked {
monero_wallet_restore_blockheight,
@ -137,18 +124,10 @@ impl From<&AliceState> for Alice {
impl From<Alice> for AliceState {
fn from(db_state: Alice) -> Self {
match db_state {
Alice::Started {
state3,
bob_peer_id,
} => AliceState::Started {
bob_peer_id,
Alice::Started { state3 } => AliceState::Started {
state3: Box::new(state3),
},
Alice::BtcLocked {
state3,
bob_peer_id,
} => AliceState::BtcLocked {
bob_peer_id,
Alice::BtcLocked { state3 } => AliceState::BtcLocked {
state3: Box::new(state3),
},
Alice::XmrLocked {

@ -30,4 +30,3 @@ pub mod seed;
pub mod trace;
mod monero_ext;
mod serde_peer_id;

@ -28,13 +28,17 @@ pub enum OutEvent {
bob_peer_id: PeerId,
state3: Box<State3>,
},
TransferProofAcknowledged,
TransferProofAcknowledged(PeerId),
EncryptedSignature {
msg: Box<EncryptedSignature>,
channel: ResponseChannel<()>,
peer: PeerId,
},
ResponseSent, // Same variant is used for all messages as no processing is done
Failure(Error),
Failure {
peer: PeerId,
error: Error,
},
}
impl From<peer_tracker::OutEvent> for OutEvent {
@ -61,23 +65,20 @@ impl From<spot_price::OutEvent> for OutEvent {
} => OutEvent::SpotPriceRequested { msg, channel, peer },
spot_price::OutEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!(
"Alice is only meant to hand out spot prices, not receive them"
)),
peer,
} => OutEvent::Failure {
error: anyhow!("Alice is only meant to hand out spot prices, not receive them"),
peer,
},
spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!(
"spot_price protocol with peer {} failed due to {:?}",
spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("spot_price protocol failed due to {:?}", error),
peer,
error
)),
spot_price::OutEvent::OutboundFailure { peer, error, .. } => {
OutEvent::Failure(anyhow!(
"spot_price protocol with peer {} failed due to {:?}",
peer,
error
))
}
},
spot_price::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("spot_price protocol failed due to {:?}", error),
peer,
},
}
}
}
@ -91,21 +92,20 @@ impl From<quote::OutEvent> for OutEvent {
} => OutEvent::QuoteRequested { channel, peer },
quote::OutEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!(
"Alice is only meant to hand out quotes, not receive them"
)),
peer,
} => OutEvent::Failure {
error: anyhow!("Alice is only meant to hand out quotes, not receive them"),
peer,
},
quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!(
"quote protocol with peer {} failed due to {:?}",
quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("quote protocol failed due to {:?}", error),
peer,
error
)),
quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!(
"quote protocol with peer {} failed due to {:?}",
},
quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("quote protocol failed due to {:?}", error),
peer,
error
)),
},
}
}
}
@ -121,7 +121,7 @@ impl From<execution_setup::OutEvent> for OutEvent {
bob_peer_id,
state3: Box::new(state3),
},
Failure(err) => OutEvent::Failure(err),
Failure { peer, error } => OutEvent::Failure { peer, error },
}
}
}
@ -130,8 +130,11 @@ impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
use crate::protocol::alice::transfer_proof::OutEvent::*;
match event {
Acknowledged => OutEvent::TransferProofAcknowledged,
Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")),
Acknowledged(peer) => OutEvent::TransferProofAcknowledged(peer),
Failure { peer, error } => OutEvent::Failure {
peer,
error: error.context("Failure with Transfer Proof"),
},
}
}
}
@ -140,12 +143,16 @@ impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
use crate::protocol::alice::encrypted_signature::OutEvent::*;
match event {
MsgReceived { msg, channel } => OutEvent::EncryptedSignature {
MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature {
msg: Box::new(msg),
channel,
peer,
},
AckSent => OutEvent::ResponseSent,
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
Failure { peer, error } => OutEvent::Failure {
peer,
error: error.context("Failure with Encrypted Signature"),
},
}
}
}

@ -5,7 +5,7 @@ use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p::NetworkBehaviour;
use libp2p::{NetworkBehaviour, PeerId};
use std::time::Duration;
use tracing::debug;
@ -14,9 +14,13 @@ pub enum OutEvent {
MsgReceived {
msg: EncryptedSignature,
channel: ResponseChannel<()>,
peer: PeerId,
},
AckSent,
Failure(Error),
Failure {
peer: PeerId,
error: Error,
},
}
/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted
@ -67,18 +71,24 @@ impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
OutEvent::MsgReceived {
msg: request,
channel,
peer,
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!("Alice should not get a Response")),
RequestResponseEvent::InboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
}
RequestResponseEvent::OutboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
peer,
} => OutEvent::Failure {
peer,
error: anyhow!("Alice should not get a Response"),
},
RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Inbound failure: {:?}", error),
},
RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Outbound failure: {:?}", error),
},
RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
}
}

@ -9,13 +9,16 @@ use crate::protocol::bob::EncryptedSignature;
use crate::seed::Seed;
use crate::{bitcoin, kraken, monero};
use anyhow::{bail, Context, Result};
use futures::future;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::core::Multiaddr;
use libp2p::futures::FutureExt;
use libp2p::{PeerId, Swarm};
use rand::rngs::OsRng;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, trace};
use uuid::Uuid;
@ -30,19 +33,19 @@ pub struct EventLoop<RS> {
latest_rate: RS,
max_buy: bitcoin::Amount,
recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>,
// Only used to produce new handles
send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s.
recv_encrypted_signature: HashMap<PeerId, oneshot::Sender<EncryptedSignature>>,
/// Stores a list of futures, waiting for transfer proof which will be sent
/// to the given peer.
send_transfer_proof: FuturesUnordered<BoxFuture<'static, Result<(PeerId, TransferProof)>>>,
swap_sender: mpsc::Sender<Swap>,
}
#[derive(Debug)]
pub struct EventLoopHandle {
recv_encrypted_signature: broadcast::Receiver<EncryptedSignature>,
send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>,
recv_encrypted_signature: Option<oneshot::Receiver<EncryptedSignature>>,
send_transfer_proof: Option<oneshot::Sender<TransferProof>>,
}
impl<LR> EventLoop<LR>
@ -74,8 +77,6 @@ where
Swarm::listen_on(&mut swarm, listen_address.clone())
.with_context(|| format!("Address is not supported: {:#}", listen_address))?;
let recv_encrypted_signature = BroadcastChannels::default();
let send_transfer_proof = MpscChannels::default();
let swap_channel = MpscChannels::default();
let event_loop = EventLoop {
@ -86,30 +87,26 @@ where
monero_wallet,
db,
latest_rate,
recv_encrypted_signature: recv_encrypted_signature.sender,
send_transfer_proof: send_transfer_proof.receiver,
send_transfer_proof_sender: send_transfer_proof.sender,
swap_sender: swap_channel.sender,
max_buy,
recv_encrypted_signature: Default::default(),
send_transfer_proof: Default::default(),
};
Ok((event_loop, swap_channel.receiver))
}
pub fn new_handle(&self) -> EventLoopHandle {
EventLoopHandle {
recv_encrypted_signature: self.recv_encrypted_signature.subscribe(),
send_transfer_proof: self.send_transfer_proof_sender.clone(),
}
}
pub fn peer_id(&self) -> PeerId {
self.peer_id
}
pub async fn run(mut self) {
// ensure that the send_transfer_proof stream is NEVER empty, otherwise it will
// terminate forever.
self.send_transfer_proof.push(future::pending().boxed());
loop {
tokio::select! {
swarm_event = self.swarm.next().fuse() => {
swarm_event = self.swarm.next() => {
match swarm_event {
OutEvent::ConnectionEstablished(alice) => {
debug!("Connection Established with {}", alice);
@ -161,27 +158,43 @@ where
OutEvent::ExecutionSetupDone{bob_peer_id, state3} => {
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await;
}
OutEvent::TransferProofAcknowledged => {
trace!("Bob acknowledged transfer proof");
OutEvent::TransferProofAcknowledged(peer) => {
trace!(%peer, "Bob acknowledged transfer proof");
}
OutEvent::EncryptedSignature{ msg, channel } => {
let _ = self.recv_encrypted_signature.send(*msg);
// Send back empty response so that the request/response protocol completes.
OutEvent::EncryptedSignature{ msg, channel, peer } => {
match self.recv_encrypted_signature.remove(&peer) {
Some(sender) => {
// this failing just means the receiver is no longer interested ...
let _ = sender.send(*msg);
},
None => {
tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?")
}
}
if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) {
error!("Failed to send Encrypted Signature ack: {:?}", error);
}
}
OutEvent::ResponseSent => {}
OutEvent::Failure(err) => {
error!("Communication error: {:#}", err);
OutEvent::Failure {peer, error} => {
error!(%peer, "Communication error: {:#}", error);
}
}
},
transfer_proof = self.send_transfer_proof.recv().fuse() => {
if let Some((bob_peer_id, msg)) = transfer_proof {
self.swarm.send_transfer_proof(bob_peer_id, msg);
next_transfer_proof = self.send_transfer_proof.next() => {
match next_transfer_proof {
Some(Ok((peer, transfer_proof))) => {
self.swarm.send_transfer_proof(peer, transfer_proof);
},
Some(Err(_)) => {
tracing::debug!("A swap stopped without sending a transfer proof");
}
None => {
unreachable!("stream of transfer proof receivers must never terminate")
}
}
},
}
}
}
}
@ -230,11 +243,10 @@ where
async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) {
let swap_id = Uuid::new_v4();
let handle = self.new_handle();
let handle = self.new_handle(bob_peer_id);
let initial_state = AliceState::Started {
state3: Box::new(state3),
bob_peer_id,
};
let swap = Swap {
@ -251,6 +263,29 @@ where
tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error);
}
}
/// Create a new [`EventLoopHandle`] that is scoped for communication with
/// the given peer.
fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle {
let (send_transfer_proof_sender, send_transfer_proof_receiver) = oneshot::channel();
let (recv_enc_sig_sender, recv_enc_sig_receiver) = oneshot::channel();
self.recv_encrypted_signature
.insert(peer, recv_enc_sig_sender);
self.send_transfer_proof.push(
async move {
let transfer_proof = send_transfer_proof_receiver.await?;
Ok((peer, transfer_proof))
}
.boxed(),
);
EventLoopHandle {
recv_encrypted_signature: Some(recv_enc_sig_receiver),
send_transfer_proof: Some(send_transfer_proof_sender),
}
}
}
pub trait LatestRate {
@ -277,13 +312,24 @@ impl LatestRate for kraken::RateUpdateStream {
impl EventLoopHandle {
pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> {
self.recv_encrypted_signature
.recv()
.await
.context("Failed to receive Bitcoin encrypted signature from Bob")
let signature = self
.recv_encrypted_signature
.take()
.context("Encrypted signature was already received")?
.await?;
Ok(signature)
}
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?;
pub async fn send_transfer_proof(&mut self, msg: TransferProof) -> Result<()> {
if self
.send_transfer_proof
.take()
.context("Transfer proof was already sent")?
.send(msg)
.is_err()
{
bail!("Failed to send transfer proof, receiver no longer listening?")
}
Ok(())
}
@ -308,21 +354,3 @@ impl<T> Default for MpscChannels<T> {
MpscChannels { sender, receiver }
}
}
#[allow(missing_debug_implementations)]
struct BroadcastChannels<T>
where
T: Clone,
{
sender: broadcast::Sender<T>,
}
impl<T> Default for BroadcastChannels<T>
where
T: Clone,
{
fn default() -> Self {
let (sender, _receiver) = broadcast::channel(100);
BroadcastChannels { sender }
}
}

@ -29,7 +29,7 @@ pub struct Message3 {
#[derive(Debug)]
pub enum OutEvent {
Done { bob_peer_id: PeerId, state3: State3 },
Failure(Error),
Failure { peer: PeerId, error: Error },
}
impl From<BehaviourOutEvent<(PeerId, State3), (), Error>> for OutEvent {
@ -39,7 +39,7 @@ impl From<BehaviourOutEvent<(PeerId, State3), (), Error>> for OutEvent {
bob_peer_id,
state3,
},
BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Failure(e),
BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e },
BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"),
}
}

@ -7,7 +7,6 @@ use crate::protocol::bob::{Message0, Message2, Message4};
use crate::protocol::CROSS_CURVE_PROOF_SYSTEM;
use crate::{bitcoin, monero};
use anyhow::{anyhow, bail, Context, Result};
use libp2p::PeerId;
use monero_rpc::wallet::BlockHeight;
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
@ -17,11 +16,9 @@ use std::fmt;
#[derive(Debug)]
pub enum AliceState {
Started {
bob_peer_id: PeerId,
state3: Box<State3>,
},
BtcLocked {
bob_peer_id: PeerId,
state3: Box<State3>,
},
XmrLocked {

@ -7,10 +7,8 @@ use crate::protocol::alice::TransferProof;
use crate::{bitcoin, monero};
use anyhow::{bail, Context, Result};
use futures::pin_mut;
use libp2p::PeerId;
pub async fn lock_xmr(
bob_peer_id: PeerId,
state3: alice::State3,
event_loop_handle: &mut EventLoopHandle,
monero_wallet: &monero::Wallet,
@ -30,7 +28,7 @@ pub async fn lock_xmr(
// Otherwise Alice might publish the lock tx twice!
event_loop_handle
.send_transfer_proof(bob_peer_id, TransferProof {
.send_transfer_proof(TransferProof {
tx_lock_proof: transfer_proof,
})
.await?;

@ -76,10 +76,7 @@ async fn run_until_internal(
Ok(state)
} else {
match state {
AliceState::Started {
state3,
bob_peer_id,
} => {
AliceState::Started { state3 } => {
timeout(
env_config.bob_time_to_act,
bitcoin_wallet
@ -94,10 +91,7 @@ async fn run_until_internal(
})
.await?;
let state = AliceState::BtcLocked {
bob_peer_id,
state3,
};
let state = AliceState::BtcLocked { state3 };
let db_state = (&state).into();
db.insert_latest_state(swap_id, database::Swap::Alice(db_state))
@ -114,21 +108,12 @@ async fn run_until_internal(
)
.await
}
AliceState::BtcLocked {
bob_peer_id,
state3,
} => {
AliceState::BtcLocked { state3 } => {
// Record the current monero wallet block height so we don't have to scan from
// block 0 for scenarios where we create a refund wallet.
let monero_wallet_restore_blockheight = monero_wallet.block_height().await?;
lock_xmr(
bob_peer_id,
*state3.clone(),
&mut event_loop_handle,
&monero_wallet,
)
.await?;
lock_xmr(*state3.clone(), &mut event_loop_handle, &monero_wallet).await?;
let state = AliceState::XmrLocked {
state3,

@ -16,8 +16,8 @@ pub struct TransferProof {
#[derive(Debug)]
pub enum OutEvent {
Acknowledged,
Failure(Error),
Acknowledged(PeerId),
Failure { peer: PeerId, error: Error },
}
/// A `NetworkBehaviour` that represents sending the Monero transfer proof to
@ -56,23 +56,27 @@ impl From<RequestResponseEvent<TransferProof, ()>> for OutEvent {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => OutEvent::Failure(anyhow!(
"Alice should never get a transfer proof request from Bob"
)),
peer,
} => OutEvent::Failure {
peer,
error: anyhow!("Alice should never get a transfer proof request from Bob"),
},
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Acknowledged,
RequestResponseEvent::InboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
}
RequestResponseEvent::OutboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => {
OutEvent::Failure(anyhow!("Alice should not send a response"))
}
peer,
} => OutEvent::Acknowledged(peer),
RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Inbound failure: {:?}", error),
},
RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Outbound failure: {:?}", error),
},
RequestResponseEvent::ResponseSent { peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Alice should not send a response"),
},
}
}
}

@ -1,49 +0,0 @@
//! A serde module that defines how we want to serialize PeerIds on the
//! HTTP-API.
use libp2p::PeerId;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(peer_id: &PeerId, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let string = peer_id.to_string();
serializer.serialize_str(&string)
}
#[allow(dead_code)]
pub fn deserialize<'de, D>(deserializer: D) -> Result<PeerId, D::Error>
where
D: Deserializer<'de>,
{
let string = String::deserialize(deserializer)?;
let peer_id = string.parse().map_err(D::Error::custom)?;
Ok(peer_id)
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Serialize;
use spectral::prelude::*;
#[derive(Serialize)]
struct SerializablePeerId(#[serde(with = "super")] PeerId);
#[test]
fn maker_id_serializes_as_expected() {
let peer_id = SerializablePeerId(
"QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY"
.parse()
.unwrap(),
);
let got = serde_json::to_string(&peer_id).expect("failed to serialize peer id");
assert_that(&got)
.is_equal_to(r#""QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY""#.to_string());
}
}
Loading…
Cancel
Save