From 18eb1ab511788755d49e95a0900788b2113e6451 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 28 Jun 2021 16:45:40 +1000 Subject: [PATCH] Use our rendezvous libp2p fork This includes changing the network test framework (currently not in use) to reflect the latest libp2p version. --- Cargo.lock | 157 +++++++++++++----------- swap/Cargo.toml | 2 +- swap/src/asb/event_loop.rs | 26 ++-- swap/src/cli/event_loop.rs | 24 ++-- swap/src/network/test.rs | 236 ++++++++++++++++++++----------------- 5 files changed, 245 insertions(+), 200 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e586d62..3cc7ebf5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1414,13 +1414,13 @@ dependencies = [ [[package]] name = "hmac-drbg" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6e570451493f10f6581b48cdd530413b63ea9e780f544bfd3bdcaa0d89d1a7b" +checksum = "17ea0a1394df5b6574da6e0c1ade9e78868c9fb0a4e5ef4428e32da4676b85b1" dependencies = [ - "digest 0.8.1", - "generic-array 0.12.4", - "hmac 0.7.1", + "digest 0.9.0", + "generic-array 0.14.4", + "hmac 0.8.1", ] [[package]] @@ -1770,9 +1770,8 @@ checksum = "ba4aede83fc3617411dc6993bc8c70919750c1c257c6ca6a502aed6e0e2394ae" [[package]] name = "libp2p" -version = "0.38.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebbb17eece4aec5bb970880c73825c16ca59ca05a4e41803751e68c7e5f0c618" +version = "0.39.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "atomic", "bytes 1.0.1", @@ -1789,7 +1788,7 @@ dependencies = [ "libp2p-tcp", "libp2p-websocket", "libp2p-yamux", - "parity-multiaddr", + "multiaddr", "parking_lot 0.11.1", "pin-project 1.0.5", "smallvec", @@ -1798,9 +1797,8 @@ dependencies = [ [[package]] name = "libp2p-core" -version = "0.28.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "554d3e7e9e65f939d66b75fd6a4c67f258fe250da61b91f46c545fc4a89b51d9" +version = "0.29.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "asn1_der", "bs58", @@ -1812,9 +1810,9 @@ dependencies = [ "lazy_static", "libsecp256k1", "log 0.4.14", + "multiaddr", "multihash", "multistream-select", - "parity-multiaddr", "parking_lot 0.11.1", "pin-project 1.0.5", "prost", @@ -1832,9 +1830,8 @@ dependencies = [ [[package]] name = "libp2p-dns" -version = "0.28.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e63dab8b5ff35e0c101a3e51e843ba782c07bbb1682f5fd827622e0d02b98b" +version = "0.29.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "futures", "libp2p-core", @@ -1845,9 +1842,8 @@ dependencies = [ [[package]] name = "libp2p-mplex" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e9b544335d1ed30af71daa96edbefadef6f19c7a55f078b9fc92c87163105d" +version = "0.29.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "asynchronous-codec", "bytes 1.0.1", @@ -1863,9 +1859,8 @@ dependencies = [ [[package]] name = "libp2p-noise" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a2aa6fc4e6855eaf9ea1941a14f7ec4df35636fb6b85951e17481df8dcecf6" +version = "0.32.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "bytes 1.0.1", "curve25519-dalek", @@ -1885,9 +1880,8 @@ dependencies = [ [[package]] name = "libp2p-ping" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4bfaffac63bf3c7ec11ed9d8879d455966ddea7e78ee14737f0b6dce0d1cd1" +version = "0.30.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "futures", "libp2p-core", @@ -1900,9 +1894,8 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cdbe172f08e6d0f95fa8634e273d4c4268c4063de2e33e7435194b0130c62e3" +version = "0.12.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "async-trait", "bytes 1.0.1", @@ -1920,9 +1913,8 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e04d8e1eef675029ec728ba14e8d0da7975d84b6679b699b4ae91a1de9c3a92" +version = "0.30.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "either", "futures", @@ -1937,8 +1929,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "365b0a699fea5168676840567582a012ea297b1ca02eee467e58301b9c9c5eed" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "quote 1.0.9", "syn 1.0.64", @@ -1946,9 +1937,8 @@ dependencies = [ [[package]] name = "libp2p-tcp" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b1a27d21c477951799e99d5c105d78868258502ce092988040a808d5a19bbd9" +version = "0.29.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "futures", "futures-timer", @@ -1963,9 +1953,8 @@ dependencies = [ [[package]] name = "libp2p-websocket" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cace60995ef6f637e4752cccbb2590f6bc358e8741a0d066307636c69a4b3a74" +version = "0.30.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "either", "futures", @@ -1981,9 +1970,8 @@ dependencies = [ [[package]] name = "libp2p-yamux" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f35da42cfc6d5cb0dcf3ad6881bc68d146cdf38f98655e09e33fbba4d13eabc4" +version = "0.33.0" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "futures", "libp2p-core", @@ -1994,20 +1982,52 @@ dependencies = [ [[package]] name = "libsecp256k1" -version = "0.3.5" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fc1e2c808481a63dc6da2074752fdd4336a3c8fcc68b83db6f1fd5224ae7962" +checksum = "bd1137239ab33b41aa9637a88a28249e5e70c40a42ccc92db7f12cc356c1fcd7" dependencies = [ "arrayref", - "crunchy", - "digest 0.8.1", + "base64 0.12.3", + "digest 0.9.0", "hmac-drbg", + "libsecp256k1-core", + "libsecp256k1-gen-ecmult", + "libsecp256k1-gen-genmult", "rand 0.7.3", - "sha2 0.8.2", - "subtle 2.4.0", + "serde", + "sha2 0.9.5", "typenum", ] +[[package]] +name = "libsecp256k1-core" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee11012b293ea30093c129173cac4335513064094619f4639a25b310fd33c11" +dependencies = [ + "crunchy", + "digest 0.9.0", + "subtle 2.4.0", +] + +[[package]] +name = "libsecp256k1-gen-ecmult" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32239626ffbb6a095b83b37a02ceb3672b2443a87a000a884fc3c4d16925c9c0" +dependencies = [ + "libsecp256k1-core", +] + +[[package]] +name = "libsecp256k1-gen-genmult" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76acb433e21d10f5f9892b1962c2856c58c7f39a9e4bd68ac82b9436a0ffd5b9" +dependencies = [ + "libsecp256k1-core", +] + [[package]] name = "libz-sys" version = "1.1.2" @@ -2282,6 +2302,24 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "multiaddr" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7139982f583d7e53879d9f611fe48ced18e77d684309484f2252c76bcd39f549" +dependencies = [ + "arrayref", + "bs58", + "byteorder", + "data-encoding", + "multihash", + "percent-encoding 2.1.0", + "serde", + "static_assertions", + "unsigned-varint 0.7.0", + "url 2.2.2", +] + [[package]] name = "multihash" version = "0.13.2" @@ -2317,9 +2355,8 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "multistream-select" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d91ec0a2440aaff5f78ec35631a7027d50386c6163aa975f7caa0d5da4b6ff8" +version = "0.10.3" +source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9" dependencies = [ "bytes 1.0.1", "futures", @@ -2472,24 +2509,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" -[[package]] -name = "parity-multiaddr" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58341485071825827b7f03cf7efd1cb21e6a709bea778fb50227fd45d2f361b4" -dependencies = [ - "arrayref", - "bs58", - "byteorder", - "data-encoding", - "multihash", - "percent-encoding 2.1.0", - "serde", - "static_assertions", - "unsigned-varint 0.7.0", - "url 2.2.2", -] - [[package]] name = "parking_lot" version = "0.10.2" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index 46deee8a..fb3c26e6 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -29,7 +29,7 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features = ed25519-dalek = "1" futures = { version = "0.3", default-features = false } itertools = "0.10" -libp2p = { version = "0.38", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping" ] } +libp2p = { git = "https://github.com/comit-network/rust-libp2p", branch = "rendezvous", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping" ] } miniscript = { version = "5", features = [ "serde" ] } monero = { version = "0.12", features = [ "serde_support" ] } monero-rpc = { path = "../monero-rpc" } diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index e18d595e..0c4f26fc 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -149,9 +149,9 @@ where loop { tokio::select! { - swarm_event = self.swarm.next_event() => { + swarm_event = self.swarm.next() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot }) => { + Some(SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot })) => { let (btc, responder) = match send_wallet_snapshot.recv().await { Ok((btc, responder)) => (btc, responder), @@ -172,13 +172,13 @@ where // Ignore result, we should never hit this because the receiver will alive as long as the connection is. let _ = responder.respond(wallet_snapshot); } - SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3}) => { + Some(SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3})) => { let _ = self.handle_execution_setup_done(peer_id, swap_id, *state3).await; } - SwarmEvent::Behaviour(OutEvent::SwapDeclined { peer, error }) => { + Some(SwarmEvent::Behaviour(OutEvent::SwapDeclined { peer, error })) => { tracing::warn!(%peer, "Ignoring spot price request because: {}", error); } - SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { + Some(SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer })) => { let quote = match self.make_quote(self.min_buy, self.max_buy).await { Ok(quote) => quote, Err(error) => { @@ -191,13 +191,13 @@ where tracing::debug!(%peer, "Failed to respond with quote"); } } - SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => { + Some(SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id })) => { tracing::debug!(%peer, "Bob acknowledged transfer proof"); if let Some(responder) = self.inflight_transfer_proofs.remove(&id) { let _ = responder.respond(()); } } - SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { + Some(SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer })) => { let swap_id = msg.swap_id; let swap_peer = self.db.get_peer_id(swap_id); @@ -246,12 +246,12 @@ where channel }.boxed()); } - SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { + Some(SwarmEvent::Behaviour(OutEvent::Failure {peer, error})) => { tracing::error!( %peer, "Communication error. Error {:#}", error); } - SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { + Some(SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. }) => { tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); if let Some(transfer_proofs) = self.buffered_transfer_proofs.remove(&peer) { @@ -263,16 +263,16 @@ where } } } - SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { + Some(SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. }) => { tracing::warn!(%address, "Failed to set up connection with peer. Error {:#}", error); } - SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: Some(error) } if num_established == 0 => { + Some(SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: Some(error) }) if num_established == 0 => { tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection. Error {:#}", error); } - SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: None } if num_established == 0 => { + Some(SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: None }) if num_established == 0 => { tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection"); } - SwarmEvent::NewListenAddr(address) => { + Some(SwarmEvent::NewListenAddr(address)) => { tracing::info!(%address, "New listen address detected"); } _ => {} diff --git a/swap/src/cli/event_loop.rs b/swap/src/cli/event_loop.rs index 9d55a81a..1d77b8ef 100644 --- a/swap/src/cli/event_loop.rs +++ b/swap/src/cli/event_loop.rs @@ -94,19 +94,19 @@ impl EventLoop { loop { // Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html tokio::select! { - swarm_event = self.swarm.next_event().fuse() => { + swarm_event = self.swarm.next().fuse() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => { + Some(SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response })) => { if let Some(responder) = self.inflight_quote_requests.remove(&id) { let _ = responder.respond(response); } } - SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response)) => { + Some(SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response))) => { if let Some(responder) = self.inflight_swap_setup.take() { let _ = responder.respond(*response); } } - SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => { + Some(SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer })) => { let swap_id = msg.swap_id; if peer != self.alice_peer_id { @@ -142,34 +142,34 @@ impl EventLoop { channel }.boxed())); } - SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => { + Some(SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id })) => { if let Some(responder) = self.inflight_encrypted_signature_requests.remove(&id) { let _ = responder.respond(()); } } - SwarmEvent::Behaviour(OutEvent::AllRedialAttemptsExhausted { peer }) if peer == self.alice_peer_id => { + Some(SwarmEvent::Behaviour(OutEvent::AllRedialAttemptsExhausted { peer })) if peer == self.alice_peer_id => { tracing::error!("Exhausted all re-dial attempts to Alice"); return; } - SwarmEvent::Behaviour(OutEvent::Failure { peer, error }) => { + Some(SwarmEvent::Behaviour(OutEvent::Failure { peer, error })) => { tracing::warn!(%peer, "Communication error: {:#}", error); return; } - SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { + Some(SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. }) if peer_id == self.alice_peer_id => { tracing::info!("Connected to Alice at {}", endpoint.get_remote_address()); } - SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => { + Some(SwarmEvent::Dialing(peer_id)) if peer_id == self.alice_peer_id => { tracing::debug!("Dialling Alice at {}", peer_id); } - SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error) } if peer_id == self.alice_peer_id && num_established == 0 => { + Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error) }) if peer_id == self.alice_peer_id && num_established == 0 => { tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error); } - SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if peer_id == self.alice_peer_id && num_established == 0 => { + Some(SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. }) if peer_id == self.alice_peer_id && num_established == 0 => { // no error means the disconnection was requested tracing::info!("Successfully closed connection to Alice"); return; } - SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => { + Some(SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error }) if peer_id == self.alice_peer_id && attempts_remaining == 0 => { tracing::warn!(%address, "Failed to dial Alice: {}", error); if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() { diff --git a/swap/src/network/test.rs b/swap/src/network/test.rs index ea4b1a8d..8c2eb037 100644 --- a/swap/src/network/test.rs +++ b/swap/src/network/test.rs @@ -1,19 +1,18 @@ -use futures::future; +use async_trait::async_trait; +use futures::stream::FusedStream; +use futures::{future, Future, Stream, StreamExt}; use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::transport::memory::MemoryTransport; -use libp2p::core::upgrade::{SelectUpgrade, Version}; -use libp2p::core::{Executor, Multiaddr}; +use libp2p::core::transport::upgrade::Version; +use libp2p::core::transport::MemoryTransport; +use libp2p::core::upgrade::SelectUpgrade; +use libp2p::core::{identity, Executor, Multiaddr, PeerId, Transport}; use libp2p::mplex::MplexConfig; -use libp2p::noise::{self, NoiseConfig, X25519Spec}; -use libp2p::swarm::{ - IntoProtocolsHandler, NetworkBehaviour, ProtocolsHandler, SwarmBuilder, SwarmEvent, -}; -use libp2p::{identity, yamux, PeerId, Swarm, Transport}; +use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; +use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p::yamux::YamuxConfig; use std::fmt::Debug; -use std::future::Future; use std::pin::Pin; use std::time::Duration; -use tokio::time; /// An adaptor struct for libp2p that spawns futures into the current /// thread-local runtime. @@ -25,49 +24,18 @@ impl Executor for GlobalSpawnTokioExecutor { } } -#[allow(missing_debug_implementations)] -pub struct Actor { - pub swarm: Swarm, - pub addr: Multiaddr, - pub peer_id: PeerId, -} - -pub async fn new_connected_swarm_pair(behaviour_fn: F) -> (Actor, Actor) +pub fn new_swarm(behaviour_fn: F) -> Swarm where B: NetworkBehaviour, - F: Fn(PeerId, identity::Keypair) -> B + Clone, - <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Clone, -::OutEvent: Debug{ - let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone()); - let mut alice = Actor { - swarm, - addr, - peer_id, - }; - - let (swarm, addr, peer_id) = new_swarm(behaviour_fn); - let mut bob = Actor { - swarm, - addr, - peer_id, - }; - - connect(&mut alice.swarm, &mut bob.swarm).await; - - (alice, bob) -} - -pub fn new_swarm B>( - behaviour_fn: F, -) -> (Swarm, Multiaddr, PeerId) -where + ::OutEvent: Debug, B: NetworkBehaviour, + F: FnOnce(PeerId, identity::Keypair) -> B, { - let id_keys = identity::Keypair::generate_ed25519(); - let peer_id = PeerId::from(id_keys.public()); + let identity = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(identity.public()); - let dh_keys = noise::Keypair::::new() - .into_authentic(&id_keys) + let dh_keys = Keypair::::new() + .into_authentic(&identity) .expect("failed to create dh_keys"); let noise = NoiseConfig::xx(dh_keys).into_authenticated(); @@ -75,88 +43,146 @@ where .upgrade(Version::V1) .authenticate(noise) .multiplex(SelectUpgrade::new( - yamux::YamuxConfig::default(), + YamuxConfig::default(), MplexConfig::new(), )) .timeout(Duration::from_secs(5)) .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .boxed(); - let mut swarm: Swarm = SwarmBuilder::new(transport, behaviour_fn(peer_id, id_keys), peer_id) + SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id) .executor(Box::new(GlobalSpawnTokioExecutor)) - .build(); + .build() +} +fn get_rand_memory_address() -> Multiaddr { let address_port = rand::random::(); let addr = format!("/memory/{}", address_port) .parse::() .unwrap(); - Swarm::listen_on(&mut swarm, addr.clone()).unwrap(); - - (swarm, addr, peer_id) + addr } -pub async fn await_events_or_timeout( - alice_event: impl Future, - bob_event: impl Future, -) -> (A, B) { - time::timeout( - Duration::from_secs(10), - future::join(alice_event, bob_event), +pub async fn await_events_or_timeout( + swarm_1: &mut (impl Stream> + FusedStream + Unpin), + swarm_2: &mut (impl Stream> + FusedStream + Unpin), +) -> (SwarmEvent, SwarmEvent) +where + SwarmEvent: Debug, + SwarmEvent: Debug, +{ + tokio::time::timeout( + Duration::from_secs(30), + future::join( + swarm_1 + .inspect(|event| tracing::debug!("Swarm1 emitted {:?}", event)) + .select_next_some(), + swarm_2 + .inspect(|event| tracing::debug!("Swarm2 emitted {:?}", event)) + .select_next_some(), + ), ) .await .expect("network behaviours to emit an event within 10 seconds") } -/// Connects two swarms with each other. -/// -/// This assumes the transport that is in use can be used by Bob to connect to -/// the listen address that is emitted by Alice. In other words, they have to be -/// on the same network. The memory transport used by the above `new_swarm` -/// function fulfills this. -/// -/// We also assume that the swarms don't emit any behaviour events during the -/// connection phase. Any event emitted is considered a bug from this functions -/// PoV because they would be lost. -pub async fn connect(alice: &mut Swarm, bob: &mut Swarm) +/// An extension trait for [`Swarm`] that makes it easier to set up a network of +/// [`Swarm`]s for tests. +#[async_trait] +pub trait SwarmExt { + /// Establishes a connection to the given [`Swarm`], polling both of them + /// until the connection is established. + async fn block_on_connection(&mut self, other: &mut Swarm) + where + T: NetworkBehaviour, + ::OutEvent: Debug; + + /// Listens on a random memory address, polling the [`Swarm`] until the + /// transport is ready to accept connections. + async fn listen_on_random_memory_address(&mut self) -> Multiaddr; +} + +#[async_trait] +impl SwarmExt for Swarm where - BA: NetworkBehaviour, - BB: NetworkBehaviour, - ::OutEvent: Debug, - ::OutEvent: Debug, + B: NetworkBehaviour, + ::OutEvent: Debug, { - let mut alice_connected = false; - let mut bob_connected = false; - - while !alice_connected && !bob_connected { - let (alice_event, bob_event) = future::join(alice.next_event(), bob.next_event()).await; - - match alice_event { - SwarmEvent::ConnectionEstablished { .. } => { - alice_connected = true; + async fn block_on_connection(&mut self, other: &mut Swarm) + where + T: NetworkBehaviour, + ::OutEvent: Debug, + { + let addr_to_dial = other.external_addresses().next().unwrap().addr.clone(); + + self.dial_addr(addr_to_dial.clone()).unwrap(); + + let mut dialer_done = false; + let mut listener_done = false; + + loop { + let dialer_event_fut = self.select_next_some(); + + tokio::select! { + dialer_event = dialer_event_fut => { + match dialer_event { + SwarmEvent::ConnectionEstablished { .. } => { + dialer_done = true; + } + SwarmEvent::UnknownPeerUnreachableAddr { address, error } if address == addr_to_dial => { + panic!("Failed to dial address {}: {}", addr_to_dial, error) + } + other => { + tracing::debug!("Ignoring {:?}", other); + } + } + }, + listener_event = other.select_next_some() => { + match listener_event { + SwarmEvent::ConnectionEstablished { .. } => { + listener_done = true; + } + SwarmEvent::IncomingConnectionError { error, .. } => { + panic!("Failure in incoming connection {}", error); + } + other => { + tracing::debug!("Ignoring {:?}", other); + } + } + } } - SwarmEvent::NewListenAddr(addr) => { - bob.dial_addr(addr).unwrap(); - } - SwarmEvent::Behaviour(event) => { - panic!( - "alice unexpectedly emitted a behaviour event during connection: {:?}", - event - ); + + if dialer_done && listener_done { + return; } - _ => {} } - match bob_event { - SwarmEvent::ConnectionEstablished { .. } => { - bob_connected = true; - } - SwarmEvent::Behaviour(event) => { - panic!( - "bob unexpectedly emitted a behaviour event during connection: {:?}", - event - ); + } + + async fn listen_on_random_memory_address(&mut self) -> Multiaddr { + let multiaddr = get_rand_memory_address(); + + self.listen_on(multiaddr.clone()).unwrap(); + + // block until we are actually listening + loop { + match self.select_next_some().await { + SwarmEvent::NewListenAddr(addr) if addr == multiaddr => { + break; + } + other => { + tracing::debug!( + "Ignoring {:?} while waiting for listening to succeed", + other + ); + } } - _ => {} } + + // Memory addresses are externally reachable because they all share the same + // memory-space. + self.add_external_address(multiaddr.clone(), AddressScore::Infinite); + + multiaddr } }