|
|
|
@ -24,26 +24,27 @@ type OutboundProtocolFn<O, E> =
|
|
|
|
|
Box<dyn FnOnce(OutboundSubstream) -> Protocol<O, E> + Send + 'static>;
|
|
|
|
|
|
|
|
|
|
enum InboundProtocolState<T, E> {
|
|
|
|
|
None,
|
|
|
|
|
PendingSubstream(InboundProtocolFn<T, E>),
|
|
|
|
|
PendingProtocolFn(InboundSubstream),
|
|
|
|
|
ReadyToPoll(Protocol<T, E>),
|
|
|
|
|
Done,
|
|
|
|
|
Poisoned,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enum OutboundProtocolState<T, E> {
|
|
|
|
|
None,
|
|
|
|
|
PendingSubstream(OutboundProtocolFn<T, E>),
|
|
|
|
|
PendingProtocolFn(OutboundSubstream),
|
|
|
|
|
ReadyToPoll(Protocol<T, E>),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enum ProtocolState<I, O, E> {
|
|
|
|
|
None,
|
|
|
|
|
Inbound(InboundProtocolState<I, E>),
|
|
|
|
|
Outbound(OutboundProtocolState<O, E>),
|
|
|
|
|
Done,
|
|
|
|
|
Poisoned,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct NMessageHandler<TInboundOut, TOutboundOut, TErr> {
|
|
|
|
|
inbound_state: InboundProtocolState<TInboundOut, TErr>,
|
|
|
|
|
outbound_state: OutboundProtocolState<TOutboundOut, TErr>,
|
|
|
|
|
state: ProtocolState<TInboundOut, TOutboundOut, TErr>,
|
|
|
|
|
|
|
|
|
|
// TODO: See if it can be included in OutboundProtocolState.
|
|
|
|
|
// Or it can be inferred from OutboundProtocolState current variant.
|
|
|
|
@ -55,8 +56,7 @@ pub struct NMessageHandler<TInboundOut, TOutboundOut, TErr> {
|
|
|
|
|
impl<TInboundOut, TOutboundOut, TErr> NMessageHandler<TInboundOut, TOutboundOut, TErr> {
|
|
|
|
|
pub fn new(info: &'static [u8]) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
inbound_state: InboundProtocolState::None,
|
|
|
|
|
outbound_state: OutboundProtocolState::None,
|
|
|
|
|
state: ProtocolState::None,
|
|
|
|
|
substream_request: None,
|
|
|
|
|
info,
|
|
|
|
|
}
|
|
|
|
@ -143,18 +143,24 @@ where
|
|
|
|
|
substream: InboundSubstream,
|
|
|
|
|
_: Self::InboundOpenInfo,
|
|
|
|
|
) {
|
|
|
|
|
match mem::replace(&mut self.inbound_state, InboundProtocolState::Poisoned) {
|
|
|
|
|
InboundProtocolState::None => {
|
|
|
|
|
self.inbound_state = InboundProtocolState::PendingProtocolFn(substream);
|
|
|
|
|
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
|
|
|
|
ProtocolState::None => {
|
|
|
|
|
self.state =
|
|
|
|
|
ProtocolState::Inbound(InboundProtocolState::PendingProtocolFn(substream));
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Inbound(InboundProtocolState::PendingSubstream(protocol_fn)) => {
|
|
|
|
|
self.state = ProtocolState::Inbound(InboundProtocolState::ReadyToPoll(
|
|
|
|
|
protocol_fn(substream),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
InboundProtocolState::PendingSubstream(protocol_fn) => {
|
|
|
|
|
self.inbound_state = InboundProtocolState::ReadyToPoll(protocol_fn(substream));
|
|
|
|
|
ProtocolState::Inbound(_) | ProtocolState::Done => {
|
|
|
|
|
panic!("Illegal state, substream is already present.");
|
|
|
|
|
}
|
|
|
|
|
InboundProtocolState::PendingProtocolFn(_)
|
|
|
|
|
| InboundProtocolState::ReadyToPoll(_)
|
|
|
|
|
| InboundProtocolState::Done
|
|
|
|
|
| InboundProtocolState::Poisoned => {
|
|
|
|
|
panic!("Failed to inject inbound substream due to unexpected state.");
|
|
|
|
|
ProtocolState::Outbound(_) => {
|
|
|
|
|
panic!("Failed to process inbound substream in outbound protocol.");
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Poisoned => {
|
|
|
|
|
panic!("Illegal state, currently in transient state poisoned.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -164,18 +170,24 @@ where
|
|
|
|
|
substream: OutboundSubstream,
|
|
|
|
|
_: Self::OutboundOpenInfo,
|
|
|
|
|
) {
|
|
|
|
|
match mem::replace(&mut self.outbound_state, OutboundProtocolState::Poisoned) {
|
|
|
|
|
OutboundProtocolState::None => {
|
|
|
|
|
self.outbound_state = OutboundProtocolState::PendingProtocolFn(substream);
|
|
|
|
|
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
|
|
|
|
ProtocolState::None => {
|
|
|
|
|
self.state =
|
|
|
|
|
ProtocolState::Outbound(OutboundProtocolState::PendingProtocolFn(substream));
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Outbound(OutboundProtocolState::PendingSubstream(protocol_fn)) => {
|
|
|
|
|
self.state = ProtocolState::Outbound(OutboundProtocolState::ReadyToPoll(
|
|
|
|
|
protocol_fn(substream),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
OutboundProtocolState::PendingSubstream(protocol_fn) => {
|
|
|
|
|
self.outbound_state = OutboundProtocolState::ReadyToPoll(protocol_fn(substream));
|
|
|
|
|
ProtocolState::Outbound(_) | ProtocolState::Done => {
|
|
|
|
|
panic!("Illegal state, substream is already present.");
|
|
|
|
|
}
|
|
|
|
|
OutboundProtocolState::PendingProtocolFn(_)
|
|
|
|
|
| OutboundProtocolState::ReadyToPoll(_)
|
|
|
|
|
| OutboundProtocolState::Done
|
|
|
|
|
| OutboundProtocolState::Poisoned => {
|
|
|
|
|
panic!("Failed to inject outbound substream due to unexpected state.");
|
|
|
|
|
ProtocolState::Inbound(_) => {
|
|
|
|
|
panic!("Failed to process outbound substream in inbound protocol.");
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Poisoned => {
|
|
|
|
|
panic!("Illegal state, currently in transient state poisoned.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -183,19 +195,25 @@ where
|
|
|
|
|
fn inject_event(&mut self, event: Self::InEvent) {
|
|
|
|
|
match event {
|
|
|
|
|
ProtocolInEvent::ExecuteInbound(protocol_fn) => {
|
|
|
|
|
match mem::replace(&mut self.inbound_state, InboundProtocolState::Poisoned) {
|
|
|
|
|
InboundProtocolState::None => {
|
|
|
|
|
self.inbound_state = InboundProtocolState::PendingSubstream(protocol_fn);
|
|
|
|
|
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
|
|
|
|
ProtocolState::None => {
|
|
|
|
|
self.state = ProtocolState::Inbound(
|
|
|
|
|
InboundProtocolState::PendingSubstream(protocol_fn),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Inbound(InboundProtocolState::PendingProtocolFn(substream)) => {
|
|
|
|
|
self.state = ProtocolState::Inbound(InboundProtocolState::ReadyToPoll(
|
|
|
|
|
protocol_fn(substream),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
InboundProtocolState::PendingProtocolFn(substream) => {
|
|
|
|
|
self.inbound_state =
|
|
|
|
|
InboundProtocolState::ReadyToPoll(protocol_fn(substream));
|
|
|
|
|
ProtocolState::Inbound(_) | ProtocolState::Done => {
|
|
|
|
|
panic!("Illegal state, protocol fn is already present.");
|
|
|
|
|
}
|
|
|
|
|
InboundProtocolState::PendingSubstream(_)
|
|
|
|
|
| InboundProtocolState::ReadyToPoll(_)
|
|
|
|
|
| InboundProtocolState::Done
|
|
|
|
|
| InboundProtocolState::Poisoned => {
|
|
|
|
|
panic!("Failed to inject inbound protocol fn due to unexpected state.");
|
|
|
|
|
ProtocolState::Outbound(_) => {
|
|
|
|
|
panic!("Failed to process inbound protocol fn in outbound protocol.");
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Poisoned => {
|
|
|
|
|
panic!("Illegal state, currently in transient state poisoned.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -203,19 +221,27 @@ where
|
|
|
|
|
self.substream_request =
|
|
|
|
|
Some(SubstreamProtocol::new(NMessageProtocol::new(self.info), ()));
|
|
|
|
|
|
|
|
|
|
match mem::replace(&mut self.outbound_state, OutboundProtocolState::Poisoned) {
|
|
|
|
|
OutboundProtocolState::None => {
|
|
|
|
|
self.outbound_state = OutboundProtocolState::PendingSubstream(protocol_fn);
|
|
|
|
|
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
|
|
|
|
ProtocolState::None => {
|
|
|
|
|
self.state = ProtocolState::Outbound(
|
|
|
|
|
OutboundProtocolState::PendingSubstream(protocol_fn),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Outbound(OutboundProtocolState::PendingProtocolFn(
|
|
|
|
|
substream,
|
|
|
|
|
)) => {
|
|
|
|
|
self.state = ProtocolState::Outbound(OutboundProtocolState::ReadyToPoll(
|
|
|
|
|
protocol_fn(substream),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
OutboundProtocolState::PendingProtocolFn(substream) => {
|
|
|
|
|
self.outbound_state =
|
|
|
|
|
OutboundProtocolState::ReadyToPoll(protocol_fn(substream));
|
|
|
|
|
ProtocolState::Outbound(_) | ProtocolState::Done => {
|
|
|
|
|
panic!("Illegal state, protocol fn is already present.");
|
|
|
|
|
}
|
|
|
|
|
OutboundProtocolState::PendingSubstream(_)
|
|
|
|
|
| OutboundProtocolState::ReadyToPoll(_)
|
|
|
|
|
| OutboundProtocolState::Done
|
|
|
|
|
| OutboundProtocolState::Poisoned => {
|
|
|
|
|
panic!("Failed to inject outbound protocol fn due to unexpected state.");
|
|
|
|
|
ProtocolState::Inbound(_) => {
|
|
|
|
|
panic!("Failed to process outbound protocol fn in inbound protocol.");
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Poisoned => {
|
|
|
|
|
panic!("Illegal state, currently in transient state poisoned.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -249,61 +275,57 @@ where
|
|
|
|
|
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol });
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match mem::replace(&mut self.inbound_state, InboundProtocolState::Poisoned) {
|
|
|
|
|
InboundProtocolState::ReadyToPoll(mut protocol) => match protocol.poll_unpin(cx) {
|
|
|
|
|
Poll::Ready(Ok(value)) => {
|
|
|
|
|
self.inbound_state = InboundProtocolState::Done;
|
|
|
|
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::InboundFinished(value),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Err(e)) => {
|
|
|
|
|
self.inbound_state = InboundProtocolState::Done;
|
|
|
|
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::InboundFailed(e),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
Poll::Pending => {
|
|
|
|
|
self.inbound_state = InboundProtocolState::ReadyToPoll(protocol);
|
|
|
|
|
return Poll::Pending;
|
|
|
|
|
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
|
|
|
|
ProtocolState::Inbound(InboundProtocolState::ReadyToPoll(mut protocol)) => {
|
|
|
|
|
match protocol.poll_unpin(cx) {
|
|
|
|
|
Poll::Ready(Ok(value)) => {
|
|
|
|
|
self.state = ProtocolState::Done;
|
|
|
|
|
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::InboundFinished(value),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Err(e)) => {
|
|
|
|
|
self.state = ProtocolState::Done;
|
|
|
|
|
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::InboundFailed(e),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
Poll::Pending => {
|
|
|
|
|
self.state =
|
|
|
|
|
ProtocolState::Inbound(InboundProtocolState::ReadyToPoll(protocol));
|
|
|
|
|
Poll::Pending
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
InboundProtocolState::Poisoned => {
|
|
|
|
|
unreachable!("Inbound protocol is poisoned (transient state)")
|
|
|
|
|
}
|
|
|
|
|
other => {
|
|
|
|
|
self.inbound_state = other;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
match mem::replace(&mut self.outbound_state, OutboundProtocolState::Poisoned) {
|
|
|
|
|
OutboundProtocolState::ReadyToPoll(mut protocol) => match protocol.poll_unpin(cx) {
|
|
|
|
|
Poll::Ready(Ok(value)) => {
|
|
|
|
|
self.outbound_state = OutboundProtocolState::Done;
|
|
|
|
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::OutboundFinished(value),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Err(e)) => {
|
|
|
|
|
self.outbound_state = OutboundProtocolState::Done;
|
|
|
|
|
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::OutboundFailed(e),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
Poll::Pending => {
|
|
|
|
|
self.outbound_state = OutboundProtocolState::ReadyToPoll(protocol);
|
|
|
|
|
return Poll::Pending;
|
|
|
|
|
ProtocolState::Outbound(OutboundProtocolState::ReadyToPoll(mut protocol)) => {
|
|
|
|
|
match protocol.poll_unpin(cx) {
|
|
|
|
|
Poll::Ready(Ok(value)) => {
|
|
|
|
|
self.state = ProtocolState::Done;
|
|
|
|
|
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::OutboundFinished(value),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Err(e)) => {
|
|
|
|
|
self.state = ProtocolState::Done;
|
|
|
|
|
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
|
|
|
|
ProtocolOutEvent::OutboundFailed(e),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
Poll::Pending => {
|
|
|
|
|
self.state =
|
|
|
|
|
ProtocolState::Outbound(OutboundProtocolState::ReadyToPoll(protocol));
|
|
|
|
|
Poll::Pending
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
OutboundProtocolState::Poisoned => {
|
|
|
|
|
unreachable!("Outbound protocol is poisoned (transient state)")
|
|
|
|
|
}
|
|
|
|
|
ProtocolState::Poisoned => {
|
|
|
|
|
unreachable!("Protocol is poisoned (transient state)")
|
|
|
|
|
}
|
|
|
|
|
other => {
|
|
|
|
|
self.outbound_state = other;
|
|
|
|
|
self.state = other;
|
|
|
|
|
Poll::Pending
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Poll::Pending
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|