From 05766d3146ad24ec18160d0de4f84bc0be4f3600 Mon Sep 17 00:00:00 2001 From: "Tobin C. Harding" Date: Fri, 16 Oct 2020 09:14:39 +1100 Subject: [PATCH] Add swap/ Add a binary crate `swap` that implements two nodes (Alice and Bob). With this applied we can start up a node for each role and do: - Bob: Requests current amounts using BTC is input - Alice: Responds with amounts - Bob: (mock) get user input to Ok the amounts ... continue with swap (TODO) --- Cargo.toml | 2 +- swap/Cargo.toml | 32 ++++++ swap/src/alice.rs | 141 ++++++++++++++++++++++++ swap/src/alice/messenger.rs | 135 +++++++++++++++++++++++ swap/src/bob.rs | 157 +++++++++++++++++++++++++++ swap/src/bob/messenger.rs | 117 ++++++++++++++++++++ swap/src/cli.rs | 14 +++ swap/src/lib.rs | 95 ++++++++++++++++ swap/src/main.rs | 96 ++++++++++++++++ swap/src/network.rs | 18 +++ swap/src/network/peer_tracker.rs | 148 +++++++++++++++++++++++++ swap/src/network/request_response.rs | 109 +++++++++++++++++++ swap/src/network/transport.rs | 53 +++++++++ swap/src/trace.rs | 25 +++++ 14 files changed, 1141 insertions(+), 1 deletion(-) create mode 100644 swap/Cargo.toml create mode 100644 swap/src/alice.rs create mode 100644 swap/src/alice/messenger.rs create mode 100644 swap/src/bob.rs create mode 100644 swap/src/bob/messenger.rs create mode 100644 swap/src/cli.rs create mode 100644 swap/src/lib.rs create mode 100644 swap/src/main.rs create mode 100644 swap/src/network.rs create mode 100644 swap/src/network/peer_tracker.rs create mode 100644 swap/src/network/request_response.rs create mode 100644 swap/src/network/transport.rs create mode 100644 swap/src/trace.rs diff --git a/Cargo.toml b/Cargo.toml index e0174381..9d880136 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = ["monero-harness", "xmr-btc"] +members = ["monero-harness", "xmr-btc", "swap"] diff --git a/swap/Cargo.toml b/swap/Cargo.toml new file mode 100644 index 00000000..cb22e3d1 --- /dev/null +++ b/swap/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "swap" +version = "0.1.0" +authors = ["CoBloX developers "] +edition = "2018" +description = "XMR/BTC trustless atomic swaps." + +[dependencies] +anyhow = "1" +async-trait = "0.1" +atty = "0.2" +bitcoin = "0.25" # TODO: Upgrade other crates in this repo to use this version. +derivative = "2" +futures = { version = "0.3", default-features = false } +libp2p = { version = "0.28", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] } +libp2p-tokio-socks5 = "0.3" +log = { version = "0.4", features = ["serde"] } +monero = "0.9" +rand = "0.7" +serde = { version = "1", features = ["derive"] } +serde_derive = "1.0" +serde_json = "1" +structopt = "0.3" +time = "0.2" +tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "sync"] } +tracing = { version = "0.1", features = ["attributes"] } +tracing-core = "0.1" +tracing-futures = { version = "0.2", features = ["std-future", "futures-03"] } +tracing-log = "0.1" +tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter"] } +void = "1" +xmr-btc = { path = "../xmr-btc" } \ No newline at end of file diff --git a/swap/src/alice.rs b/swap/src/alice.rs new file mode 100644 index 00000000..5addb61f --- /dev/null +++ b/swap/src/alice.rs @@ -0,0 +1,141 @@ +//! Run an XMR/BTC swap in the role of Alice. +//! Alice holds XMR and wishes receive BTC. +use anyhow::Result; +use libp2p::{ + core::{identity::Keypair, Multiaddr}, + request_response::ResponseChannel, + NetworkBehaviour, PeerId, +}; +use std::{thread, time::Duration}; +use tracing::{debug, warn}; + +mod messenger; + +use self::messenger::*; +use crate::{ + monero, + network::{ + peer_tracker::{self, PeerTracker}, + request_response::{AliceToBob, TIMEOUT}, + transport, TokioExecutor, + }, + Never, SwapParams, +}; + +pub type Swarm = libp2p::Swarm; + +pub async fn swap(listen: Multiaddr) -> Result<()> { + let mut swarm = new_swarm(listen)?; + + match swarm.next().await { + BehaviourOutEvent::Request(messenger::BehaviourOutEvent::Btc { btc, channel }) => { + debug!("Got request from Bob"); + let params = SwapParams { + btc, + // TODO: Do a real calculation. + xmr: monero::Amount::from_piconero(10), + }; + + let msg = AliceToBob::Amounts(params); + swarm.send(channel, msg); + } + other => panic!("unexpected event: {:?}", other), + } + + warn!("parking thread ..."); + thread::park(); + Ok(()) +} + +fn new_swarm(listen: Multiaddr) -> Result { + use anyhow::Context as _; + + let behaviour = Alice::default(); + + let local_key_pair = behaviour.identity(); + let local_peer_id = behaviour.peer_id(); + + let transport = transport::build(local_key_pair)?; + + let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) + .executor(Box::new(TokioExecutor { + handle: tokio::runtime::Handle::current(), + })) + .build(); + + Swarm::listen_on(&mut swarm, listen.clone()) + .with_context(|| format!("Address is not supported: {:#}", listen))?; + + tracing::info!("Initialized swarm: {}", local_peer_id); + + Ok(swarm) +} + +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum BehaviourOutEvent { + Request(messenger::BehaviourOutEvent), + ConnectionEstablished(PeerId), + Never, // FIXME: Why do we need this? +} + +impl From for BehaviourOutEvent { + fn from(_: Never) -> Self { + BehaviourOutEvent::Never + } +} + +impl From for BehaviourOutEvent { + fn from(event: messenger::BehaviourOutEvent) -> Self { + BehaviourOutEvent::Request(event) + } +} + +impl From for BehaviourOutEvent { + fn from(event: peer_tracker::BehaviourOutEvent) -> Self { + match event { + peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => { + BehaviourOutEvent::ConnectionEstablished(id) + } + } + } +} + +/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "BehaviourOutEvent", event_process = false)] +#[allow(missing_debug_implementations)] +pub struct Alice { + net: Messenger, + pt: PeerTracker, + #[behaviour(ignore)] + identity: Keypair, +} + +impl Alice { + pub fn identity(&self) -> Keypair { + self.identity.clone() + } + + pub fn peer_id(&self) -> PeerId { + PeerId::from(self.identity.public()) + } + + /// Alice always sends her messages as a response to a request from Bob. + pub fn send(&mut self, channel: ResponseChannel, msg: AliceToBob) { + self.net.send(channel, msg); + } +} + +impl Default for Alice { + fn default() -> Self { + let identity = Keypair::generate_ed25519(); + let timeout = Duration::from_secs(TIMEOUT); + + Self { + net: Messenger::new(timeout), + pt: PeerTracker::default(), + identity, + } + } +} diff --git a/swap/src/alice/messenger.rs b/swap/src/alice/messenger.rs new file mode 100644 index 00000000..62355006 --- /dev/null +++ b/swap/src/alice/messenger.rs @@ -0,0 +1,135 @@ +use anyhow::Result; +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse, + RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +use crate::{ + bitcoin, + network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}, + Never, +}; + +#[derive(Debug)] +pub enum BehaviourOutEvent { + Btc { + btc: bitcoin::Amount, + channel: ResponseChannel, + }, +} + +/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Messenger { + rr: RequestResponse, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Messenger { + pub fn new(timeout: Duration) -> Self { + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } + + /// Alice always sends her messages as a response to a request from Bob. + pub fn send(&mut self, channel: ResponseChannel, msg: AliceToBob) { + self.rr.send_response(channel, msg); + } + + pub async fn request_amounts( + &mut self, + alice: PeerId, + btc: bitcoin::Amount, + ) -> Result { + let msg = BobToAlice::AmountsFromBtc(btc); + let id = self.rr.send_request(&alice, msg); + debug!("Request sent to: {}", alice); + + Ok(id) + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, BehaviourOutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess> for Messenger { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + peer: _, + message: + RequestResponseMessage::Request { + request, + request_id: _, + channel, + }, + } => match request { + BobToAlice::AmountsFromBtc(btc) => self + .events + .push_back(BehaviourOutEvent::Btc { btc, channel }), + _ => panic!("unexpected request"), + }, + RequestResponseEvent::Message { + peer: _, + message: + RequestResponseMessage::Response { + response: _, + request_id: _, + }, + } => panic!("unexpected response"), + RequestResponseEvent::InboundFailure { + peer: _, + request_id: _, + error, + } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { + peer: _, + request_id: _, + error, + } => { + error!("Outbound failure: {:?}", error); + } + } + } +} + +impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger { + fn inject_event(&mut self, _event: ()) {} +} + +impl libp2p::swarm::NetworkBehaviourEventProcess for Messenger { + fn inject_event(&mut self, _: Never) {} +} diff --git a/swap/src/bob.rs b/swap/src/bob.rs new file mode 100644 index 00000000..33e4520b --- /dev/null +++ b/swap/src/bob.rs @@ -0,0 +1,157 @@ +//! Run an XMR/BTC swap in the role of Bob. +//! Bob holds BTC and wishes receive XMR. +use anyhow::Result; +use futures::{ + channel::mpsc::{Receiver, Sender}, + StreamExt, +}; +use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; +use std::{process, thread, time::Duration}; +use tracing::{debug, info, warn}; + +mod messenger; + +use self::messenger::*; +use crate::{ + bitcoin, + network::{ + peer_tracker::{self, PeerTracker}, + request_response::TIMEOUT, + transport, TokioExecutor, + }, + Cmd, Never, Rsp, +}; + +pub async fn swap( + btc: u64, + addr: Multiaddr, + mut cmd_tx: Sender, + mut rsp_rx: Receiver, +) -> Result<()> { + let mut swarm = new_swarm()?; + + libp2p::Swarm::dial_addr(&mut swarm, addr)?; + let id = match swarm.next().await { + BehaviourOutEvent::ConnectionEstablished(id) => id, + other => panic!("unexpected event: {:?}", other), + }; + info!("Connection established."); + + swarm.request_amounts(id, btc).await; + + match swarm.next().await { + BehaviourOutEvent::Response(messenger::BehaviourOutEvent::Amounts(p)) => { + debug!("Got response from Alice: {:?}", p); + let cmd = Cmd::VerifyAmounts(p); + cmd_tx.try_send(cmd)?; + let response = rsp_rx.next().await; + if response == Some(Rsp::Abort) { + info!("Amounts no good, aborting ..."); + process::exit(0); + } + info!("User verified amounts, continuing with swap ..."); + } + other => panic!("unexpected event: {:?}", other), + } + + warn!("parking thread ..."); + thread::park(); + Ok(()) +} + +pub type Swarm = libp2p::Swarm; + +fn new_swarm() -> Result { + let behaviour = Bob::default(); + + let local_key_pair = behaviour.identity(); + let local_peer_id = behaviour.peer_id(); + + let transport = transport::build(local_key_pair)?; + + let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) + .executor(Box::new(TokioExecutor { + handle: tokio::runtime::Handle::current(), + })) + .build(); + + info!("Initialized swarm with identity {}", local_peer_id); + + Ok(swarm) +} + +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum BehaviourOutEvent { + Response(messenger::BehaviourOutEvent), + ConnectionEstablished(PeerId), + Never, // FIXME: Why do we need this? +} + +impl From for BehaviourOutEvent { + fn from(_: Never) -> Self { + BehaviourOutEvent::Never + } +} + +impl From for BehaviourOutEvent { + fn from(event: messenger::BehaviourOutEvent) -> Self { + BehaviourOutEvent::Response(event) + } +} + +impl From for BehaviourOutEvent { + fn from(event: peer_tracker::BehaviourOutEvent) -> Self { + match event { + peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => { + BehaviourOutEvent::ConnectionEstablished(id) + } + } + } +} + +/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "BehaviourOutEvent", event_process = false)] +#[allow(missing_debug_implementations)] +pub struct Bob { + net: Messenger, + pt: PeerTracker, + #[behaviour(ignore)] + identity: Keypair, +} + +impl Bob { + pub fn identity(&self) -> Keypair { + self.identity.clone() + } + + pub fn peer_id(&self) -> PeerId { + PeerId::from(self.identity.public()) + } + + /// Sends a message to Alice to get current amounts based on `btc`. + pub async fn request_amounts(&mut self, alice: PeerId, btc: u64) { + let btc = bitcoin::Amount::from_sat(btc); + let _id = self.net.request_amounts(alice.clone(), btc).await; + debug!("Requesting amounts from: {}", alice); + } + + /// Returns Alice's peer id if we are connected. + pub fn peer_id_of_alice(&self) -> Option { + self.pt.counterparty() + } +} + +impl Default for Bob { + fn default() -> Bob { + let identity = Keypair::generate_ed25519(); + let timeout = Duration::from_secs(TIMEOUT); + + Self { + net: Messenger::new(timeout), + pt: PeerTracker::default(), + identity, + } + } +} diff --git a/swap/src/bob/messenger.rs b/swap/src/bob/messenger.rs new file mode 100644 index 00000000..e153addc --- /dev/null +++ b/swap/src/bob/messenger.rs @@ -0,0 +1,117 @@ +use anyhow::Result; +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse, + RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +use crate::{ + bitcoin, + network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}, + Never, SwapParams, +}; + +#[derive(Debug)] +pub enum BehaviourOutEvent { + Amounts(SwapParams), +} + +/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Messenger { + rr: RequestResponse, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Messenger { + pub fn new(timeout: Duration) -> Self { + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + Codec::default(), + vec![(Protocol, ProtocolSupport::Full)], + config, + ), + events: Default::default(), + } + } + + pub async fn request_amounts( + &mut self, + alice: PeerId, + btc: bitcoin::Amount, + ) -> Result { + debug!("Sending request ..."); + let msg = BobToAlice::AmountsFromBtc(btc); + let id = self.rr.send_request(&alice, msg); + debug!("Sent."); + + Ok(id) + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll, BehaviourOutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl NetworkBehaviourEventProcess> for Messenger { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + peer: _, + message: RequestResponseMessage::Request { .. }, + } => panic!("Bob should never get a request from Alice"), + RequestResponseEvent::Message { + peer: _, + message: + RequestResponseMessage::Response { + response, + request_id: _, + }, + } => match response { + AliceToBob::Amounts(p) => self.events.push_back(BehaviourOutEvent::Amounts(p)), + }, + + RequestResponseEvent::InboundFailure { .. } => { + panic!("Bob should never get a request from Alice, so should never get an InboundFailure"); + } + RequestResponseEvent::OutboundFailure { + peer: _, + request_id: _, + error, + } => { + error!("Outbound failure: {:?}", error); + } + } + } +} + +impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger { + fn inject_event(&mut self, _event: ()) {} +} + +impl libp2p::swarm::NetworkBehaviourEventProcess for Messenger { + fn inject_event(&mut self, _: Never) {} +} diff --git a/swap/src/cli.rs b/swap/src/cli.rs new file mode 100644 index 00000000..fd4f9a59 --- /dev/null +++ b/swap/src/cli.rs @@ -0,0 +1,14 @@ +#[derive(structopt::StructOpt, Debug)] +pub struct Options { + /// Run the swap as Alice. + #[structopt(long = "as-alice")] + pub as_alice: bool, + + /// Run the swap as Bob and try to swap this many XMR (in piconero). + #[structopt(long = "picos")] + pub piconeros: Option, + + /// Run the swap as Bob and try to swap this many BTC (in satoshi). + #[structopt(long = "sats")] + pub satoshis: Option, +} diff --git a/swap/src/lib.rs b/swap/src/lib.rs new file mode 100644 index 00000000..c5c22421 --- /dev/null +++ b/swap/src/lib.rs @@ -0,0 +1,95 @@ +use serde::{Deserialize, Serialize}; + +pub mod alice; +pub mod bob; +pub mod network; + +pub const ONE_BTC: u64 = 100_000_000; + +pub type Never = std::convert::Infallible; + +/// Commands sent from Bob to the main task. +#[derive(Debug)] +pub enum Cmd { + VerifyAmounts(SwapParams), +} + +/// Responses send from the main task back to Bob. +#[derive(Debug, PartialEq)] +pub enum Rsp { + Verified, + Abort, +} + +/// XMR/BTC swap parameters. +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub struct SwapParams { + /// Amount of BTC to swap. + pub btc: bitcoin::Amount, + /// Amount of XMR to swap. + pub xmr: monero::Amount, +} + +// FIXME: Amount modules are a quick hack so we can derive serde. + +pub mod monero { + use serde::{Deserialize, Serialize}; + use std::fmt; + + #[derive(Copy, Clone, Debug, Serialize, Deserialize)] + pub struct Amount(u64); + + impl Amount { + /// Create an [Amount] with piconero precision and the given number of + /// piconeros. + /// + /// A piconero (a.k.a atomic unit) is equal to 1e-12 XMR. + pub fn from_piconero(amount: u64) -> Self { + Amount(amount) + } + pub fn as_piconero(&self) -> u64 { + self.0 + } + } + + impl From for u64 { + fn from(from: Amount) -> u64 { + from.0 + } + } + + impl fmt::Display for Amount { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{} piconeros", self.0) + } + } +} + +pub mod bitcoin { + use serde::{Deserialize, Serialize}; + use std::fmt; + + #[derive(Copy, Clone, Debug, Serialize, Deserialize)] + pub struct Amount(u64); + + impl Amount { + /// The zero amount. + pub const ZERO: Amount = Amount(0); + /// Exactly one satoshi. + pub const ONE_SAT: Amount = Amount(1); + /// Exactly one bitcoin. + pub const ONE_BTC: Amount = Amount(100_000_000); + + /// Create an [Amount] with satoshi precision and the given number of + /// satoshis. + pub fn from_sat(satoshi: u64) -> Amount { + Amount(satoshi) + } + } + + impl fmt::Display for Amount { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{} satoshis", self.0) + } + } +} diff --git a/swap/src/main.rs b/swap/src/main.rs new file mode 100644 index 00000000..b95e5ba2 --- /dev/null +++ b/swap/src/main.rs @@ -0,0 +1,96 @@ +#![warn( + unused_extern_crates, + missing_debug_implementations, + missing_copy_implementations, + rust_2018_idioms, + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + clippy::fallible_impl_from, + clippy::cast_precision_loss, + clippy::cast_possible_wrap, + clippy::dbg_macro +)] +#![forbid(unsafe_code)] + +use anyhow::{bail, Result}; +use futures::{channel::mpsc, StreamExt}; +use libp2p::Multiaddr; +use log::LevelFilter; +use structopt::StructOpt; +use tracing::info; + +mod cli; +mod trace; + +use cli::Options; +use swap::{alice, bob, Cmd, Rsp, SwapParams}; + +// TODO: Add root seed file instead of generating new seed each run. + +// Alice's address and port until we have a config file. +pub const PORT: u16 = 9876; // Arbitrarily chosen. +pub const ADDR: &str = "127.0.0.1"; + +#[tokio::main] +async fn main() -> Result<()> { + let opt = Options::from_args(); + + trace::init_tracing(LevelFilter::Debug)?; + + let addr = format!("/ip4/{}/tcp/{}", ADDR, PORT); + let alice_addr: Multiaddr = addr.parse().expect("failed to parse Alice's address"); + + if opt.as_alice { + info!("running swap node as Alice ..."); + + if opt.piconeros.is_some() || opt.satoshis.is_some() { + bail!("Alice cannot set the amount to swap via the cli"); + } + + swap_as_alice(alice_addr).await?; + } else { + info!("running swap node as Bob ..."); + + match (opt.piconeros, opt.satoshis) { + (Some(_), Some(_)) => bail!("Please supply only a single amount to swap"), + (None, None) => bail!("Please supply an amount to swap"), + (Some(_picos), _) => todo!("support starting with picos"), + (None, Some(sats)) => { + swap_as_bob(sats, alice_addr).await?; + } + }; + } + + Ok(()) +} + +async fn swap_as_alice(addr: Multiaddr) -> Result<()> { + alice::swap(addr).await +} + +async fn swap_as_bob(sats: u64, addr: Multiaddr) -> Result<()> { + let (cmd_tx, mut cmd_rx) = mpsc::channel(1); + let (mut rsp_tx, rsp_rx) = mpsc::channel(1); + tokio::spawn(bob::swap(sats, addr, cmd_tx, rsp_rx)); + loop { + let read = cmd_rx.next().await; + match read { + Some(cmd) => match cmd { + Cmd::VerifyAmounts(p) => { + if verified(p) { + rsp_tx.try_send(Rsp::Verified)?; + } + } + }, + None => { + info!("Channel closed from other end"); + return Ok(()); + } + } + } +} + +fn verified(_p: SwapParams) -> bool { + // TODO: Read input from the shell. + true +} diff --git a/swap/src/network.rs b/swap/src/network.rs new file mode 100644 index 00000000..90bf44de --- /dev/null +++ b/swap/src/network.rs @@ -0,0 +1,18 @@ +use futures::prelude::*; +use libp2p::core::Executor; +use std::pin::Pin; +use tokio::runtime::Handle; + +pub mod peer_tracker; +pub mod request_response; +pub mod transport; + +pub struct TokioExecutor { + pub handle: Handle, +} + +impl Executor for TokioExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = self.handle.spawn(future); + } +} diff --git a/swap/src/network/peer_tracker.rs b/swap/src/network/peer_tracker.rs new file mode 100644 index 00000000..7b76563f --- /dev/null +++ b/swap/src/network/peer_tracker.rs @@ -0,0 +1,148 @@ +use futures::task::Context; +use libp2p::{ + core::{connection::ConnectionId, ConnectedPoint}, + swarm::{ + protocols_handler::DummyProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, + }, + Multiaddr, PeerId, +}; +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + task::Poll, +}; + +#[derive(Debug)] +pub enum BehaviourOutEvent { + ConnectionEstablished(PeerId), +} + +/// A NetworkBehaviour that tracks connections to other peers. +#[derive(Default, Debug)] +pub struct PeerTracker { + connected_peers: HashMap>, + address_hints: HashMap>, + events: VecDeque, +} + +impl PeerTracker { + /// Returns an arbitrary connected counterparty. + /// This is useful if we are connected to a single other node. + pub fn counterparty(&self) -> Option { + // TODO: Refactor to use combinators. + if let Some((id, _)) = self.connected_peers().next() { + return Some(id); + } + None + } + + pub fn connected_peers(&self) -> impl Iterator)> { + self.connected_peers.clone().into_iter() + } + + /// Adds an address hint for the given peer id. The added address is + /// considered most recent and hence is added at the start of the list + /// because libp2p tries to connect with the first address first. + pub fn add_recent_address_hint(&mut self, id: PeerId, addr: Multiaddr) { + let old_addresses = self.address_hints.get_mut(&id); + + match old_addresses { + None => { + let mut hints = VecDeque::new(); + hints.push_back(addr); + self.address_hints.insert(id, hints); + } + Some(hints) => { + hints.push_front(addr); + } + } + } +} + +impl NetworkBehaviour for PeerTracker { + type ProtocolsHandler = DummyProtocolsHandler; + type OutEvent = BehaviourOutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + DummyProtocolsHandler::default() + } + + /// Note (from libp2p doc): + /// The addresses will be tried in the order returned by this function, + /// which means that they should be ordered by decreasing likelihood of + /// reachability. In other words, the first address should be the most + /// likely to be reachable. + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + let mut addresses: Vec = vec![]; + + // If we are connected then this address is most likely to be valid + if let Some(connected) = self.connected_peers.get(peer) { + for addr in connected.iter() { + addresses.push(addr.clone()) + } + } + + if let Some(hints) = self.address_hints.get(peer) { + for hint in hints { + addresses.push(hint.clone()); + } + } + + addresses + } + + fn inject_connected(&mut self, _: &PeerId) {} + + fn inject_disconnected(&mut self, _: &PeerId) {} + + fn inject_connection_established( + &mut self, + peer: &PeerId, + _: &ConnectionId, + point: &ConnectedPoint, + ) { + if let ConnectedPoint::Dialer { address } = point { + self.connected_peers + .entry(peer.clone()) + .or_default() + .push(address.clone()); + + self.events + .push_back(BehaviourOutEvent::ConnectionEstablished(peer.clone())); + } + } + + fn inject_connection_closed( + &mut self, + peer: &PeerId, + _: &ConnectionId, + point: &ConnectedPoint, + ) { + if let ConnectedPoint::Dialer { address } = point { + match self.connected_peers.entry(peer.clone()) { + Entry::Vacant(_) => {} + Entry::Occupied(mut entry) => { + let addresses = entry.get_mut(); + + if let Some(pos) = addresses.iter().position(|a| a == address) { + addresses.remove(pos); + } + } + } + } + } + + fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {} + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs new file mode 100644 index 00000000..c986e2a9 --- /dev/null +++ b/swap/src/network/request_response.rs @@ -0,0 +1,109 @@ +use async_trait::async_trait; +use futures::prelude::*; +use libp2p::{ + core::upgrade, + request_response::{ProtocolName, RequestResponseCodec}, +}; +use serde::{Deserialize, Serialize}; +use std::{fmt::Debug, io}; + +use crate::{bitcoin, monero, SwapParams}; + +/// Time to wait for a response back once we send a request. +pub const TIMEOUT: u64 = 3600; // One hour. + +/// Messages Bob sends to Alice. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum BobToAlice { + AmountsFromBtc(bitcoin::Amount), + AmountsFromXmr(monero::Amount), + /* TODO: How are we going to do this when the messages are not Clone? + * Msg(bob::Message), */ +} + +/// Messages Alice sends to Bob. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum AliceToBob { + Amounts(SwapParams), + /* TODO: How are we going to do this when the messages are not Clone? + * Msg(alice::Message) */ +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct Protocol; + +impl ProtocolName for Protocol { + fn protocol_name(&self) -> &[u8] { + b"/xmr/btc/1.0.0" + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct Codec; + +#[async_trait] +impl RequestResponseCodec for Codec { + type Protocol = Protocol; + type Request = BobToAlice; + type Response = AliceToBob; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let message = upgrade::read_one(io, 1024) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut de = serde_json::Deserializer::from_slice(&message); + let msg = BobToAlice::deserialize(&mut de)?; + + Ok(msg) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let message = upgrade::read_one(io, 1024) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut de = serde_json::Deserializer::from_slice(&message); + let msg = AliceToBob::deserialize(&mut de)?; + + Ok(msg) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let bytes = serde_json::to_vec(&req)?; + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let bytes = serde_json::to_vec(&res)?; + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } +} diff --git a/swap/src/network/transport.rs b/swap/src/network/transport.rs new file mode 100644 index 00000000..60d9b3ce --- /dev/null +++ b/swap/src/network/transport.rs @@ -0,0 +1,53 @@ +use anyhow::Result; +use libp2p::{ + core::{ + either::EitherError, + identity, + muxing::StreamMuxerBox, + transport::{boxed::Boxed, timeout::TransportTimeoutError}, + upgrade::{SelectUpgrade, Version}, + Transport, UpgradeError, + }, + dns::{DnsConfig, DnsErr}, + mplex::MplexConfig, + noise::{self, NoiseConfig, NoiseError, X25519Spec}, + tcp::TokioTcpConfig, + yamux, PeerId, +}; +use std::{io, time::Duration}; + +/// Builds a libp2p transport with the following features: +/// - TcpConnection +/// - DNS name resolution +/// - authentication via noise +/// - multiplexing via yamux or mplex +pub fn build(id_keys: identity::Keypair) -> Result { + let dh_keys = noise::Keypair::::new().into_authentic(&id_keys)?; + let noise = NoiseConfig::xx(dh_keys).into_authenticated(); + + let tcp = TokioTcpConfig::new().nodelay(true); + let dns = DnsConfig::new(tcp)?; + + let transport = dns + .upgrade(Version::V1) + .authenticate(noise) + .multiplex(SelectUpgrade::new( + yamux::Config::default(), + MplexConfig::new(), + )) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20)) + .boxed(); + + Ok(transport) +} + +pub type SwapTransport = Boxed< + (PeerId, StreamMuxerBox), + TransportTimeoutError< + EitherError< + EitherError, UpgradeError>, + UpgradeError>, + >, + >, +>; diff --git a/swap/src/trace.rs b/swap/src/trace.rs new file mode 100644 index 00000000..14854cd4 --- /dev/null +++ b/swap/src/trace.rs @@ -0,0 +1,25 @@ +use atty::{self, Stream}; +use log::LevelFilter; +use tracing::{info, subscriber}; +use tracing_log::LogTracer; +use tracing_subscriber::FmtSubscriber; + +pub fn init_tracing(level: log::LevelFilter) -> anyhow::Result<()> { + if level == LevelFilter::Off { + return Ok(()); + } + + // Upstream log filter. + LogTracer::init_with_filter(LevelFilter::Debug)?; + + let is_terminal = atty::is(Stream::Stdout); + let subscriber = FmtSubscriber::builder() + .with_env_filter(format!("swap={}", level)) + .with_ansi(is_terminal) + .finish(); + + subscriber::set_global_default(subscriber)?; + info!("Initialized tracing with level: {}", level); + + Ok(()) +}