Compare commits

...

19 Commits

Author SHA1 Message Date
Thomas Eizinger 281e83a00b
Bump to libp2p 0.38
3 years ago
Thomas Eizinger fc54351055 Add 'libp2p-async-await/' from commit '50e781b12bbeda7986c0cada090f171f41093144'
3 years ago
Thomas Eizinger 50e781b12b
Update to libp2p 0.37
3 years ago
Thomas Eizinger 7a9006cedd
Bump libp2p to version 0.36
3 years ago
Franck Royer 1429cd7802
Add note on simultanous execution of the protocol
3 years ago
Franck Royer a8fe340a02
Rename crate to libp2p-async-await
3 years ago
Franck Royer efa0257ea2
Move tests to library test folder
3 years ago
Franck Royer 8f3c22f1d5
Provide write/read message abstraction on subtreams
3 years ago
Franck Royer e0b49758f0
Log error on dial upgrade failure
3 years ago
Franck Royer c44aa3e035
Reduce number of variants in out events by using `Result`
3 years ago
Franck Royer 827fe5d39e
Handle clippy warnings
3 years ago
Franck Royer cdcfc575d0
Handle substream request as part of outbound protocol state
3 years ago
Franck Royer 20152a926d
Express how complementary the pending states are
3 years ago
Franck Royer c166dd5a63
Improve protocol state variant names
3 years ago
Franck Royer a8595e5a12
Move separate inbound/outbound options to one enum
3 years ago
Franck Royer 4b71739dc9
Move protocol states in an enum
3 years ago
Franck Royer 19f11697fb
Differentiate inbound and outbound negotiated subtreams
3 years ago
Thomas Eizinger d5d7389d25
Initial working version
3 years ago
Thomas Eizinger 6877ae4d6f
Initial draft
3 years ago

@ -0,0 +1,2 @@
/target
Cargo.lock

