git-subtree-dir: libp2p-async-await git-subtree-mainline:discover-makers-cliada5acb2b5
git-subtree-split:50e781b12b
commit
fc54351055
@ -0,0 +1,2 @@
|
||||
/target
|
||||
Cargo.lock
|
@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "libp2p-async-await"
|
||||
version = "0.1.0"
|
||||
authors = ["Thomas Eizinger <thomas@eizinger.io>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
libp2p = { version = "0.37", default-features = false }
|
||||
log = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = "1"
|
||||
serde_cbor = "0.11"
|
||||
tokio = { version = "1", features = ["macros", "rt", "time", "rt-multi-thread"] }
|
||||
libp2p = { version = "0.37", default-features = false, features = ["noise", "yamux"] }
|
||||
rand = "0.8"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
env_logger = "0.8"
|
@ -0,0 +1,472 @@
|
||||
use libp2p::core::connection::ConnectionId;
|
||||
use libp2p::core::{upgrade, ConnectedPoint, Multiaddr, UpgradeInfo};
|
||||
use libp2p::futures::future::BoxFuture;
|
||||
use libp2p::futures::task::{Context, Poll};
|
||||
use libp2p::futures::FutureExt;
|
||||
use libp2p::swarm::protocols_handler::OutboundUpgradeSend;
|
||||
use libp2p::swarm::{
|
||||
KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
|
||||
PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use libp2p::{InboundUpgrade, OutboundUpgrade, PeerId};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::convert::Infallible;
|
||||
use std::future::{Future, Ready};
|
||||
use std::{io, iter, mem};
|
||||
|
||||
type Protocol<T, E> = BoxFuture<'static, Result<T, E>>;
|
||||
type InboundProtocolFn<I, E> = Box<dyn FnOnce(InboundSubstream) -> Protocol<I, E> + Send + 'static>;
|
||||
type OutboundProtocolFn<O, E> =
|
||||
Box<dyn FnOnce(OutboundSubstream) -> Protocol<O, E> + Send + 'static>;
|
||||
|
||||
enum InboundProtocolState<T, E> {
|
||||
GotFunctionNeedSubstream(InboundProtocolFn<T, E>),
|
||||
GotSubstreamNeedFunction(InboundSubstream),
|
||||
Executing(Protocol<T, E>),
|
||||
}
|
||||
|
||||
enum OutboundProtocolState<T, E> {
|
||||
GotFunctionNeedSubstream(OutboundProtocolFn<T, E>),
|
||||
GotFunctionRequestedSubstream(OutboundProtocolFn<T, E>),
|
||||
Executing(Protocol<T, E>),
|
||||
}
|
||||
|
||||
enum ProtocolState<I, O, E> {
|
||||
None,
|
||||
Inbound(InboundProtocolState<I, E>),
|
||||
Outbound(OutboundProtocolState<O, E>),
|
||||
Done,
|
||||
Poisoned,
|
||||
}
|
||||
|
||||
pub struct Handler<TInboundOut, TOutboundOut, TErr> {
|
||||
state: ProtocolState<TInboundOut, TOutboundOut, TErr>,
|
||||
info: &'static [u8],
|
||||
}
|
||||
|
||||
impl<TInboundOut, TOutboundOut, TErr> Handler<TInboundOut, TOutboundOut, TErr> {
|
||||
pub fn new(info: &'static [u8]) -> Self {
|
||||
Self {
|
||||
state: ProtocolState::None,
|
||||
info,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ProtocolInfo {
|
||||
info: &'static [u8],
|
||||
}
|
||||
|
||||
impl ProtocolInfo {
|
||||
fn new(info: &'static [u8]) -> Self {
|
||||
Self { info }
|
||||
}
|
||||
}
|
||||
|
||||
impl UpgradeInfo for ProtocolInfo {
|
||||
type Info = &'static [u8];
|
||||
type InfoIter = iter::Once<&'static [u8]>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
iter::once(self.info)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InboundSubstream(NegotiatedSubstream);
|
||||
|
||||
pub struct OutboundSubstream(NegotiatedSubstream);
|
||||
|
||||
macro_rules! impl_read_write {
|
||||
($t:ty) => {
|
||||
impl $t {
|
||||
pub async fn write_message(&mut self, msg: &[u8]) -> Result<(), io::Error> {
|
||||
upgrade::write_with_len_prefix(&mut self.0, msg).await
|
||||
}
|
||||
|
||||
pub async fn read_message(
|
||||
&mut self,
|
||||
max_size: usize,
|
||||
) -> Result<Vec<u8>, upgrade::ReadOneError> {
|
||||
upgrade::read_one(&mut self.0, max_size).await
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_read_write!(InboundSubstream);
|
||||
impl_read_write!(OutboundSubstream);
|
||||
|
||||
impl InboundUpgrade<NegotiatedSubstream> for ProtocolInfo {
|
||||
type Output = InboundSubstream;
|
||||
type Error = Infallible;
|
||||
type Future = Ready<Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_inbound(self, socket: NegotiatedSubstream, _: Self::Info) -> Self::Future {
|
||||
std::future::ready(Ok(InboundSubstream(socket)))
|
||||
}
|
||||
}
|
||||
|
||||
impl OutboundUpgrade<NegotiatedSubstream> for ProtocolInfo {
|
||||
type Output = OutboundSubstream;
|
||||
type Error = Infallible;
|
||||
type Future = Ready<Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_outbound(self, socket: NegotiatedSubstream, _: Self::Info) -> Self::Future {
|
||||
std::future::ready(Ok(OutboundSubstream(socket)))
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ProtocolInEvent<I, O, E> {
|
||||
ExecuteInbound(InboundProtocolFn<I, E>),
|
||||
ExecuteOutbound(OutboundProtocolFn<O, E>),
|
||||
}
|
||||
|
||||
pub enum ProtocolOutEvent<I, O, E> {
|
||||
Inbound(Result<I, E>),
|
||||
Outbound(Result<O, E>),
|
||||
}
|
||||
|
||||
impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for Handler<TInboundOut, TOutboundOut, TErr>
|
||||
where
|
||||
TInboundOut: Send + 'static,
|
||||
TOutboundOut: Send + 'static,
|
||||
TErr: Send + 'static,
|
||||
{
|
||||
type InEvent = ProtocolInEvent<TInboundOut, TOutboundOut, TErr>;
|
||||
type OutEvent = ProtocolOutEvent<TInboundOut, TOutboundOut, TErr>;
|
||||
type Error = Infallible;
|
||||
type InboundProtocol = ProtocolInfo;
|
||||
type OutboundProtocol = ProtocolInfo;
|
||||
type InboundOpenInfo = ();
|
||||
type OutboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
SubstreamProtocol::new(ProtocolInfo::new(self.info), ())
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
substream: InboundSubstream,
|
||||
_: Self::InboundOpenInfo,
|
||||
) {
|
||||
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::None => {
|
||||
self.state = ProtocolState::Inbound(
|
||||
InboundProtocolState::GotSubstreamNeedFunction(substream),
|
||||
);
|
||||
}
|
||||
ProtocolState::Inbound(InboundProtocolState::GotFunctionNeedSubstream(protocol_fn)) => {
|
||||
self.state =
|
||||
ProtocolState::Inbound(InboundProtocolState::Executing(protocol_fn(substream)));
|
||||
}
|
||||
ProtocolState::Inbound(_) | ProtocolState::Done => {
|
||||
panic!("Illegal state, substream is already present.");
|
||||
}
|
||||
ProtocolState::Outbound(_) => {
|
||||
panic!("Failed to process inbound substream in outbound protocol.");
|
||||
}
|
||||
ProtocolState::Poisoned => {
|
||||
panic!("Illegal state, currently in transient state poisoned.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
substream: OutboundSubstream,
|
||||
_: Self::OutboundOpenInfo,
|
||||
) {
|
||||
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Outbound(OutboundProtocolState::GotFunctionRequestedSubstream(
|
||||
protocol_fn,
|
||||
)) => {
|
||||
self.state = ProtocolState::Outbound(OutboundProtocolState::Executing(
|
||||
protocol_fn(substream),
|
||||
));
|
||||
}
|
||||
ProtocolState::None
|
||||
| ProtocolState::Outbound(OutboundProtocolState::GotFunctionNeedSubstream(_)) => {
|
||||
panic!("Illegal state, receiving substream means it was requested.");
|
||||
}
|
||||
ProtocolState::Outbound(_) | ProtocolState::Done => {
|
||||
panic!("Illegal state, substream is already present.");
|
||||
}
|
||||
ProtocolState::Inbound(_) => {
|
||||
panic!("Failed to process outbound substream in inbound protocol.");
|
||||
}
|
||||
ProtocolState::Poisoned => {
|
||||
panic!("Illegal state, currently in transient state poisoned.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, event: Self::InEvent) {
|
||||
match event {
|
||||
ProtocolInEvent::ExecuteInbound(protocol_fn) => {
|
||||
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::None => {
|
||||
self.state = ProtocolState::Inbound(
|
||||
InboundProtocolState::GotFunctionNeedSubstream(protocol_fn),
|
||||
);
|
||||
}
|
||||
ProtocolState::Inbound(InboundProtocolState::GotSubstreamNeedFunction(
|
||||
substream,
|
||||
)) => {
|
||||
self.state = ProtocolState::Inbound(InboundProtocolState::Executing(
|
||||
protocol_fn(substream),
|
||||
));
|
||||
}
|
||||
ProtocolState::Inbound(_) | ProtocolState::Done => {
|
||||
panic!("Illegal state, protocol fn is already present.");
|
||||
}
|
||||
ProtocolState::Outbound(_) => {
|
||||
panic!("Failed to process inbound protocol fn in outbound protocol.");
|
||||
}
|
||||
ProtocolState::Poisoned => {
|
||||
panic!("Illegal state, currently in transient state poisoned.");
|
||||
}
|
||||
}
|
||||
}
|
||||
ProtocolInEvent::ExecuteOutbound(protocol_fn) => {
|
||||
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::None => {
|
||||
self.state = ProtocolState::Outbound(
|
||||
OutboundProtocolState::GotFunctionNeedSubstream(protocol_fn),
|
||||
);
|
||||
}
|
||||
ProtocolState::Outbound(_) | ProtocolState::Done => {
|
||||
panic!("Illegal state, protocol fn is already present.");
|
||||
}
|
||||
ProtocolState::Inbound(_) => {
|
||||
panic!("Failed to process outbound protocol fn in inbound protocol.");
|
||||
}
|
||||
ProtocolState::Poisoned => {
|
||||
panic!("Illegal state, currently in transient state poisoned.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
_: Self::OutboundOpenInfo,
|
||||
err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
|
||||
) {
|
||||
log::error!("Failed to upgrade: {}", err);
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
KeepAlive::Yes
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ProtocolsHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Inbound(InboundProtocolState::Executing(mut protocol)) => match protocol
|
||||
.poll_unpin(cx)
|
||||
{
|
||||
Poll::Ready(res) => {
|
||||
self.state = ProtocolState::Done;
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::Inbound(
|
||||
res,
|
||||
)))
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.state = ProtocolState::Inbound(InboundProtocolState::Executing(protocol));
|
||||
Poll::Pending
|
||||
}
|
||||
},
|
||||
ProtocolState::Outbound(OutboundProtocolState::Executing(mut protocol)) => {
|
||||
match protocol.poll_unpin(cx) {
|
||||
Poll::Ready(res) => {
|
||||
self.state = ProtocolState::Done;
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::Outbound(
|
||||
res,
|
||||
)))
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.state =
|
||||
ProtocolState::Outbound(OutboundProtocolState::Executing(protocol));
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
ProtocolState::Outbound(OutboundProtocolState::GotFunctionNeedSubstream(protocol)) => {
|
||||
self.state = ProtocolState::Outbound(
|
||||
OutboundProtocolState::GotFunctionRequestedSubstream(protocol),
|
||||
);
|
||||
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(ProtocolInfo::new(self.info), ()),
|
||||
})
|
||||
}
|
||||
ProtocolState::Poisoned => {
|
||||
unreachable!("Protocol is poisoned (transient state)")
|
||||
}
|
||||
other => {
|
||||
self.state = other;
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A behaviour that can execute await/.async protocols.
|
||||
///
|
||||
/// Note: It is not possible to execute the same protocol with the same peer several simultaneous times.
|
||||
pub struct Behaviour<I, O, E> {
|
||||
protocol_in_events: VecDeque<(PeerId, ProtocolInEvent<I, O, E>)>,
|
||||
protocol_out_events: VecDeque<(PeerId, ProtocolOutEvent<I, O, E>)>,
|
||||
|
||||
connected_peers: HashMap<PeerId, Vec<Multiaddr>>,
|
||||
|
||||
info: &'static [u8],
|
||||
}
|
||||
|
||||
impl<I, O, E> Behaviour<I, O, E> {
|
||||
/// Constructs a new [`Behaviour`] with the given protocol info.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use libp2p_async_await::Behaviour;
|
||||
///
|
||||
/// let _ = Behaviour::new(b"/foo/bar/1.0.0");
|
||||
/// ```
|
||||
pub fn new(info: &'static [u8]) -> Self {
|
||||
Self {
|
||||
protocol_in_events: VecDeque::default(),
|
||||
protocol_out_events: VecDeque::default(),
|
||||
connected_peers: HashMap::default(),
|
||||
info,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, O, E> Behaviour<I, O, E> {
|
||||
pub fn do_protocol_listener<F>(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
protocol: impl FnOnce(InboundSubstream) -> F + Send + 'static,
|
||||
) where
|
||||
F: Future<Output = Result<I, E>> + Send + 'static,
|
||||
{
|
||||
self.protocol_in_events.push_back((
|
||||
peer,
|
||||
ProtocolInEvent::ExecuteInbound(Box::new(move |substream| protocol(substream).boxed())),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn do_protocol_dialer<F>(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
protocol: impl FnOnce(OutboundSubstream) -> F + Send + 'static,
|
||||
) where
|
||||
F: Future<Output = Result<O, E>> + Send + 'static,
|
||||
{
|
||||
self.protocol_in_events.push_back((
|
||||
peer,
|
||||
ProtocolInEvent::ExecuteOutbound(Box::new(move |substream| {
|
||||
protocol(substream).boxed()
|
||||
})),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum BehaviourOutEvent<I, O, E> {
|
||||
Inbound(PeerId, Result<I, E>),
|
||||
Outbound(PeerId, Result<O, E>),
|
||||
}
|
||||
|
||||
impl<I, O, E> NetworkBehaviour for Behaviour<I, O, E>
|
||||
where
|
||||
I: Send + 'static,
|
||||
O: Send + 'static,
|
||||
E: Send + 'static,
|
||||
{
|
||||
type ProtocolsHandler = Handler<I, O, E>;
|
||||
type OutEvent = BehaviourOutEvent<I, O, E>;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
Handler::new(self.info)
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
|
||||
self.connected_peers.get(peer).cloned().unwrap_or_default()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _: &PeerId) {}
|
||||
|
||||
fn inject_disconnected(&mut self, _: &PeerId) {}
|
||||
|
||||
fn inject_connection_established(
|
||||
&mut self,
|
||||
peer: &PeerId,
|
||||
_: &ConnectionId,
|
||||
point: &ConnectedPoint,
|
||||
) {
|
||||
let multiaddr = point.get_remote_address().clone();
|
||||
|
||||
self.connected_peers
|
||||
.entry(*peer)
|
||||
.or_default()
|
||||
.push(multiaddr);
|
||||
}
|
||||
|
||||
fn inject_connection_closed(
|
||||
&mut self,
|
||||
peer: &PeerId,
|
||||
_: &ConnectionId,
|
||||
point: &ConnectedPoint,
|
||||
) {
|
||||
let multiaddr = point.get_remote_address();
|
||||
|
||||
self.connected_peers
|
||||
.entry(*peer)
|
||||
.or_default()
|
||||
.retain(|addr| addr != multiaddr);
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: ProtocolOutEvent<I, O, E>) {
|
||||
self.protocol_out_events.push_back((peer, event));
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
_: &mut Context<'_>,
|
||||
_: &mut impl PollParameters,
|
||||
) -> Poll<NetworkBehaviourAction<ProtocolInEvent<I, O, E>, Self::OutEvent>> {
|
||||
if let Some((peer, event)) = self.protocol_in_events.pop_front() {
|
||||
if !self.connected_peers.contains_key(&peer) {
|
||||
self.protocol_in_events.push_back((peer, event));
|
||||
} else {
|
||||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id: peer,
|
||||
handler: NotifyHandler::Any,
|
||||
event,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((peer, event)) = self.protocol_out_events.pop_front() {
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(match event {
|
||||
ProtocolOutEvent::Inbound(res) => BehaviourOutEvent::Inbound(peer, res),
|
||||
ProtocolOutEvent::Outbound(res) => BehaviourOutEvent::Outbound(peer, res),
|
||||
}));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
@ -0,0 +1,150 @@
|
||||
use libp2p::futures::future;
|
||||
use libp2p::futures::future::FutureExt;
|
||||
use libp2p::{
|
||||
core::{muxing::StreamMuxerBox, transport::memory::MemoryTransport, upgrade::Version},
|
||||
identity,
|
||||
noise::{self, NoiseConfig, X25519Spec},
|
||||
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
|
||||
yamux::YamuxConfig,
|
||||
Multiaddr, PeerId, Swarm, Transport,
|
||||
};
|
||||
use std::{fmt::Debug, future::Future, time::Duration};
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::time;
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Actor<B: NetworkBehaviour> {
|
||||
pub swarm: Swarm<B>,
|
||||
pub addr: Multiaddr,
|
||||
pub peer_id: PeerId,
|
||||
}
|
||||
|
||||
pub async fn new_connected_swarm_pair<B, F>(behaviour_fn: F, handle: Handle) -> (Actor<B>, Actor<B>)
|
||||
where
|
||||
B: NetworkBehaviour,
|
||||
F: Fn(PeerId, identity::Keypair) -> B + Clone,
|
||||
<B as NetworkBehaviour>::OutEvent: Debug,
|
||||
{
|
||||
let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone(), handle.clone());
|
||||
let mut alice = Actor {
|
||||
swarm,
|
||||
addr,
|
||||
peer_id,
|
||||
};
|
||||
|
||||
let (swarm, addr, peer_id) = new_swarm(behaviour_fn, handle);
|
||||
let mut bob = Actor {
|
||||
swarm,
|
||||
addr,
|
||||
peer_id,
|
||||
};
|
||||
|
||||
connect(&mut alice.swarm, &mut bob.swarm).await;
|
||||
|
||||
(alice, bob)
|
||||
}
|
||||
|
||||
pub fn new_swarm<B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B>(
|
||||
behaviour_fn: F,
|
||||
handle: Handle,
|
||||
) -> (Swarm<B>, Multiaddr, PeerId) {
|
||||
let id_keys = identity::Keypair::generate_ed25519();
|
||||
let peer_id = PeerId::from(id_keys.public());
|
||||
|
||||
let dh_keys = noise::Keypair::<X25519Spec>::new()
|
||||
.into_authentic(&id_keys)
|
||||
.expect("failed to create dh_keys");
|
||||
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
|
||||
|
||||
let transport = MemoryTransport::default()
|
||||
.upgrade(Version::V1)
|
||||
.authenticate(noise)
|
||||
.multiplex(YamuxConfig::default())
|
||||
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
|
||||
.boxed();
|
||||
|
||||
let mut swarm: Swarm<B> = SwarmBuilder::new(transport, behaviour_fn(peer_id, id_keys), peer_id)
|
||||
.executor(Box::new(move |f| {
|
||||
handle.spawn(f);
|
||||
}))
|
||||
.build();
|
||||
|
||||
let address_port = rand::random::<u64>();
|
||||
let addr = format!("/memory/{}", address_port)
|
||||
.parse::<Multiaddr>()
|
||||
.unwrap();
|
||||
|
||||
Swarm::listen_on(&mut swarm, addr.clone()).unwrap();
|
||||
|
||||
(swarm, addr, peer_id)
|
||||
}
|
||||
|
||||
pub async fn await_events_or_timeout<A, B>(
|
||||
alice_event: impl Future<Output = A>,
|
||||
bob_event: impl Future<Output = B>,
|
||||
) -> (A, B) {
|
||||
time::timeout(
|
||||
Duration::from_secs(10),
|
||||
future::join(alice_event, bob_event),
|
||||
)
|
||||
.await
|
||||
.expect("network behaviours to emit an event within 10 seconds")
|
||||
}
|
||||
|
||||
/// Connects two swarms with each other.
|
||||
///
|
||||
/// This assumes the transport that is in use can be used by Alice to connect to
|
||||
/// the listen address that is emitted by Bob. In other words, they have to be
|
||||
/// on the same network. The memory transport used by the above `new_swarm`
|
||||
/// function fulfills this.
|
||||
///
|
||||
/// We also assume that the swarms don't emit any behaviour events during the
|
||||
/// connection phase. Any event emitted is considered a bug from this functions
|
||||
/// PoV because they would be lost.
|
||||
pub async fn connect<B>(alice: &mut Swarm<B>, bob: &mut Swarm<B>)
|
||||
where
|
||||
B: NetworkBehaviour,
|
||||
<B as NetworkBehaviour>::OutEvent: Debug,
|
||||
{
|
||||
let mut alice_connected = false;
|
||||
let mut bob_connected = false;
|
||||
|
||||
while !(alice_connected && bob_connected) {
|
||||
libp2p::futures::select! {
|
||||
alice_event = alice.next_event().fuse() => {
|
||||
match alice_event {
|
||||
SwarmEvent::ConnectionEstablished { .. } => {
|
||||
log::info!("alice connected");
|
||||
alice_connected = true;
|
||||
}
|
||||
SwarmEvent::Behaviour(event) => {
|
||||
panic!(
|
||||
"alice unexpectedly emitted a behaviour event during connection: {:?}",
|
||||
event
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
bob_event = bob.next_event().fuse() => {
|
||||
match bob_event {
|
||||
SwarmEvent::ConnectionEstablished { .. } => {
|
||||
log::info!("bob connected");
|
||||
bob_connected = true;
|
||||
}
|
||||
SwarmEvent::NewListenAddr(addr) => {
|
||||
Swarm::dial_addr(alice, addr).unwrap();
|
||||
}
|
||||
SwarmEvent::Behaviour(event) => {
|
||||
panic!(
|
||||
"bob unexpectedly emitted a behaviour event during connection: {:?}",
|
||||
event
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,140 @@
|
||||
#![allow(clippy::blacklisted_name)]
|
||||
|
||||
use anyhow::{Context, Error};
|
||||
use harness::await_events_or_timeout;
|
||||
use harness::new_connected_swarm_pair;
|
||||
use libp2p::swarm::SwarmEvent;
|
||||
use libp2p::PeerId;
|
||||
use libp2p_async_await::{Behaviour, BehaviourOutEvent};
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
mod harness;
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
struct Message0 {
|
||||
foo: u32,
|
||||
}
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
struct Message1 {
|
||||
bar: u32,
|
||||
}
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
struct Message2 {
|
||||
baz: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AliceResult {
|
||||
bar: u32,
|
||||
}
|
||||
#[derive(Debug)]
|
||||
struct BobResult {
|
||||
foo: u32,
|
||||
baz: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum MyOutEvent {
|
||||
Alice(AliceResult),
|
||||
Bob(BobResult),
|
||||
Failed(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<BehaviourOutEvent<BobResult, AliceResult, anyhow::Error>> for MyOutEvent {
|
||||
fn from(event: BehaviourOutEvent<BobResult, AliceResult, Error>) -> Self {
|
||||
match event {
|
||||
BehaviourOutEvent::Inbound(_, Ok(bob)) => MyOutEvent::Bob(bob),
|
||||
BehaviourOutEvent::Outbound(_, Ok(alice)) => MyOutEvent::Alice(alice),
|
||||
BehaviourOutEvent::Inbound(_, Err(e)) | BehaviourOutEvent::Outbound(_, Err(e)) => {
|
||||
MyOutEvent::Failed(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(libp2p::NetworkBehaviour)]
|
||||
#[behaviour(out_event = "MyOutEvent", event_process = false)]
|
||||
struct MyBehaviour {
|
||||
inner: Behaviour<BobResult, AliceResult, anyhow::Error>,
|
||||
}
|
||||
|
||||
impl MyBehaviour {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Behaviour::new(b"/foo/bar/1.0.0"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MyBehaviour {
|
||||
fn alice_do_protocol(&mut self, bob: PeerId, foo: u32, baz: u32) {
|
||||
self.inner
|
||||
.do_protocol_dialer(bob, move |mut substream| async move {
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&Message0 { foo })
|
||||
.context("failed to serialize Message0")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let bytes = substream.read_message(1024).await?;
|
||||
|
||||
let message1 = serde_cbor::from_slice::<Message1>(&bytes)?;
|
||||
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&Message2 { baz })
|
||||
.context("failed to serialize Message2")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(AliceResult { bar: message1.bar })
|
||||
})
|
||||
}
|
||||
|
||||
fn bob_do_protocol(&mut self, alice: PeerId, bar: u32) {
|
||||
self.inner
|
||||
.do_protocol_listener(alice, move |mut substream| async move {
|
||||
let bytes = substream.read_message(1024).await?;
|
||||
let message0 = serde_cbor::from_slice::<Message0>(&bytes)?;
|
||||
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&Message1 { bar })
|
||||
.context("failed to serialize Message1")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let bytes = substream.read_message(1024).await?;
|
||||
let message2 = serde_cbor::from_slice::<Message2>(&bytes)?;
|
||||
|
||||
Ok(BobResult {
|
||||
foo: message0.foo,
|
||||
baz: message2.baz,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn it_works() {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let (mut alice, mut bob) =
|
||||
new_connected_swarm_pair(|_, _| MyBehaviour::new(), Handle::current()).await;
|
||||
|
||||
alice.swarm.alice_do_protocol(bob.peer_id, 10, 42);
|
||||
bob.swarm.bob_do_protocol(alice.peer_id, 1337);
|
||||
|
||||
let (alice_event, bob_event) =
|
||||
await_events_or_timeout(alice.swarm.next_event(), bob.swarm.next_event()).await;
|
||||
|
||||
assert!(matches!(
|
||||
alice_event,
|
||||
SwarmEvent::Behaviour(MyOutEvent::Alice(AliceResult { bar: 1337 }))
|
||||
));
|
||||
assert!(matches!(
|
||||
bob_event,
|
||||
SwarmEvent::Behaviour(MyOutEvent::Bob(BobResult { foo: 10, baz: 42 }))
|
||||
));
|
||||
}
|
Loading…
Reference in new issue