use futures::future; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::memory::MemoryTransport; use libp2p::core::upgrade::{SelectUpgrade, Version}; use libp2p::core::{Executor, Multiaddr}; use libp2p::mplex::MplexConfig; use libp2p::noise::{self, NoiseConfig, X25519Spec}; use libp2p::swarm::{ IntoProtocolsHandler, NetworkBehaviour, ProtocolsHandler, SwarmBuilder, SwarmEvent, }; use libp2p::{identity, yamux, PeerId, Swarm, Transport}; use std::fmt::Debug; use std::future::Future; use std::pin::Pin; use std::time::Duration; use tokio::time; /// An adaptor struct for libp2p that spawns futures into the current /// thread-local runtime. struct GlobalSpawnTokioExecutor; impl Executor for GlobalSpawnTokioExecutor { fn exec(&self, future: Pin + Send>>) { let _ = tokio::spawn(future); } } #[allow(missing_debug_implementations)] pub struct Actor { pub swarm: Swarm, pub addr: Multiaddr, pub peer_id: PeerId, } pub async fn new_connected_swarm_pair(behaviour_fn: F) -> (Actor, Actor) where B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B + Clone, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Clone, ::OutEvent: Debug{ let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone()); let mut alice = Actor { swarm, addr, peer_id, }; let (swarm, addr, peer_id) = new_swarm(behaviour_fn); let mut bob = Actor { swarm, addr, peer_id, }; connect(&mut alice.swarm, &mut bob.swarm).await; (alice, bob) } pub fn new_swarm B>( behaviour_fn: F, ) -> (Swarm, Multiaddr, PeerId) where B: NetworkBehaviour, { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); let dh_keys = noise::Keypair::::new() .into_authentic(&id_keys) .expect("failed to create dh_keys"); let noise = NoiseConfig::xx(dh_keys).into_authenticated(); let transport = MemoryTransport::default() .upgrade(Version::V1) .authenticate(noise) .multiplex(SelectUpgrade::new( yamux::YamuxConfig::default(), MplexConfig::new(), )) .timeout(Duration::from_secs(5)) .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .boxed(); let mut swarm: Swarm = SwarmBuilder::new(transport, behaviour_fn(peer_id, id_keys), peer_id) .executor(Box::new(GlobalSpawnTokioExecutor)) .build(); let address_port = rand::random::(); let addr = format!("/memory/{}", address_port) .parse::() .unwrap(); Swarm::listen_on(&mut swarm, addr.clone()).unwrap(); (swarm, addr, peer_id) } pub async fn await_events_or_timeout( alice_event: impl Future, bob_event: impl Future, ) -> (A, B) { time::timeout( Duration::from_secs(10), future::join(alice_event, bob_event), ) .await .expect("network behaviours to emit an event within 10 seconds") } /// Connects two swarms with each other. /// /// This assumes the transport that is in use can be used by Bob to connect to /// the listen address that is emitted by Alice. In other words, they have to be /// on the same network. The memory transport used by the above `new_swarm` /// function fulfills this. /// /// We also assume that the swarms don't emit any behaviour events during the /// connection phase. Any event emitted is considered a bug from this functions /// PoV because they would be lost. pub async fn connect(alice: &mut Swarm, bob: &mut Swarm) where BA: NetworkBehaviour, BB: NetworkBehaviour, ::OutEvent: Debug, ::OutEvent: Debug, { let mut alice_connected = false; let mut bob_connected = false; while !alice_connected && !bob_connected { let (alice_event, bob_event) = future::join(alice.next_event(), bob.next_event()).await; match alice_event { SwarmEvent::ConnectionEstablished { .. } => { alice_connected = true; } SwarmEvent::NewListenAddr(addr) => { bob.dial_addr(addr).unwrap(); } SwarmEvent::Behaviour(event) => { panic!( "alice unexpectedly emitted a behaviour event during connection: {:?}", event ); } _ => {} } match bob_event { SwarmEvent::ConnectionEstablished { .. } => { bob_connected = true; } SwarmEvent::Behaviour(event) => { panic!( "bob unexpectedly emitted a behaviour event during connection: {:?}", event ); } _ => {} } } }