@ -0,0 +1,20 @@
[package]
name = "libp2p-async-await"
version = "0.1.0"
authors = ["Thomas Eizinger <thomas@eizinger.io>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libp2p = { version = "0.38", default-features = false }
log = "0.4"
[dev-dependencies]
anyhow = "1"
serde_cbor = "0.11"
tokio = { version = "1", features = ["macros", "rt", "time", "rt-multi-thread"] }
libp2p = { version = "0.37", default-features = false, features = ["noise", "yamux"] }
rand = "0.8"
serde = { version = "1", features = ["derive"] }
env_logger = "0.8"

@ -0,0 +1,472 @@
use libp2p::core::connection::ConnectionId;
use libp2p::core::{upgrade, ConnectedPoint, Multiaddr, UpgradeInfo};
use libp2p::futures::future::BoxFuture;
use libp2p::futures::task::{Context, Poll};
use libp2p::futures::FutureExt;
use libp2p::swarm::protocols_handler::OutboundUpgradeSend;
use libp2p::swarm::{
KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr,
SubstreamProtocol,
};
use libp2p::{InboundUpgrade, OutboundUpgrade, PeerId};
use std::collections::{HashMap, VecDeque};
use std::convert::Infallible;
use std::future::{Future, Ready};
use std::{io, iter, mem};
type Protocol<T, E> = BoxFuture<'static, Result<T, E>>;
type InboundProtocolFn<I, E> = Box<dyn FnOnce(InboundSubstream) -> Protocol<I, E> + Send + 'static>;
type OutboundProtocolFn<O, E> =
Box<dyn FnOnce(OutboundSubstream) -> Protocol<O, E> + Send + 'static>;
enum InboundProtocolState<T, E> {
GotFunctionNeedSubstream(InboundProtocolFn<T, E>),
GotSubstreamNeedFunction(InboundSubstream),
Executing(Protocol<T, E>),
}
enum OutboundProtocolState<T, E> {
GotFunctionNeedSubstream(OutboundProtocolFn<T, E>),
GotFunctionRequestedSubstream(OutboundProtocolFn<T, E>),
Executing(Protocol<T, E>),
}
enum ProtocolState<I, O, E> {
None,
Inbound(InboundProtocolState<I, E>),
Outbound(OutboundProtocolState<O, E>),
Done,
Poisoned,
}
pub struct Handler<TInboundOut, TOutboundOut, TErr> {
state: ProtocolState<TInboundOut, TOutboundOut, TErr>,
info: &'static [u8],
}
impl<TInboundOut, TOutboundOut, TErr> Handler<TInboundOut, TOutboundOut, TErr> {
pub fn new(info: &'static [u8]) -> Self {
Self {
state: ProtocolState::None,
info,
}
}
}
pub struct ProtocolInfo {
info: &'static [u8],
}
impl ProtocolInfo {
fn new(info: &'static [u8]) -> Self {
Self { info }
}
}
impl UpgradeInfo for ProtocolInfo {
type Info = &'static [u8];
type InfoIter = iter::Once<&'static [u8]>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.info)
}
}
pub struct InboundSubstream(NegotiatedSubstream);
pub struct OutboundSubstream(NegotiatedSubstream);
macro_rules! impl_read_write {
($t:ty) => {
impl $t {
pub async fn write_message(&mut self, msg: &[u8]) -> Result<(), io::Error> {
upgrade::write_with_len_prefix(&mut self.0, msg).await
}
pub async fn read_message(
&mut self,
max_size: usize,
) -> Result<Vec<u8>, upgrade::ReadOneError> {
upgrade::read_one(&mut self.0, max_size).await
}
}
};
}
impl_read_write!(InboundSubstream);
impl_read_write!(OutboundSubstream);
impl InboundUpgrade<NegotiatedSubstream> for ProtocolInfo {
type Output = InboundSubstream;
type Error = Infallible;
type Future = Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, socket: NegotiatedSubstream, _: Self::Info) -> Self::Future {
std::future::ready(Ok(InboundSubstream(socket)))
}
}
impl OutboundUpgrade<NegotiatedSubstream> for ProtocolInfo {
type Output = OutboundSubstream;
type Error = Infallible;
type Future = Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, socket: NegotiatedSubstream, _: Self::Info) -> Self::Future {
std::future::ready(Ok(OutboundSubstream(socket)))
}
}
pub enum ProtocolInEvent<I, O, E> {
ExecuteInbound(InboundProtocolFn<I, E>),
ExecuteOutbound(OutboundProtocolFn<O, E>),
}
pub enum ProtocolOutEvent<I, O, E> {
Inbound(Result<I, E>),
Outbound(Result<O, E>),
}
impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for Handler<TInboundOut, TOutboundOut, TErr>
where
TInboundOut: Send + 'static,
TOutboundOut: Send + 'static,
TErr: Send + 'static,
{
type InEvent = ProtocolInEvent<TInboundOut, TOutboundOut, TErr>;
type OutEvent = ProtocolOutEvent<TInboundOut, TOutboundOut, TErr>;
type Error = Infallible;
type InboundProtocol = ProtocolInfo;
type OutboundProtocol = ProtocolInfo;
type InboundOpenInfo = ();
type OutboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(ProtocolInfo::new(self.info), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
substream: InboundSubstream,
_: Self::InboundOpenInfo,
) {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::None => {
self.state = ProtocolState::Inbound(
InboundProtocolState::GotSubstreamNeedFunction(substream),
);
}
ProtocolState::Inbound(InboundProtocolState::GotFunctionNeedSubstream(protocol_fn)) => {
self.state =
ProtocolState::Inbound(InboundProtocolState::Executing(protocol_fn(substream)));
}
ProtocolState::Inbound(_) | ProtocolState::Done => {
panic!("Illegal state, substream is already present.");
}
ProtocolState::Outbound(_) => {
panic!("Failed to process inbound substream in outbound protocol.");
}
ProtocolState::Poisoned => {
panic!("Illegal state, currently in transient state poisoned.");
}
}
}
fn inject_fully_negotiated_outbound(
&mut self,
substream: OutboundSubstream,
_: Self::OutboundOpenInfo,
) {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Outbound(OutboundProtocolState::GotFunctionRequestedSubstream(
protocol_fn,
)) => {
self.state = ProtocolState::Outbound(OutboundProtocolState::Executing(
protocol_fn(substream),
));
}
ProtocolState::None
| ProtocolState::Outbound(OutboundProtocolState::GotFunctionNeedSubstream(_)) => {
panic!("Illegal state, receiving substream means it was requested.");
}
ProtocolState::Outbound(_) | ProtocolState::Done => {
panic!("Illegal state, substream is already present.");
}
ProtocolState::Inbound(_) => {
panic!("Failed to process outbound substream in inbound protocol.");
}
ProtocolState::Poisoned => {
panic!("Illegal state, currently in transient state poisoned.");
}
}
}
fn inject_event(&mut self, event: Self::InEvent) {
match event {
ProtocolInEvent::ExecuteInbound(protocol_fn) => {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::None => {
self.state = ProtocolState::Inbound(
InboundProtocolState::GotFunctionNeedSubstream(protocol_fn),
);
}
ProtocolState::Inbound(InboundProtocolState::GotSubstreamNeedFunction(
substream,
)) => {
self.state = ProtocolState::Inbound(InboundProtocolState::Executing(
protocol_fn(substream),
));
}
ProtocolState::Inbound(_) | ProtocolState::Done => {
panic!("Illegal state, protocol fn is already present.");
}
ProtocolState::Outbound(_) => {
panic!("Failed to process inbound protocol fn in outbound protocol.");
}
ProtocolState::Poisoned => {
panic!("Illegal state, currently in transient state poisoned.");
}
}
}
ProtocolInEvent::ExecuteOutbound(protocol_fn) => {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::None => {
self.state = ProtocolState::Outbound(
OutboundProtocolState::GotFunctionNeedSubstream(protocol_fn),
);
}
ProtocolState::Outbound(_) | ProtocolState::Done => {
panic!("Illegal state, protocol fn is already present.");
}
ProtocolState::Inbound(_) => {
panic!("Failed to process outbound protocol fn in inbound protocol.");
}
ProtocolState::Poisoned => {
panic!("Illegal state, currently in transient state poisoned.");
}
}
}
}
}
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
) {
log::error!("Failed to upgrade: {}", err);
}
fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::Yes
}
#[allow(clippy::type_complexity)]
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Inbound(InboundProtocolState::Executing(mut protocol)) => match protocol
.poll_unpin(cx)
{
Poll::Ready(res) => {
self.state = ProtocolState::Done;
Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::Inbound(
res,
)))
}
Poll::Pending => {
self.state = ProtocolState::Inbound(InboundProtocolState::Executing(protocol));
Poll::Pending
}
},
ProtocolState::Outbound(OutboundProtocolState::Executing(mut protocol)) => {
match protocol.poll_unpin(cx) {
Poll::Ready(res) => {
self.state = ProtocolState::Done;
Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::Outbound(
res,
)))
}
Poll::Pending => {
self.state =
ProtocolState::Outbound(OutboundProtocolState::Executing(protocol));
Poll::Pending
}
}
}
ProtocolState::Outbound(OutboundProtocolState::GotFunctionNeedSubstream(protocol)) => {
self.state = ProtocolState::Outbound(
OutboundProtocolState::GotFunctionRequestedSubstream(protocol),
);
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ProtocolInfo::new(self.info), ()),
})
}
ProtocolState::Poisoned => {
unreachable!("Protocol is poisoned (transient state)")
}
other => {
self.state = other;
Poll::Pending
}
}
}
}
/// A behaviour that can execute await/.async protocols.
///
/// Note: It is not possible to execute the same protocol with the same peer several simultaneous times.
pub struct Behaviour<I, O, E> {
protocol_in_events: VecDeque<(PeerId, ProtocolInEvent<I, O, E>)>,
protocol_out_events: VecDeque<(PeerId, ProtocolOutEvent<I, O, E>)>,
connected_peers: HashMap<PeerId, Vec<Multiaddr>>,
info: &'static [u8],
}
impl<I, O, E> Behaviour<I, O, E> {
/// Constructs a new [`Behaviour`] with the given protocol info.
///
/// # Example
///
/// ```
/// # use libp2p_async_await::Behaviour;
///
/// let _ = Behaviour::new(b"/foo/bar/1.0.0");
/// ```
pub fn new(info: &'static [u8]) -> Self {
Self {
protocol_in_events: VecDeque::default(),
protocol_out_events: VecDeque::default(),
connected_peers: HashMap::default(),
info,
}
}
}
impl<I, O, E> Behaviour<I, O, E> {
pub fn do_protocol_listener<F>(
&mut self,
peer: PeerId,
protocol: impl FnOnce(InboundSubstream) -> F + Send + 'static,
) where
F: Future<Output = Result<I, E>> + Send + 'static,
{
self.protocol_in_events.push_back((
peer,
ProtocolInEvent::ExecuteInbound(Box::new(move |substream| protocol(substream).boxed())),
));
}
pub fn do_protocol_dialer<F>(
&mut self,
peer: PeerId,
protocol: impl FnOnce(OutboundSubstream) -> F + Send + 'static,
) where
F: Future<Output = Result<O, E>> + Send + 'static,
{
self.protocol_in_events.push_back((
peer,
ProtocolInEvent::ExecuteOutbound(Box::new(move |substream| {
protocol(substream).boxed()
})),
));
}
}
#[derive(Clone)]
pub enum BehaviourOutEvent<I, O, E> {
Inbound(PeerId, Result<I, E>),
Outbound(PeerId, Result<O, E>),
}
impl<I, O, E> NetworkBehaviour for Behaviour<I, O, E>
where
I: Send + 'static,
O: Send + 'static,
E: Send + 'static,
{
type ProtocolsHandler = Handler<I, O, E>;
type OutEvent = BehaviourOutEvent<I, O, E>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
Handler::new(self.info)
}
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.connected_peers.get(peer).cloned().unwrap_or_default()
}
fn inject_connected(&mut self, _: &PeerId) {}
fn inject_disconnected(&mut self, _: &PeerId) {}
fn inject_connection_established(
&mut self,
peer: &PeerId,
_: &ConnectionId,
point: &ConnectedPoint,
) {
let multiaddr = point.get_remote_address().clone();
self.connected_peers
.entry(*peer)
.or_default()
.push(multiaddr);
}
fn inject_connection_closed(
&mut self,
peer: &PeerId,
_: &ConnectionId,
point: &ConnectedPoint,
) {
let multiaddr = point.get_remote_address();
self.connected_peers
.entry(*peer)
.or_default()
.retain(|addr| addr != multiaddr);
}
fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: ProtocolOutEvent<I, O, E>) {
self.protocol_out_events.push_back((peer, event));
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<ProtocolInEvent<I, O, E>, Self::OutEvent>> {
if let Some((peer, event)) = self.protocol_in_events.pop_front() {
if !self.connected_peers.contains_key(&peer) {
self.protocol_in_events.push_back((peer, event));
} else {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::Any,
event,
});
}
}
if let Some((peer, event)) = self.protocol_out_events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(match event {
ProtocolOutEvent::Inbound(res) => BehaviourOutEvent::Inbound(peer, res),
ProtocolOutEvent::Outbound(res) => BehaviourOutEvent::Outbound(peer, res),
}));
}
Poll::Pending
}
}

@ -0,0 +1,150 @@
use libp2p::futures::future;
use libp2p::futures::future::FutureExt;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::memory::MemoryTransport, upgrade::Version},
identity,
noise::{self, NoiseConfig, X25519Spec},
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
yamux::YamuxConfig,
Multiaddr, PeerId, Swarm, Transport,
};
use std::{fmt::Debug, future::Future, time::Duration};
use tokio::runtime::Handle;
use tokio::time;
#[allow(missing_debug_implementations)]
pub struct Actor<B: NetworkBehaviour> {
pub swarm: Swarm<B>,
pub addr: Multiaddr,
pub peer_id: PeerId,
}
pub async fn new_connected_swarm_pair<B, F>(behaviour_fn: F, handle: Handle) -> (Actor<B>, Actor<B>)
where
B: NetworkBehaviour,
F: Fn(PeerId, identity::Keypair) -> B + Clone,
<B as NetworkBehaviour>::OutEvent: Debug,
{
let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone(), handle.clone());
let mut alice = Actor {
swarm,
addr,
peer_id,
};
let (swarm, addr, peer_id) = new_swarm(behaviour_fn, handle);
let mut bob = Actor {
swarm,
addr,
peer_id,
};
connect(&mut alice.swarm, &mut bob.swarm).await;
(alice, bob)
}
pub fn new_swarm<B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B>(
behaviour_fn: F,
handle: Handle,
) -> (Swarm<B>, Multiaddr, PeerId) {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
let dh_keys = noise::Keypair::<X25519Spec>::new()
.into_authentic(&id_keys)
.expect("failed to create dh_keys");
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
let transport = MemoryTransport::default()
.upgrade(Version::V1)
.authenticate(noise)
.multiplex(YamuxConfig::default())
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.boxed();
let mut swarm: Swarm<B> = SwarmBuilder::new(transport, behaviour_fn(peer_id, id_keys), peer_id)
.executor(Box::new(move |f| {
handle.spawn(f);
}))
.build();
let address_port = rand::random::<u64>();
let addr = format!("/memory/{}", address_port)
.parse::<Multiaddr>()
.unwrap();
Swarm::listen_on(&mut swarm, addr.clone()).unwrap();
(swarm, addr, peer_id)
}
pub async fn await_events_or_timeout<A, B>(
alice_event: impl Future<Output = A>,
bob_event: impl Future<Output = B>,
) -> (A, B) {
time::timeout(
Duration::from_secs(10),
future::join(alice_event, bob_event),
)
.await
.expect("network behaviours to emit an event within 10 seconds")
}
/// Connects two swarms with each other.
///
/// This assumes the transport that is in use can be used by Alice to connect to
/// the listen address that is emitted by Bob. In other words, they have to be
/// on the same network. The memory transport used by the above `new_swarm`
/// function fulfills this.
///
/// We also assume that the swarms don't emit any behaviour events during the
/// connection phase. Any event emitted is considered a bug from this functions
/// PoV because they would be lost.
pub async fn connect<B>(alice: &mut Swarm<B>, bob: &mut Swarm<B>)
where
B: NetworkBehaviour,
<B as NetworkBehaviour>::OutEvent: Debug,
{
let mut alice_connected = false;
let mut bob_connected = false;
while !(alice_connected && bob_connected) {
libp2p::futures::select! {
alice_event = alice.next_event().fuse() => {
match alice_event {
SwarmEvent::ConnectionEstablished { .. } => {
log::info!("alice connected");
alice_connected = true;
}
SwarmEvent::Behaviour(event) => {
panic!(
"alice unexpectedly emitted a behaviour event during connection: {:?}",
event
);
}
_ => {}
}
}
bob_event = bob.next_event().fuse() => {
match bob_event {
SwarmEvent::ConnectionEstablished { .. } => {
log::info!("bob connected");
bob_connected = true;
}
SwarmEvent::NewListenAddr(addr) => {
Swarm::dial_addr(alice, addr).unwrap();
}
SwarmEvent::Behaviour(event) => {
panic!(
"bob unexpectedly emitted a behaviour event during connection: {:?}",
event
);
}
_ => {}
}
}
}
}
}

@ -0,0 +1,140 @@
#![allow(clippy::blacklisted_name)]
use anyhow::{Context, Error};
use harness::await_events_or_timeout;
use harness::new_connected_swarm_pair;
use libp2p::swarm::SwarmEvent;
use libp2p::PeerId;
use libp2p_async_await::{Behaviour, BehaviourOutEvent};
use tokio::runtime::Handle;
mod harness;
#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct Message0 {
foo: u32,
}
#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct Message1 {
bar: u32,
}
#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct Message2 {
baz: u32,
}
#[derive(Debug)]
struct AliceResult {
bar: u32,
}
#[derive(Debug)]
struct BobResult {
foo: u32,
baz: u32,
}
#[derive(Debug)]
enum MyOutEvent {
Alice(AliceResult),
Bob(BobResult),
Failed(anyhow::Error),
}
impl From<BehaviourOutEvent<BobResult, AliceResult, anyhow::Error>> for MyOutEvent {
fn from(event: BehaviourOutEvent<BobResult, AliceResult, Error>) -> Self {
match event {
BehaviourOutEvent::Inbound(_, Ok(bob)) => MyOutEvent::Bob(bob),
BehaviourOutEvent::Outbound(_, Ok(alice)) => MyOutEvent::Alice(alice),
BehaviourOutEvent::Inbound(_, Err(e)) | BehaviourOutEvent::Outbound(_, Err(e)) => {
MyOutEvent::Failed(e)
}
}
}
}
#[derive(libp2p::NetworkBehaviour)]
#[behaviour(out_event = "MyOutEvent", event_process = false)]
struct MyBehaviour {
inner: Behaviour<BobResult, AliceResult, anyhow::Error>,
}
impl MyBehaviour {
pub fn new() -> Self {
Self {
inner: Behaviour::new(b"/foo/bar/1.0.0"),
}
}
}
impl MyBehaviour {
fn alice_do_protocol(&mut self, bob: PeerId, foo: u32, baz: u32) {
self.inner
.do_protocol_dialer(bob, move |mut substream| async move {
substream
.write_message(
&serde_cbor::to_vec(&Message0 { foo })
.context("failed to serialize Message0")?,
)
.await?;
let bytes = substream.read_message(1024).await?;
let message1 = serde_cbor::from_slice::<Message1>(&bytes)?;
substream
.write_message(
&serde_cbor::to_vec(&Message2 { baz })
.context("failed to serialize Message2")?,
)
.await?;
Ok(AliceResult { bar: message1.bar })
})
}
fn bob_do_protocol(&mut self, alice: PeerId, bar: u32) {
self.inner
.do_protocol_listener(alice, move |mut substream| async move {
let bytes = substream.read_message(1024).await?;
let message0 = serde_cbor::from_slice::<Message0>(&bytes)?;
substream
.write_message(
&serde_cbor::to_vec(&Message1 { bar })
.context("failed to serialize Message1")?,
)
.await?;
let bytes = substream.read_message(1024).await?;
let message2 = serde_cbor::from_slice::<Message2>(&bytes)?;
Ok(BobResult {
foo: message0.foo,
baz: message2.baz,
})
})
}
}
#[tokio::test]
async fn it_works() {
let _ = env_logger::try_init();
let (mut alice, mut bob) =
new_connected_swarm_pair(|_, _| MyBehaviour::new(), Handle::current()).await;
alice.swarm.alice_do_protocol(bob.peer_id, 10, 42);
bob.swarm.bob_do_protocol(alice.peer_id, 1337);
let (alice_event, bob_event) =
await_events_or_timeout(alice.swarm.next_event(), bob.swarm.next_event()).await;
assert!(matches!(
alice_event,
SwarmEvent::Behaviour(MyOutEvent::Alice(AliceResult { bar: 1337 }))
));
assert!(matches!(
bob_event,
SwarmEvent::Behaviour(MyOutEvent::Bob(BobResult { foo: 10, baz: 42 }))
));
}
Loading…
Cancel
Save