Compare commits
5 Commits
master
...
cleanup-mo
Author | SHA1 | Date |
---|---|---|
Daniel Karzel | a11c9d9c03 | 3 years ago |
Daniel Karzel | 52e2e7e489 | 3 years ago |
Daniel Karzel | c87af7bf84 | 3 years ago |
Daniel Karzel | 27c255646f | 3 years ago |
Daniel Karzel | 85f5635663 | 3 years ago |
@ -1,7 +1,18 @@
|
||||
mod behaviour;
|
||||
pub mod command;
|
||||
pub mod config;
|
||||
mod event_loop;
|
||||
mod rate;
|
||||
mod recovery;
|
||||
pub mod tracing;
|
||||
pub mod transport;
|
||||
|
||||
pub use behaviour::{Behaviour, OutEvent};
|
||||
pub use event_loop::{EventLoop, EventLoopHandle, FixedRate, KrakenRate, LatestRate};
|
||||
pub use rate::Rate;
|
||||
pub use recovery::cancel::cancel;
|
||||
pub use recovery::punish::punish;
|
||||
pub use recovery::redeem::{redeem, Finality};
|
||||
pub use recovery::refund::refund;
|
||||
pub use recovery::safely_abort::safely_abort;
|
||||
pub use recovery::{cancel, refund};
|
||||
|
@ -1,3 +1,12 @@
|
||||
mod behaviour;
|
||||
pub mod cancel;
|
||||
pub mod command;
|
||||
mod event_loop;
|
||||
pub mod refund;
|
||||
pub mod tracing;
|
||||
pub mod transport;
|
||||
|
||||
pub use behaviour::{Behaviour, OutEvent};
|
||||
pub use cancel::cancel;
|
||||
pub use event_loop::{EventLoop, EventLoopHandle};
|
||||
pub use refund::refund;
|
||||
|
@ -0,0 +1 @@
|
||||
|
@ -0,0 +1 @@
|
||||
|
@ -1,138 +0,0 @@
|
||||
use crate::monero;
|
||||
use crate::network::cbor_request_response::CborCodec;
|
||||
use libp2p::core::ProtocolName;
|
||||
use libp2p::request_response::{RequestResponse, RequestResponseEvent, RequestResponseMessage};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0";
|
||||
pub type OutEvent = RequestResponseEvent<Request, Response>;
|
||||
pub type Message = RequestResponseMessage<Request, Response>;
|
||||
|
||||
pub type Behaviour = RequestResponse<CborCodec<SpotPriceProtocol, Request, Response>>;
|
||||
|
||||
/// The spot price protocol allows parties to **initiate** a trade by requesting
|
||||
/// a spot price.
|
||||
///
|
||||
/// A spot price is binding for both parties, i.e. after the spot-price protocol
|
||||
/// completes, both parties are expected to follow up with the `execution-setup`
|
||||
/// protocol.
|
||||
///
|
||||
/// If a party wishes to only inquire about the current price, they should use
|
||||
/// the `quote` protocol instead.
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub struct SpotPriceProtocol;
|
||||
|
||||
impl ProtocolName for SpotPriceProtocol {
|
||||
fn protocol_name(&self) -> &[u8] {
|
||||
PROTOCOL.as_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct Request {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
pub btc: bitcoin::Amount,
|
||||
pub blockchain_network: BlockchainNetwork,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum Response {
|
||||
Xmr(monero::Amount),
|
||||
Error(Error),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum Error {
|
||||
NoSwapsAccepted,
|
||||
AmountBelowMinimum {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
min: bitcoin::Amount,
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
AmountAboveMaximum {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
max: bitcoin::Amount,
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
BalanceTooLow {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork,
|
||||
asb: BlockchainNetwork,
|
||||
},
|
||||
/// To be used for errors that cannot be explained on the CLI side (e.g.
|
||||
/// rate update problems on the seller side)
|
||||
Other,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||
pub struct BlockchainNetwork {
|
||||
#[serde(with = "crate::bitcoin::network")]
|
||||
pub bitcoin: bitcoin::Network,
|
||||
#[serde(with = "crate::monero::network")]
|
||||
pub monero: monero::Network,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::monero;
|
||||
|
||||
#[test]
|
||||
fn snapshot_test_serialize() {
|
||||
let amount = monero::Amount::from_piconero(100_000u64);
|
||||
let xmr = r#"{"Xmr":100000}"#.to_string();
|
||||
let serialized = serde_json::to_string(&Response::Xmr(amount)).unwrap();
|
||||
assert_eq!(xmr, serialized);
|
||||
|
||||
let error = r#"{"Error":"NoSwapsAccepted"}"#.to_string();
|
||||
let serialized = serde_json::to_string(&Response::Error(Error::NoSwapsAccepted)).unwrap();
|
||||
assert_eq!(error, serialized);
|
||||
|
||||
let error = r#"{"Error":{"AmountBelowMinimum":{"min":0,"buy":0}}}"#.to_string();
|
||||
let serialized = serde_json::to_string(&Response::Error(Error::AmountBelowMinimum {
|
||||
min: Default::default(),
|
||||
buy: Default::default(),
|
||||
}))
|
||||
.unwrap();
|
||||
assert_eq!(error, serialized);
|
||||
|
||||
let error = r#"{"Error":{"AmountAboveMaximum":{"max":0,"buy":0}}}"#.to_string();
|
||||
let serialized = serde_json::to_string(&Response::Error(Error::AmountAboveMaximum {
|
||||
max: Default::default(),
|
||||
buy: Default::default(),
|
||||
}))
|
||||
.unwrap();
|
||||
assert_eq!(error, serialized);
|
||||
|
||||
let error = r#"{"Error":{"BalanceTooLow":{"buy":0}}}"#.to_string();
|
||||
let serialized = serde_json::to_string(&Response::Error(Error::BalanceTooLow {
|
||||
buy: Default::default(),
|
||||
}))
|
||||
.unwrap();
|
||||
assert_eq!(error, serialized);
|
||||
|
||||
let error = r#"{"Error":{"BlockchainNetworkMismatch":{"cli":{"bitcoin":"Mainnet","monero":"Mainnet"},"asb":{"bitcoin":"Testnet","monero":"Stagenet"}}}}"#.to_string();
|
||||
let serialized =
|
||||
serde_json::to_string(&Response::Error(Error::BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Bitcoin,
|
||||
monero: monero::Network::Mainnet,
|
||||
},
|
||||
asb: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Testnet,
|
||||
monero: monero::Network::Stagenet,
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
assert_eq!(error, serialized);
|
||||
|
||||
let error = r#"{"Error":"Other"}"#.to_string();
|
||||
let serialized = serde_json::to_string(&Response::Error(Error::Other)).unwrap();
|
||||
assert_eq!(error, serialized);
|
||||
}
|
||||
}
|
@ -0,0 +1,114 @@
|
||||
use crate::monero;
|
||||
use anyhow::{Context, Result};
|
||||
use libp2p::core::upgrade;
|
||||
use libp2p::swarm::NegotiatedSubstream;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod alice;
|
||||
pub mod bob;
|
||||
|
||||
pub const BUF_SIZE: usize = 1024 * 1024;
|
||||
|
||||
pub mod protocol {
|
||||
use futures::future;
|
||||
use libp2p::core::upgrade::{from_fn, FromFnUpgrade};
|
||||
use libp2p::core::Endpoint;
|
||||
use libp2p::swarm::NegotiatedSubstream;
|
||||
use void::Void;
|
||||
|
||||
pub fn new() -> SwapSetup {
|
||||
from_fn(
|
||||
b"/comit/xmr/btc/swap_setup/1.0.0",
|
||||
Box::new(|socket, _| future::ready(Ok(socket))),
|
||||
)
|
||||
}
|
||||
|
||||
pub type SwapSetup = FromFnUpgrade<
|
||||
&'static [u8],
|
||||
Box<
|
||||
dyn Fn(
|
||||
NegotiatedSubstream,
|
||||
Endpoint,
|
||||
) -> future::Ready<Result<NegotiatedSubstream, Void>>
|
||||
+ Send
|
||||
+ 'static,
|
||||
>,
|
||||
>;
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||
pub struct BlockchainNetwork {
|
||||
#[serde(with = "crate::bitcoin::network")]
|
||||
pub bitcoin: bitcoin::Network,
|
||||
#[serde(with = "crate::monero::network")]
|
||||
pub monero: monero::Network,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct SpotPriceRequest {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
pub btc: bitcoin::Amount,
|
||||
pub blockchain_network: BlockchainNetwork,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum SpotPriceResponse {
|
||||
Xmr(monero::Amount),
|
||||
Error(SpotPriceError),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum SpotPriceError {
|
||||
NoSwapsAccepted,
|
||||
AmountBelowMinimum {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
min: bitcoin::Amount,
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
AmountAboveMaximum {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
max: bitcoin::Amount,
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
BalanceTooLow {
|
||||
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork,
|
||||
asb: BlockchainNetwork,
|
||||
},
|
||||
/// To be used for errors that cannot be explained on the CLI side (e.g.
|
||||
/// rate update problems on the seller side)
|
||||
Other,
|
||||
}
|
||||
|
||||
pub async fn read_cbor_message<T>(substream: &mut NegotiatedSubstream) -> Result<T>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
let bytes = upgrade::read_one(substream, BUF_SIZE)
|
||||
.await
|
||||
.context("Failed to read length-prefixed message from stream")?;
|
||||
let mut de = serde_cbor::Deserializer::from_slice(&bytes);
|
||||
let message =
|
||||
T::deserialize(&mut de).context("Failed to deserialize bytes into message using CBOR")?;
|
||||
|
||||
Ok(message)
|
||||
}
|
||||
|
||||
pub async fn write_cbor_message<T>(substream: &mut NegotiatedSubstream, message: T) -> Result<()>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let bytes =
|
||||
serde_cbor::to_vec(&message).context("Failed to serialize message as bytes using CBOR")?;
|
||||
upgrade::write_with_len_prefix(substream, &bytes)
|
||||
.await
|
||||
.context("Failed to write bytes as length-prefixed message")?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -0,0 +1,513 @@
|
||||
use crate::asb::LatestRate;
|
||||
use crate::network::swap_setup;
|
||||
use crate::network::swap_setup::{
|
||||
protocol, BlockchainNetwork, SpotPriceError, SpotPriceRequest, SpotPriceResponse,
|
||||
};
|
||||
use crate::protocol::alice::{State0, State3};
|
||||
use crate::protocol::{Message0, Message2, Message4};
|
||||
use crate::{asb, bitcoin, env, monero};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use futures::future::{BoxFuture, OptionFuture};
|
||||
use futures::FutureExt;
|
||||
use libp2p::core::connection::ConnectionId;
|
||||
use libp2p::core::upgrade;
|
||||
use libp2p::swarm::{
|
||||
KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
|
||||
ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
||||
};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::Debug;
|
||||
use std::task::Poll;
|
||||
use std::time::{Duration, Instant};
|
||||
use uuid::Uuid;
|
||||
use void::Void;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum OutEvent {
|
||||
Initiated {
|
||||
send_wallet_snapshot: bmrng::RequestReceiver<bitcoin::Amount, WalletSnapshot>,
|
||||
},
|
||||
Completed {
|
||||
peer_id: PeerId,
|
||||
swap_id: Uuid,
|
||||
state3: State3,
|
||||
},
|
||||
Error {
|
||||
peer_id: PeerId,
|
||||
error: anyhow::Error,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WalletSnapshot {
|
||||
balance: monero::Amount,
|
||||
lock_fee: monero::Amount,
|
||||
|
||||
// TODO: Consider using the same address for punish and redeem (they are mutually exclusive, so
|
||||
// effectively the address will only be used once)
|
||||
redeem_address: bitcoin::Address,
|
||||
punish_address: bitcoin::Address,
|
||||
|
||||
redeem_fee: bitcoin::Amount,
|
||||
punish_fee: bitcoin::Amount,
|
||||
}
|
||||
|
||||
impl WalletSnapshot {
|
||||
pub async fn capture(
|
||||
bitcoin_wallet: &bitcoin::Wallet,
|
||||
monero_wallet: &monero::Wallet,
|
||||
transfer_amount: bitcoin::Amount,
|
||||
) -> Result<Self> {
|
||||
let balance = monero_wallet.get_balance().await?;
|
||||
let redeem_address = bitcoin_wallet.new_address().await?;
|
||||
let punish_address = bitcoin_wallet.new_address().await?;
|
||||
let redeem_fee = bitcoin_wallet
|
||||
.estimate_fee(bitcoin::TxRedeem::weight(), transfer_amount)
|
||||
.await?;
|
||||
let punish_fee = bitcoin_wallet
|
||||
.estimate_fee(bitcoin::TxPunish::weight(), transfer_amount)
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
balance,
|
||||
lock_fee: monero::MONERO_FEE,
|
||||
redeem_address,
|
||||
punish_address,
|
||||
redeem_fee,
|
||||
punish_fee,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OutEvent> for asb::OutEvent {
|
||||
fn from(event: OutEvent) -> Self {
|
||||
match event {
|
||||
OutEvent::Initiated {
|
||||
send_wallet_snapshot,
|
||||
} => asb::OutEvent::SwapSetupInitiated {
|
||||
send_wallet_snapshot,
|
||||
},
|
||||
OutEvent::Completed {
|
||||
peer_id: bob_peer_id,
|
||||
swap_id,
|
||||
state3,
|
||||
} => asb::OutEvent::SwapSetupCompleted {
|
||||
peer_id: bob_peer_id,
|
||||
swap_id,
|
||||
state3: Box::new(state3),
|
||||
},
|
||||
OutEvent::Error { peer_id, error } => asb::OutEvent::Failure {
|
||||
peer: peer_id,
|
||||
error: anyhow!(error),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Behaviour<LR> {
|
||||
events: VecDeque<OutEvent>,
|
||||
min_buy: bitcoin::Amount,
|
||||
max_buy: bitcoin::Amount,
|
||||
env_config: env::Config,
|
||||
|
||||
latest_rate: LR,
|
||||
resume_only: bool,
|
||||
}
|
||||
|
||||
impl<LR> Behaviour<LR> {
|
||||
pub fn new(
|
||||
min_buy: bitcoin::Amount,
|
||||
max_buy: bitcoin::Amount,
|
||||
env_config: env::Config,
|
||||
latest_rate: LR,
|
||||
resume_only: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
events: Default::default(),
|
||||
min_buy,
|
||||
max_buy,
|
||||
env_config,
|
||||
latest_rate,
|
||||
resume_only,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<LR> NetworkBehaviour for Behaviour<LR>
|
||||
where
|
||||
LR: LatestRate + Send + 'static + Clone,
|
||||
{
|
||||
type ProtocolsHandler = Handler<LR>;
|
||||
type OutEvent = OutEvent;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
Handler::new(
|
||||
self.min_buy,
|
||||
self.max_buy,
|
||||
self.env_config,
|
||||
self.latest_rate.clone(),
|
||||
self.resume_only,
|
||||
)
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _: &PeerId) {}
|
||||
|
||||
fn inject_disconnected(&mut self, _: &PeerId) {}
|
||||
|
||||
fn inject_event(&mut self, peer_id: PeerId, _: ConnectionId, event: HandlerOutEvent) {
|
||||
match event {
|
||||
HandlerOutEvent::Initiated(send_wallet_snapshot) => {
|
||||
self.events.push_back(OutEvent::Initiated {
|
||||
send_wallet_snapshot,
|
||||
})
|
||||
}
|
||||
HandlerOutEvent::Completed(Ok((swap_id, state3))) => {
|
||||
self.events.push_back(OutEvent::Completed {
|
||||
peer_id,
|
||||
swap_id,
|
||||
state3,
|
||||
})
|
||||
}
|
||||
HandlerOutEvent::Completed(Err(error)) => {
|
||||
self.events.push_back(OutEvent::Error { peer_id, error })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
_params: &mut impl PollParameters,
|
||||
) -> Poll<NetworkBehaviourAction<(), Self::OutEvent>> {
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
type InboundStream = BoxFuture<'static, Result<(Uuid, State3)>>;
|
||||
|
||||
pub struct Handler<LR> {
|
||||
inbound_stream: OptionFuture<InboundStream>,
|
||||
events: VecDeque<HandlerOutEvent>,
|
||||
|
||||
min_buy: bitcoin::Amount,
|
||||
max_buy: bitcoin::Amount,
|
||||
env_config: env::Config,
|
||||
|
||||
latest_rate: LR,
|
||||
resume_only: bool,
|
||||
|
||||
timeout: Duration,
|
||||
keep_alive: KeepAlive,
|
||||
}
|
||||
|
||||
impl<LR> Handler<LR> {
|
||||
fn new(
|
||||
min_buy: bitcoin::Amount,
|
||||
max_buy: bitcoin::Amount,
|
||||
env_config: env::Config,
|
||||
latest_rate: LR,
|
||||
resume_only: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
inbound_stream: OptionFuture::from(None),
|
||||
events: Default::default(),
|
||||
min_buy,
|
||||
max_buy,
|
||||
env_config,
|
||||
latest_rate,
|
||||
resume_only,
|
||||
timeout: Duration::from_secs(60),
|
||||
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(5)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum HandlerOutEvent {
|
||||
Initiated(bmrng::RequestReceiver<bitcoin::Amount, WalletSnapshot>),
|
||||
Completed(Result<(Uuid, State3)>),
|
||||
}
|
||||
|
||||
impl<LR> ProtocolsHandler for Handler<LR>
|
||||
where
|
||||
LR: LatestRate + Send + 'static,
|
||||
{
|
||||
type InEvent = ();
|
||||
type OutEvent = HandlerOutEvent;
|
||||
type Error = Error;
|
||||
type InboundProtocol = protocol::SwapSetup;
|
||||
type OutboundProtocol = upgrade::DeniedUpgrade;
|
||||
type InboundOpenInfo = ();
|
||||
type OutboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
SubstreamProtocol::new(protocol::new(), ())
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
mut substream: NegotiatedSubstream,
|
||||
_: Self::InboundOpenInfo,
|
||||
) {
|
||||
self.keep_alive = KeepAlive::Yes;
|
||||
|
||||
let (sender, receiver) = bmrng::channel_with_timeout::<bitcoin::Amount, WalletSnapshot>(
|
||||
1,
|
||||
Duration::from_secs(5),
|
||||
);
|
||||
let resume_only = self.resume_only;
|
||||
let min_buy = self.min_buy;
|
||||
let max_buy = self.max_buy;
|
||||
let latest_rate = self.latest_rate.latest_rate();
|
||||
let env_config = self.env_config;
|
||||
|
||||
let protocol = tokio::time::timeout(self.timeout, async move {
|
||||
let request = swap_setup::read_cbor_message::<SpotPriceRequest>(&mut substream)
|
||||
.await
|
||||
.context("Failed to read spot price request")?;
|
||||
|
||||
let wallet_snapshot = sender
|
||||
.send_receive(request.btc)
|
||||
.await
|
||||
.context("Failed to receive wallet snapshot")?;
|
||||
|
||||
// wrap all of these into another future so we can `return` from all the
|
||||
// different blocks
|
||||
let validate = async {
|
||||
if resume_only {
|
||||
return Err(Error::ResumeOnlyMode);
|
||||
};
|
||||
|
||||
let blockchain_network = BlockchainNetwork {
|
||||
bitcoin: env_config.bitcoin_network,
|
||||
monero: env_config.monero_network,
|
||||
};
|
||||
|
||||
if request.blockchain_network != blockchain_network {
|
||||
return Err(Error::BlockchainNetworkMismatch {
|
||||
cli: request.blockchain_network,
|
||||
asb: blockchain_network,
|
||||
});
|
||||
}
|
||||
|
||||
let btc = request.btc;
|
||||
|
||||
if btc < min_buy {
|
||||
return Err(Error::AmountBelowMinimum {
|
||||
min: min_buy,
|
||||
buy: btc,
|
||||
});
|
||||
}
|
||||
|
||||
if btc > max_buy {
|
||||
return Err(Error::AmountAboveMaximum {
|
||||
max: max_buy,
|
||||
buy: btc,
|
||||
});
|
||||
}
|
||||
|
||||
let rate = latest_rate.map_err(|e| Error::LatestRateFetchFailed(Box::new(e)))?;
|
||||
let xmr = rate
|
||||
.sell_quote(btc)
|
||||
.map_err(Error::SellQuoteCalculationFailed)?;
|
||||
|
||||
if wallet_snapshot.balance < xmr + wallet_snapshot.lock_fee {
|
||||
return Err(Error::BalanceTooLow {
|
||||
balance: wallet_snapshot.balance,
|
||||
buy: btc,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(xmr)
|
||||
};
|
||||
|
||||
let result = validate.await;
|
||||
|
||||
swap_setup::write_cbor_message(
|
||||
&mut substream,
|
||||
SpotPriceResponse::from_result_ref(&result),
|
||||
)
|
||||
.await
|
||||
.context("Failed to write spot price response")?;
|
||||
|
||||
let xmr = result?;
|
||||
|
||||
let state0 = State0::new(
|
||||
request.btc,
|
||||
xmr,
|
||||
env_config,
|
||||
wallet_snapshot.redeem_address,
|
||||
wallet_snapshot.punish_address,
|
||||
wallet_snapshot.redeem_fee,
|
||||
wallet_snapshot.punish_fee,
|
||||
&mut rand::thread_rng(),
|
||||
);
|
||||
|
||||
let message0 = swap_setup::read_cbor_message::<Message0>(&mut substream)
|
||||
.await
|
||||
.context("Failed to read message0")?;
|
||||
let (swap_id, state1) = state0
|
||||
.receive(message0)
|
||||
.context("Failed to transition state0 -> state1 using message0")?;
|
||||
|
||||
swap_setup::write_cbor_message(&mut substream, state1.next_message())
|
||||
.await
|
||||
.context("Failed to send message1")?;
|
||||
|
||||
let message2 = swap_setup::read_cbor_message::<Message2>(&mut substream)
|
||||
.await
|
||||
.context("Failed to read message2")?;
|
||||
let state2 = state1
|
||||
.receive(message2)
|
||||
.context("Failed to transition state1 -> state2 using message2")?;
|
||||
|
||||
swap_setup::write_cbor_message(&mut substream, state2.next_message())
|
||||
.await
|
||||
.context("Failed to send message3")?;
|
||||
|
||||
let message4 = swap_setup::read_cbor_message::<Message4>(&mut substream)
|
||||
.await
|
||||
.context("Failed to read message4")?;
|
||||
let state3 = state2
|
||||
.receive(message4)
|
||||
.context("Failed to transition state2 -> state3 using message4")?;
|
||||
|
||||
Ok((swap_id, state3))
|
||||
});
|
||||
|
||||
let max_seconds = self.timeout.as_secs();
|
||||
self.inbound_stream = OptionFuture::from(Some(
|
||||
async move {
|
||||
protocol.await.with_context(|| {
|
||||
format!("Failed to complete execution setup within {}s", max_seconds)
|
||||
})?
|
||||
}
|
||||
.boxed(),
|
||||
));
|
||||
|
||||
self.events.push_back(HandlerOutEvent::Initiated(receiver));
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(&mut self, _: Void, _: Self::OutboundOpenInfo) {
|
||||
unreachable!("Alice does not support outbound in the handler")
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, _: Self::InEvent) {
|
||||
unreachable!("Alice does not receive events from the Behaviour in the handler")
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
_: Self::OutboundOpenInfo,
|
||||
_: ProtocolsHandlerUpgrErr<Void>,
|
||||
) {
|
||||
unreachable!("Alice does not dial")
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.keep_alive
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<
|
||||
ProtocolsHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
|
||||
}
|
||||
|
||||
if let Some(result) = futures::ready!(self.inbound_stream.poll_unpin(cx)) {
|
||||
self.keep_alive = KeepAlive::No;
|
||||
self.inbound_stream = OptionFuture::from(None);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(HandlerOutEvent::Completed(
|
||||
result,
|
||||
)));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl SpotPriceResponse {
|
||||
pub fn from_result_ref(result: &Result<monero::Amount, Error>) -> Self {
|
||||
match result {
|
||||
Ok(amount) => SpotPriceResponse::Xmr(*amount),
|
||||
Err(error) => SpotPriceResponse::Error(error.to_error_response()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("ASB is running in resume-only mode")]
|
||||
ResumeOnlyMode,
|
||||
#[error("Amount {buy} below minimum {min}")]
|
||||
AmountBelowMinimum {
|
||||
min: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Amount {buy} above maximum {max}")]
|
||||
AmountAboveMaximum {
|
||||
max: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Balance {balance} too low to fulfill swapping {buy}")]
|
||||
BalanceTooLow {
|
||||
balance: monero::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Failed to fetch latest rate")]
|
||||
LatestRateFetchFailed(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
|
||||
#[error("Failed to calculate quote")]
|
||||
SellQuoteCalculationFailed(#[source] anyhow::Error),
|
||||
#[error("Blockchain networks did not match, we are on {asb:?}, but request from {cli:?}")]
|
||||
BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork,
|
||||
asb: BlockchainNetwork,
|
||||
},
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn to_error_response(&self) -> SpotPriceError {
|
||||
match self {
|
||||
Error::ResumeOnlyMode => SpotPriceError::NoSwapsAccepted,
|
||||
Error::AmountBelowMinimum { min, buy } => SpotPriceError::AmountBelowMinimum {
|
||||
min: *min,
|
||||
buy: *buy,
|
||||
},
|
||||
Error::AmountAboveMaximum { max, buy } => SpotPriceError::AmountAboveMaximum {
|
||||
max: *max,
|
||||
buy: *buy,
|
||||
},
|
||||
Error::BalanceTooLow { buy, .. } => SpotPriceError::BalanceTooLow { buy: *buy },
|
||||
Error::BlockchainNetworkMismatch { cli, asb } => {
|
||||
SpotPriceError::BlockchainNetworkMismatch {
|
||||
cli: *cli,
|
||||
asb: *asb,
|
||||
}
|
||||
}
|
||||
Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => {
|
||||
SpotPriceError::Other
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,307 @@
|
||||
use crate::network::swap_setup::{
|
||||
protocol, read_cbor_message, write_cbor_message, BlockchainNetwork, SpotPriceError,
|
||||
SpotPriceRequest, SpotPriceResponse,
|
||||
};
|
||||
use crate::protocol::bob::{State0, State2};
|
||||
use crate::protocol::{Message1, Message3};
|
||||
use crate::{bitcoin, cli, env, monero};
|
||||
use anyhow::Result;
|
||||
use futures::future::{BoxFuture, OptionFuture};
|
||||
use futures::FutureExt;
|
||||
use libp2p::core::connection::ConnectionId;
|
||||
use libp2p::core::upgrade;
|
||||
use libp2p::swarm::{
|
||||
KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
|
||||
PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
use uuid::Uuid;
|
||||
use void::Void;
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Behaviour {
|
||||
env_config: env::Config,
|
||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||
new_swaps: VecDeque<(PeerId, NewSwap)>,
|
||||
completed_swaps: VecDeque<(PeerId, Completed)>,
|
||||
}
|
||||
|
||||
impl Behaviour {
|
||||
pub fn new(env_config: env::Config, bitcoin_wallet: Arc<bitcoin::Wallet>) -> Self {
|
||||
Self {
|
||||
env_config,
|
||||
bitcoin_wallet,
|
||||
new_swaps: VecDeque::default(),
|
||||
completed_swaps: VecDeque::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&mut self, alice: PeerId, swap: NewSwap) {
|
||||
self.new_swaps.push_back((alice, swap))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Completed> for cli::OutEvent {
|
||||
fn from(completed: Completed) -> Self {
|
||||
cli::OutEvent::SwapSetupCompleted(Box::new(completed.0))
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkBehaviour for Behaviour {
|
||||
type ProtocolsHandler = Handler;
|
||||
type OutEvent = Completed;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
Handler::new(self.env_config, self.bitcoin_wallet.clone())
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _: &PeerId) {}
|
||||
|
||||
fn inject_disconnected(&mut self, _: &PeerId) {}
|
||||
|
||||
fn inject_event(&mut self, peer: PeerId, _: ConnectionId, completed: Completed) {
|
||||
self.completed_swaps.push_back((peer, completed));
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
_cx: &mut Context<'_>,
|
||||
_params: &mut impl PollParameters,
|
||||
) -> Poll<NetworkBehaviourAction<NewSwap, Self::OutEvent>> {
|
||||
if let Some((_, event)) = self.completed_swaps.pop_front() {
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
||||
}
|
||||
|
||||
if let Some((peer, event)) = self.new_swaps.pop_front() {
|
||||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id: peer,
|
||||
handler: NotifyHandler::Any,
|
||||
event,
|
||||
});
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
type OutboundStream = BoxFuture<'static, Result<State2>>;
|
||||
|
||||
pub struct Handler {
|
||||
outbound_stream: OptionFuture<OutboundStream>,
|
||||
env_config: env::Config,
|
||||
timeout: Duration,
|
||||
new_swaps: VecDeque<NewSwap>,
|
||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||
keep_alive: KeepAlive,
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
fn new(env_config: env::Config, bitcoin_wallet: Arc<bitcoin::Wallet>) -> Self {
|
||||
Self {
|
||||
env_config,
|
||||
outbound_stream: OptionFuture::from(None),
|
||||
timeout: Duration::from_secs(60),
|
||||
new_swaps: VecDeque::default(),
|
||||
bitcoin_wallet,
|
||||
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(5)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NewSwap {
|
||||
pub swap_id: Uuid,
|
||||
pub btc: bitcoin::Amount,
|
||||
pub tx_refund_fee: bitcoin::Amount,
|
||||
pub tx_cancel_fee: bitcoin::Amount,
|
||||
pub bitcoin_refund_address: bitcoin::Address,
|
||||
}
|
||||
|
||||
pub struct Completed(Result<State2>);
|
||||
|
||||
impl ProtocolsHandler for Handler {
|
||||
type InEvent = NewSwap;
|
||||
type OutEvent = Completed;
|
||||
type Error = Void;
|
||||
type InboundProtocol = upgrade::DeniedUpgrade;
|
||||
type OutboundProtocol = protocol::SwapSetup;
|
||||
type InboundOpenInfo = ();
|
||||
type OutboundOpenInfo = NewSwap;
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
SubstreamProtocol::new(upgrade::DeniedUpgrade, ())
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(&mut self, _: Void, _: Self::InboundOpenInfo) {
|
||||
unreachable!("Bob does not support inbound substreams")
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
mut substream: NegotiatedSubstream,
|
||||
info: Self::OutboundOpenInfo,
|
||||
) {
|
||||
let bitcoin_wallet = self.bitcoin_wallet.clone();
|
||||
let env_config = self.env_config;
|
||||
|
||||
let protocol = tokio::time::timeout(self.timeout, async move {
|
||||
write_cbor_message(&mut substream, SpotPriceRequest {
|
||||
btc: info.btc,
|
||||
blockchain_network: BlockchainNetwork {
|
||||
bitcoin: env_config.bitcoin_network,
|
||||
monero: env_config.monero_network,
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
|
||||
let xmr = Result::from(read_cbor_message::<SpotPriceResponse>(&mut substream).await?)?;
|
||||
|
||||
let state0 = State0::new(
|
||||
info.swap_id,
|
||||
&mut rand::thread_rng(),
|
||||
info.btc,
|
||||
xmr,
|
||||
env_config.bitcoin_cancel_timelock,
|
||||
env_config.bitcoin_punish_timelock,
|
||||
info.bitcoin_refund_address,
|
||||
env_config.monero_finality_confirmations,
|
||||
info.tx_refund_fee,
|
||||
info.tx_cancel_fee,
|
||||
);
|
||||
|
||||
write_cbor_message(&mut substream, state0.next_message()).await?;
|
||||
let message1 = read_cbor_message::<Message1>(&mut substream).await?;
|
||||
let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?;
|
||||
|
||||
write_cbor_message(&mut substream, state1.next_message()).await?;
|
||||
let message3 = read_cbor_message::<Message3>(&mut substream).await?;
|
||||
let state2 = state1.receive(message3)?;
|
||||
|
||||
write_cbor_message(&mut substream, state2.next_message()).await?;
|
||||
|
||||
Ok(state2)
|
||||
});
|
||||
|
||||
let max_seconds = self.timeout.as_secs();
|
||||
self.outbound_stream = OptionFuture::from(Some(
|
||||
async move {
|
||||
protocol.await.map_err(|_| Error::Timeout {
|
||||
seconds: max_seconds,
|
||||
})?
|
||||
}
|
||||
.boxed(),
|
||||
));
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, new_swap: Self::InEvent) {
|
||||
self.new_swaps.push_back(new_swap);
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
_: Self::OutboundOpenInfo,
|
||||
_: ProtocolsHandlerUpgrErr<Void>,
|
||||
) {
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
self.keep_alive
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ProtocolsHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::OutEvent,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
if let Some(new_swap) = self.new_swaps.pop_front() {
|
||||
self.keep_alive = KeepAlive::Yes;
|
||||
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(protocol::new(), new_swap),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(result) = futures::ready!(self.outbound_stream.poll_unpin(cx)) {
|
||||
self.keep_alive = KeepAlive::No;
|
||||
self.outbound_stream = OptionFuture::from(None);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(Completed(result)));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SpotPriceResponse> for Result<monero::Amount, Error> {
|
||||
fn from(response: SpotPriceResponse) -> Self {
|
||||
match response {
|
||||
SpotPriceResponse::Xmr(amount) => Ok(amount),
|
||||
SpotPriceResponse::Error(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, thiserror::Error, PartialEq)]
|
||||
pub enum Error {
|
||||
#[error("Seller currently does not accept incoming swap requests, please try again later")]
|
||||
NoSwapsAccepted,
|
||||
#[error("Seller refused to buy {buy} because the minimum configured buy limit is {min}")]
|
||||
AmountBelowMinimum {
|
||||
min: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")]
|
||||
AmountAboveMaximum {
|
||||
max: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")]
|
||||
BalanceTooLow { buy: bitcoin::Amount },
|
||||
|
||||
#[error("Seller blockchain network {asb:?} setup did not match your blockchain network setup {cli:?}")]
|
||||
BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork,
|
||||
asb: BlockchainNetwork,
|
||||
},
|
||||
|
||||
#[error("Failed to complete swap setup within {seconds}s")]
|
||||
Timeout { seconds: u64 },
|
||||
|
||||
/// To be used for errors that cannot be explained on the CLI side (e.g.
|
||||
/// rate update problems on the seller side)
|
||||
#[error("Seller encountered a problem, please try again later.")]
|
||||
Other,
|
||||
}
|
||||
|
||||
impl From<SpotPriceError> for Error {
|
||||
fn from(error: SpotPriceError) -> Self {
|
||||
match error {
|
||||
SpotPriceError::NoSwapsAccepted => Error::NoSwapsAccepted,
|
||||
SpotPriceError::AmountBelowMinimum { min, buy } => {
|
||||
Error::AmountBelowMinimum { min, buy }
|
||||
}
|
||||
SpotPriceError::AmountAboveMaximum { max, buy } => {
|
||||
Error::AmountAboveMaximum { max, buy }
|
||||
}
|
||||
SpotPriceError::BalanceTooLow { buy } => Error::BalanceTooLow { buy },
|
||||
SpotPriceError::BlockchainNetworkMismatch { cli, asb } => {
|
||||
Error::BlockchainNetworkMismatch { cli, asb }
|
||||
}
|
||||
SpotPriceError::Other => Error::Other,
|
||||
}
|
||||
}
|
||||
}
|
@ -1,109 +0,0 @@
|
||||
use crate::network::cbor_request_response::BUF_SIZE;
|
||||
use crate::protocol::alice::{State0, State3};
|
||||
use crate::protocol::{alice, Message0, Message2, Message4};
|
||||
use anyhow::{Context, Error};
|
||||
use libp2p::PeerId;
|
||||
use libp2p_async_await::BehaviourOutEvent;
|
||||
use std::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum OutEvent {
|
||||
Done {
|
||||
bob_peer_id: PeerId,
|
||||
swap_id: Uuid,
|
||||
state3: State3,
|
||||
},
|
||||
Failure {
|
||||
peer: PeerId,
|
||||
error: Error,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>> for OutEvent {
|
||||
fn from(event: BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>) -> Self {
|
||||
match event {
|
||||
BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, (swap_id, state3)))) => OutEvent::Done {
|
||||
bob_peer_id,
|
||||
swap_id,
|
||||
state3,
|
||||
},
|
||||
BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e },
|
||||
BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(libp2p::NetworkBehaviour)]
|
||||
#[behaviour(out_event = "OutEvent", event_process = false)]
|
||||
pub struct Behaviour {
|
||||
inner: libp2p_async_await::Behaviour<(PeerId, (Uuid, State3)), (), anyhow::Error>,
|
||||
}
|
||||
|
||||
impl Default for Behaviour {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Behaviour {
|
||||
pub fn run(&mut self, bob: PeerId, state0: State0) {
|
||||
self.inner.do_protocol_listener(bob, move |mut substream| {
|
||||
let protocol = async move {
|
||||
let message0 =
|
||||
serde_cbor::from_slice::<Message0>(&substream.read_message(BUF_SIZE).await?)
|
||||
.context("Failed to deserialize message0")?;
|
||||
let (swap_id, state1) = state0.receive(message0)?;
|
||||
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&state1.next_message())
|
||||
.context("Failed to serialize message1")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let message2 =
|
||||
serde_cbor::from_slice::<Message2>(&substream.read_message(BUF_SIZE).await?)
|
||||
.context("Failed to deserialize message2")?;
|
||||
let state2 = state1
|
||||
.receive(message2)
|
||||
.context("Failed to receive Message2")?;
|
||||
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&state2.next_message())
|
||||
.context("Failed to serialize message3")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let message4 =
|
||||
serde_cbor::from_slice::<Message4>(&substream.read_message(BUF_SIZE).await?)
|
||||
.context("Failed to deserialize message4")?;
|
||||
let state3 = state2.receive(message4)?;
|
||||
|
||||
Ok((bob, (swap_id, state3)))
|
||||
};
|
||||
|
||||
async move { tokio::time::timeout(Duration::from_secs(60), protocol).await? }
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OutEvent> for alice::OutEvent {
|
||||
fn from(event: OutEvent) -> Self {
|
||||
match event {
|
||||
OutEvent::Done {
|
||||
bob_peer_id,
|
||||
state3,
|
||||
swap_id,
|
||||
} => Self::ExecutionSetupDone {
|
||||
bob_peer_id,
|
||||
state3: Box::new(state3),
|
||||
swap_id,
|
||||
},
|
||||
OutEvent::Failure { peer, error } => Self::Failure { peer, error },
|
||||
}
|
||||
}
|
||||
}
|
@ -1,825 +0,0 @@
|
||||
use crate::network::cbor_request_response::CborCodec;
|
||||
use crate::network::spot_price;
|
||||
use crate::network::spot_price::{BlockchainNetwork, SpotPriceProtocol};
|
||||
use crate::protocol::alice;
|
||||
use crate::protocol::alice::event_loop::LatestRate;
|
||||
use crate::{env, monero};
|
||||
use libp2p::request_response::{
|
||||
ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage,
|
||||
ResponseChannel,
|
||||
};
|
||||
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
|
||||
use libp2p::{NetworkBehaviour, PeerId};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::Debug;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum OutEvent {
|
||||
ExecutionSetupParams {
|
||||
peer: PeerId,
|
||||
btc: bitcoin::Amount,
|
||||
xmr: monero::Amount,
|
||||
},
|
||||
Error {
|
||||
peer: PeerId,
|
||||
error: Error,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(NetworkBehaviour)]
|
||||
#[behaviour(out_event = "OutEvent", poll_method = "poll", event_process = true)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Behaviour<LR>
|
||||
where
|
||||
LR: LatestRate + Send + 'static,
|
||||
{
|
||||
behaviour: spot_price::Behaviour,
|
||||
|
||||
#[behaviour(ignore)]
|
||||
events: VecDeque<OutEvent>,
|
||||
|
||||
#[behaviour(ignore)]
|
||||
balance: monero::Amount,
|
||||
#[behaviour(ignore)]
|
||||
lock_fee: monero::Amount,
|
||||
#[behaviour(ignore)]
|
||||
min_buy: bitcoin::Amount,
|
||||
#[behaviour(ignore)]
|
||||
max_buy: bitcoin::Amount,
|
||||
#[behaviour(ignore)]
|
||||
env_config: env::Config,
|
||||
#[behaviour(ignore)]
|
||||
latest_rate: LR,
|
||||
#[behaviour(ignore)]
|
||||
resume_only: bool,
|
||||
}
|
||||
|
||||
/// Behaviour that handles spot prices.
|
||||
/// All the logic how to react to a spot price request is contained here, events
|
||||
/// reporting the successful handling of a spot price request or a failure are
|
||||
/// bubbled up to the parent behaviour.
|
||||
impl<LR> Behaviour<LR>
|
||||
where
|
||||
LR: LatestRate + Send + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
balance: monero::Amount,
|
||||
lock_fee: monero::Amount,
|
||||
min_buy: bitcoin::Amount,
|
||||
max_buy: bitcoin::Amount,
|
||||
env_config: env::Config,
|
||||
latest_rate: LR,
|
||||
resume_only: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
behaviour: spot_price::Behaviour::new(
|
||||
CborCodec::default(),
|
||||
vec![(SpotPriceProtocol, ProtocolSupport::Inbound)],
|
||||
RequestResponseConfig::default(),
|
||||
),
|
||||
events: Default::default(),
|
||||
balance,
|
||||
lock_fee,
|
||||
min_buy,
|
||||
max_buy,
|
||||
env_config,
|
||||
latest_rate,
|
||||
resume_only,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_balance(&mut self, balance: monero::Amount) {
|
||||
self.balance = balance;
|
||||
}
|
||||
|
||||
fn decline(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
channel: ResponseChannel<spot_price::Response>,
|
||||
error: Error,
|
||||
) {
|
||||
if self
|
||||
.behaviour
|
||||
.send_response(
|
||||
channel,
|
||||
spot_price::Response::Error(error.to_error_response()),
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
tracing::debug!(%peer, "Unable to send error response for spot price request");
|
||||
}
|
||||
|
||||
self.events.push_back(OutEvent::Error { peer, error });
|
||||
}
|
||||
|
||||
fn poll<BIE>(
|
||||
&mut self,
|
||||
_cx: &mut Context<'_>,
|
||||
_params: &mut impl PollParameters,
|
||||
) -> Poll<NetworkBehaviourAction<BIE, OutEvent>> {
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
||||
}
|
||||
|
||||
// We trust in libp2p to poll us.
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl<LR> NetworkBehaviourEventProcess<spot_price::OutEvent> for Behaviour<LR>
|
||||
where
|
||||
LR: LatestRate + Send + 'static,
|
||||
{
|
||||
fn inject_event(&mut self, event: spot_price::OutEvent) {
|
||||
let (peer, message) = match event {
|
||||
RequestResponseEvent::Message { peer, message } => (peer, message),
|
||||
RequestResponseEvent::OutboundFailure { peer, error, .. } => {
|
||||
tracing::error!(%peer, "Failure sending spot price response: {:#}", error);
|
||||
return;
|
||||
}
|
||||
RequestResponseEvent::InboundFailure { peer, error, .. } => {
|
||||
tracing::warn!(%peer, "Inbound failure when handling spot price request: {:#}", error);
|
||||
return;
|
||||
}
|
||||
RequestResponseEvent::ResponseSent { peer, .. } => {
|
||||
tracing::debug!(%peer, "Spot price response sent");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let (request, channel) = match message {
|
||||
RequestResponseMessage::Request {
|
||||
request, channel, ..
|
||||
} => (request, channel),
|
||||
RequestResponseMessage::Response { .. } => {
|
||||
tracing::error!("Unexpected message");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let blockchain_network = BlockchainNetwork {
|
||||
bitcoin: self.env_config.bitcoin_network,
|
||||
monero: self.env_config.monero_network,
|
||||
};
|
||||
|
||||
if request.blockchain_network != blockchain_network {
|
||||
self.decline(peer, channel, Error::BlockchainNetworkMismatch {
|
||||
cli: request.blockchain_network,
|
||||
asb: blockchain_network,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if self.resume_only {
|
||||
self.decline(peer, channel, Error::ResumeOnlyMode);
|
||||
return;
|
||||
}
|
||||
|
||||
let btc = request.btc;
|
||||
|
||||
if btc < self.min_buy {
|
||||
self.decline(peer, channel, Error::AmountBelowMinimum {
|
||||
min: self.min_buy,
|
||||
buy: btc,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if btc > self.max_buy {
|
||||
self.decline(peer, channel, Error::AmountAboveMaximum {
|
||||
max: self.max_buy,
|
||||
buy: btc,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
let rate = match self.latest_rate.latest_rate() {
|
||||
Ok(rate) => rate,
|
||||
Err(e) => {
|
||||
self.decline(peer, channel, Error::LatestRateFetchFailed(Box::new(e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
let xmr = match rate.sell_quote(btc) {
|
||||
Ok(xmr) => xmr,
|
||||
Err(e) => {
|
||||
self.decline(peer, channel, Error::SellQuoteCalculationFailed(e));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let xmr_balance = self.balance;
|
||||
let xmr_lock_fees = self.lock_fee;
|
||||
|
||||
if xmr_balance < xmr + xmr_lock_fees {
|
||||
self.decline(peer, channel, Error::BalanceTooLow {
|
||||
balance: xmr_balance,
|
||||
buy: btc,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if self
|
||||
.behaviour
|
||||
.send_response(channel, spot_price::Response::Xmr(xmr))
|
||||
.is_err()
|
||||
{
|
||||
tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc)
|
||||
}
|
||||
|
||||
self.events
|
||||
.push_back(OutEvent::ExecutionSetupParams { peer, btc, xmr });
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OutEvent> for alice::OutEvent {
|
||||
fn from(event: OutEvent) -> Self {
|
||||
match event {
|
||||
OutEvent::ExecutionSetupParams { peer, btc, xmr } => {
|
||||
Self::ExecutionSetupStart { peer, btc, xmr }
|
||||
}
|
||||
OutEvent::Error { peer, error } => Self::SwapRequestDeclined { peer, error },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("ASB is running in resume-only mode")]
|
||||
ResumeOnlyMode,
|
||||
#[error("Amount {buy} below minimum {min}")]
|
||||
AmountBelowMinimum {
|
||||
min: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Amount {buy} above maximum {max}")]
|
||||
AmountAboveMaximum {
|
||||
max: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Balance {balance} too low to fulfill swapping {buy}")]
|
||||
BalanceTooLow {
|
||||
balance: monero::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Failed to fetch latest rate")]
|
||||
LatestRateFetchFailed(#[source] Box<dyn std::error::Error + Send + 'static>),
|
||||
#[error("Failed to calculate quote: {0}")]
|
||||
SellQuoteCalculationFailed(#[source] anyhow::Error),
|
||||
#[error("Blockchain networks did not match, we are on {asb:?}, but request from {cli:?}")]
|
||||
BlockchainNetworkMismatch {
|
||||
cli: spot_price::BlockchainNetwork,
|
||||
asb: spot_price::BlockchainNetwork,
|
||||
},
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn to_error_response(&self) -> spot_price::Error {
|
||||
match self {
|
||||
Error::ResumeOnlyMode => spot_price::Error::NoSwapsAccepted,
|
||||
Error::AmountBelowMinimum { min, buy } => spot_price::Error::AmountBelowMinimum {
|
||||
min: *min,
|
||||
buy: *buy,
|
||||
},
|
||||
Error::AmountAboveMaximum { max, buy } => spot_price::Error::AmountAboveMaximum {
|
||||
max: *max,
|
||||
buy: *buy,
|
||||
},
|
||||
Error::BalanceTooLow { buy, .. } => spot_price::Error::BalanceTooLow { buy: *buy },
|
||||
Error::BlockchainNetworkMismatch { cli, asb } => {
|
||||
spot_price::Error::BlockchainNetworkMismatch {
|
||||
cli: *cli,
|
||||
asb: *asb,
|
||||
}
|
||||
}
|
||||
Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) => {
|
||||
spot_price::Error::Other
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::asb::Rate;
|
||||
use crate::env::GetConfig;
|
||||
use crate::monero;
|
||||
use crate::network::test::{await_events_or_timeout, connect, new_swarm};
|
||||
use crate::protocol::{alice, bob};
|
||||
use anyhow::anyhow;
|
||||
use libp2p::Swarm;
|
||||
use rust_decimal::Decimal;
|
||||
|
||||
impl Default for AliceBehaviourValues {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
balance: monero::Amount::from_monero(1.0).unwrap(),
|
||||
lock_fee: monero::Amount::ZERO,
|
||||
min_buy: bitcoin::Amount::from_btc(0.001).unwrap(),
|
||||
max_buy: bitcoin::Amount::from_btc(0.01).unwrap(),
|
||||
rate: TestRate::default(), // 0.01
|
||||
resume_only: false,
|
||||
env_config: env::Testnet::get_config(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_alice_has_sufficient_balance_then_returns_price() {
|
||||
let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
let expected_xmr = monero::Amount::from_monero(1.0).unwrap();
|
||||
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_price((btc_to_swap, expected_xmr), expected_xmr)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_alice_has_insufficient_balance_then_returns_error() {
|
||||
let mut test = SpotPriceTest::setup(
|
||||
AliceBehaviourValues::default().with_balance(monero::Amount::ZERO),
|
||||
)
|
||||
.await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::BalanceTooLow {
|
||||
balance: monero::Amount::ZERO,
|
||||
buy: btc_to_swap,
|
||||
},
|
||||
bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap },
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_alice_has_insufficient_balance_after_balance_update_then_returns_error() {
|
||||
let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
let expected_xmr = monero::Amount::from_monero(1.0).unwrap();
|
||||
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_price((btc_to_swap, expected_xmr), expected_xmr)
|
||||
.await;
|
||||
|
||||
test.alice_swarm
|
||||
.behaviour_mut()
|
||||
.update_balance(monero::Amount::ZERO);
|
||||
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::BalanceTooLow {
|
||||
balance: monero::Amount::ZERO,
|
||||
buy: btc_to_swap,
|
||||
},
|
||||
bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap },
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_alice_has_insufficient_balance_because_of_lock_fee_then_returns_error() {
|
||||
let balance = monero::Amount::from_monero(1.0).unwrap();
|
||||
|
||||
let mut test = SpotPriceTest::setup(
|
||||
AliceBehaviourValues::default()
|
||||
.with_balance(balance)
|
||||
.with_lock_fee(monero::Amount::from_piconero(1)),
|
||||
)
|
||||
.await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::BalanceTooLow {
|
||||
balance,
|
||||
buy: btc_to_swap,
|
||||
},
|
||||
bob::spot_price::Error::BalanceTooLow { buy: btc_to_swap },
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_below_min_buy_then_returns_error() {
|
||||
let min_buy = bitcoin::Amount::from_btc(0.001).unwrap();
|
||||
|
||||
let mut test =
|
||||
SpotPriceTest::setup(AliceBehaviourValues::default().with_min_buy(min_buy)).await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.0001).unwrap();
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::AmountBelowMinimum {
|
||||
buy: btc_to_swap,
|
||||
min: min_buy,
|
||||
},
|
||||
bob::spot_price::Error::AmountBelowMinimum {
|
||||
buy: btc_to_swap,
|
||||
min: min_buy,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_above_max_buy_then_returns_error() {
|
||||
let max_buy = bitcoin::Amount::from_btc(0.001).unwrap();
|
||||
|
||||
let mut test =
|
||||
SpotPriceTest::setup(AliceBehaviourValues::default().with_max_buy(max_buy)).await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::AmountAboveMaximum {
|
||||
buy: btc_to_swap,
|
||||
max: max_buy,
|
||||
},
|
||||
bob::spot_price::Error::AmountAboveMaximum {
|
||||
buy: btc_to_swap,
|
||||
max: max_buy,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_alice_in_resume_only_mode_then_returns_error() {
|
||||
let mut test =
|
||||
SpotPriceTest::setup(AliceBehaviourValues::default().with_resume_only(true)).await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::ResumeOnlyMode,
|
||||
bob::spot_price::Error::NoSwapsAccepted,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_rate_fetch_problem_then_returns_error() {
|
||||
let mut test =
|
||||
SpotPriceTest::setup(AliceBehaviourValues::default().with_rate(TestRate::error_rate()))
|
||||
.await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::LatestRateFetchFailed(Box::new(TestRateError {})),
|
||||
bob::spot_price::Error::Other,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_rate_calculation_problem_then_returns_error() {
|
||||
let mut test = SpotPriceTest::setup(
|
||||
AliceBehaviourValues::default().with_rate(TestRate::from_rate_and_spread(0.0, 0)),
|
||||
)
|
||||
.await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::SellQuoteCalculationFailed(anyhow!(
|
||||
"Error text irrelevant, won't be checked here"
|
||||
)),
|
||||
bob::spot_price::Error::Other,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_alice_mainnnet_bob_testnet_then_network_mismatch_error() {
|
||||
let mut test = SpotPriceTest::setup(
|
||||
AliceBehaviourValues::default().with_env_config(env::Mainnet::get_config()),
|
||||
)
|
||||
.await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
test.construct_and_send_request(btc_to_swap);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Testnet,
|
||||
monero: monero::Network::Stagenet,
|
||||
},
|
||||
asb: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Bitcoin,
|
||||
monero: monero::Network::Mainnet,
|
||||
},
|
||||
},
|
||||
bob::spot_price::Error::BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Testnet,
|
||||
monero: monero::Network::Stagenet,
|
||||
},
|
||||
asb: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Bitcoin,
|
||||
monero: monero::Network::Mainnet,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_alice_testnet_bob_mainnet_then_network_mismatch_error() {
|
||||
let mut test = SpotPriceTest::setup(AliceBehaviourValues::default()).await;
|
||||
|
||||
let btc_to_swap = bitcoin::Amount::from_btc(0.01).unwrap();
|
||||
let request = spot_price::Request {
|
||||
btc: btc_to_swap,
|
||||
blockchain_network: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Bitcoin,
|
||||
monero: monero::Network::Mainnet,
|
||||
},
|
||||
};
|
||||
|
||||
test.send_request(request);
|
||||
test.assert_error(
|
||||
alice::spot_price::Error::BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Bitcoin,
|
||||
monero: monero::Network::Mainnet,
|
||||
},
|
||||
asb: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Testnet,
|
||||
monero: monero::Network::Stagenet,
|
||||
},
|
||||
},
|
||||
bob::spot_price::Error::BlockchainNetworkMismatch {
|
||||
cli: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Bitcoin,
|
||||
monero: monero::Network::Mainnet,
|
||||
},
|
||||
asb: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Testnet,
|
||||
monero: monero::Network::Stagenet,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
struct SpotPriceTest {
|
||||
alice_swarm: Swarm<alice::spot_price::Behaviour<TestRate>>,
|
||||
bob_swarm: Swarm<spot_price::Behaviour>,
|
||||
|
||||
alice_peer_id: PeerId,
|
||||
}
|
||||
|
||||
impl SpotPriceTest {
|
||||
pub async fn setup(values: AliceBehaviourValues) -> Self {
|
||||
let (mut alice_swarm, _, alice_peer_id) = new_swarm(|_, _| {
|
||||
Behaviour::new(
|
||||
values.balance,
|
||||
values.lock_fee,
|
||||
values.min_buy,
|
||||
values.max_buy,
|
||||
values.env_config,
|
||||
values.rate.clone(),
|
||||
values.resume_only,
|
||||
)
|
||||
});
|
||||
let (mut bob_swarm, ..) = new_swarm(|_, _| bob::spot_price::bob());
|
||||
|
||||
connect(&mut alice_swarm, &mut bob_swarm).await;
|
||||
|
||||
Self {
|
||||
alice_swarm,
|
||||
bob_swarm,
|
||||
alice_peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn construct_and_send_request(&mut self, btc_to_swap: bitcoin::Amount) {
|
||||
let request = spot_price::Request {
|
||||
btc: btc_to_swap,
|
||||
blockchain_network: BlockchainNetwork {
|
||||
bitcoin: bitcoin::Network::Testnet,
|
||||
monero: monero::Network::Stagenet,
|
||||
},
|
||||
};
|
||||
self.send_request(request);
|
||||
}
|
||||
|
||||
pub fn send_request(&mut self, spot_price_request: spot_price::Request) {
|
||||
self.bob_swarm
|
||||
.behaviour_mut()
|
||||
.send_request(&self.alice_peer_id, spot_price_request);
|
||||
}
|
||||
|
||||
async fn assert_price(
|
||||
&mut self,
|
||||
alice_assert: (bitcoin::Amount, monero::Amount),
|
||||
bob_assert: monero::Amount,
|
||||
) {
|
||||
match await_events_or_timeout(self.alice_swarm.next(), self.bob_swarm.next()).await {
|
||||
(
|
||||
alice::spot_price::OutEvent::ExecutionSetupParams { btc, xmr, .. },
|
||||
spot_price::OutEvent::Message { message, .. },
|
||||
) => {
|
||||
assert_eq!(alice_assert, (btc, xmr));
|
||||
|
||||
let response = match message {
|
||||
RequestResponseMessage::Response { response, .. } => response,
|
||||
_ => panic!("Unexpected message {:?} for Bob", message),
|
||||
};
|
||||
|
||||
match response {
|
||||
spot_price::Response::Xmr(xmr) => {
|
||||
assert_eq!(bob_assert, xmr)
|
||||
}
|
||||
_ => panic!("Unexpected response {:?} for Bob", response),
|
||||
}
|
||||
}
|
||||
(alice_event, bob_event) => panic!(
|
||||
"Received unexpected event, alice emitted {:?} and bob emitted {:?}",
|
||||
alice_event, bob_event
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
async fn assert_error(
|
||||
&mut self,
|
||||
alice_assert: alice::spot_price::Error,
|
||||
bob_assert: bob::spot_price::Error,
|
||||
) {
|
||||
match await_events_or_timeout(self.alice_swarm.next(), self.bob_swarm.next()).await {
|
||||
(
|
||||
alice::spot_price::OutEvent::Error { error, .. },
|
||||
spot_price::OutEvent::Message { message, .. },
|
||||
) => {
|
||||
// TODO: Somehow make PartialEq work on Alice's spot_price::Error
|
||||
match (alice_assert, error) {
|
||||
(
|
||||
alice::spot_price::Error::BalanceTooLow {
|
||||
balance: balance1,
|
||||
buy: buy1,
|
||||
},
|
||||
alice::spot_price::Error::BalanceTooLow {
|
||||
balance: balance2,
|
||||
buy: buy2,
|
||||
},
|
||||
) => {
|
||||
assert_eq!(balance1, balance2);
|
||||
assert_eq!(buy1, buy2);
|
||||
}
|
||||
(
|
||||
alice::spot_price::Error::BlockchainNetworkMismatch {
|
||||
cli: cli1,
|
||||
asb: asb1,
|
||||
},
|
||||
alice::spot_price::Error::BlockchainNetworkMismatch {
|
||||
cli: cli2,
|
||||
asb: asb2,
|
||||
},
|
||||
) => {
|
||||
assert_eq!(cli1, cli2);
|
||||
assert_eq!(asb1, asb2);
|
||||
}
|
||||
(
|
||||
alice::spot_price::Error::AmountBelowMinimum { .. },
|
||||
alice::spot_price::Error::AmountBelowMinimum { .. },
|
||||
)
|
||||
| (
|
||||
alice::spot_price::Error::AmountAboveMaximum { .. },
|
||||
alice::spot_price::Error::AmountAboveMaximum { .. },
|
||||
)
|
||||
| (
|
||||
alice::spot_price::Error::LatestRateFetchFailed(_),
|
||||
alice::spot_price::Error::LatestRateFetchFailed(_),
|
||||
)
|
||||
| (
|
||||
alice::spot_price::Error::SellQuoteCalculationFailed(_),
|
||||
alice::spot_price::Error::SellQuoteCalculationFailed(_),
|
||||
)
|
||||
| (
|
||||
alice::spot_price::Error::ResumeOnlyMode,
|
||||
alice::spot_price::Error::ResumeOnlyMode,
|
||||
) => {}
|
||||
(alice_assert, error) => {
|
||||
panic!("Expected: {:?} Actual: {:?}", alice_assert, error)
|
||||
}
|
||||
}
|
||||
|
||||
let response = match message {
|
||||
RequestResponseMessage::Response { response, .. } => response,
|
||||
_ => panic!("Unexpected message {:?} for Bob", message),
|
||||
};
|
||||
|
||||
match response {
|
||||
spot_price::Response::Error(error) => {
|
||||
assert_eq!(bob_assert, error.into())
|
||||
}
|
||||
_ => panic!("Unexpected response {:?} for Bob", response),
|
||||
}
|
||||
}
|
||||
(alice_event, bob_event) => panic!(
|
||||
"Received unexpected event, alice emitted {:?} and bob emitted {:?}",
|
||||
alice_event, bob_event
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct AliceBehaviourValues {
|
||||
pub balance: monero::Amount,
|
||||
pub lock_fee: monero::Amount,
|
||||
pub min_buy: bitcoin::Amount,
|
||||
pub max_buy: bitcoin::Amount,
|
||||
pub rate: TestRate, // 0.01
|
||||
pub resume_only: bool,
|
||||
pub env_config: env::Config,
|
||||
}
|
||||
|
||||
impl AliceBehaviourValues {
|
||||
pub fn with_balance(mut self, balance: monero::Amount) -> AliceBehaviourValues {
|
||||
self.balance = balance;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_lock_fee(mut self, lock_fee: monero::Amount) -> AliceBehaviourValues {
|
||||
self.lock_fee = lock_fee;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_min_buy(mut self, min_buy: bitcoin::Amount) -> AliceBehaviourValues {
|
||||
self.min_buy = min_buy;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_max_buy(mut self, max_buy: bitcoin::Amount) -> AliceBehaviourValues {
|
||||
self.max_buy = max_buy;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_resume_only(mut self, resume_only: bool) -> AliceBehaviourValues {
|
||||
self.resume_only = resume_only;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_rate(mut self, rate: TestRate) -> AliceBehaviourValues {
|
||||
self.rate = rate;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_env_config(mut self, env_config: env::Config) -> AliceBehaviourValues {
|
||||
self.env_config = env_config;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum TestRate {
|
||||
Rate(Rate),
|
||||
Err(TestRateError),
|
||||
}
|
||||
|
||||
impl TestRate {
|
||||
pub const RATE: f64 = 0.01;
|
||||
|
||||
pub fn from_rate_and_spread(rate: f64, spread: u64) -> Self {
|
||||
let ask = bitcoin::Amount::from_btc(rate).expect("Static value should never fail");
|
||||
let spread = Decimal::from(spread);
|
||||
Self::Rate(Rate::new(ask, spread))
|
||||
}
|
||||
|
||||
pub fn error_rate() -> Self {
|
||||
Self::Err(TestRateError {})
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TestRate {
|
||||
fn default() -> Self {
|
||||
TestRate::from_rate_and_spread(Self::RATE, 0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, thiserror::Error)]
|
||||
#[error("Could not fetch rate")]
|
||||
pub struct TestRateError {}
|
||||
|
||||
impl LatestRate for TestRate {
|
||||
type Error = TestRateError;
|
||||
|
||||
fn latest_rate(&mut self) -> Result<Rate, Self::Error> {
|
||||
match self {
|
||||
TestRate::Rate(rate) => Ok(*rate),
|
||||
TestRate::Err(error) => Err(error.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,95 +0,0 @@
|
||||
use crate::network::cbor_request_response::BUF_SIZE;
|
||||
use crate::protocol::bob::{State0, State2};
|
||||
use crate::protocol::{bob, Message1, Message3};
|
||||
use anyhow::{Context, Error, Result};
|
||||
use libp2p::PeerId;
|
||||
use libp2p_async_await::BehaviourOutEvent;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum OutEvent {
|
||||
Done(Result<State2>),
|
||||
}
|
||||
|
||||
impl From<BehaviourOutEvent<(), State2, anyhow::Error>> for OutEvent {
|
||||
fn from(event: BehaviourOutEvent<(), State2, Error>) -> Self {
|
||||
match event {
|
||||
BehaviourOutEvent::Outbound(_, Ok(State2)) => OutEvent::Done(Ok(State2)),
|
||||
BehaviourOutEvent::Outbound(_, Err(e)) => OutEvent::Done(Err(e)),
|
||||
BehaviourOutEvent::Inbound(..) => unreachable!("Bob only supports outbound"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(libp2p::NetworkBehaviour)]
|
||||
#[behaviour(out_event = "OutEvent", event_process = false)]
|
||||
pub struct Behaviour {
|
||||
inner: libp2p_async_await::Behaviour<(), State2, anyhow::Error>,
|
||||
}
|
||||
|
||||
impl Default for Behaviour {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Behaviour {
|
||||
pub fn run(
|
||||
&mut self,
|
||||
alice: PeerId,
|
||||
state0: State0,
|
||||
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
|
||||
) {
|
||||
self.inner.do_protocol_dialer(alice, move |mut substream| {
|
||||
let protocol = async move {
|
||||
tracing::debug!("Starting execution setup with {}", alice);
|
||||
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&state0.next_message())
|
||||
.context("Failed to serialize message0")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let message1 =
|
||||
serde_cbor::from_slice::<Message1>(&substream.read_message(BUF_SIZE).await?)
|
||||
.context("Failed to deserialize message1")?;
|
||||
let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?;
|
||||
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&state1.next_message())
|
||||
.context("Failed to serialize message2")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let message3 =
|
||||
serde_cbor::from_slice::<Message3>(&substream.read_message(BUF_SIZE).await?)
|
||||
.context("Failed to deserialize message3")?;
|
||||
let state2 = state1.receive(message3)?;
|
||||
|
||||
substream
|
||||
.write_message(
|
||||
&serde_cbor::to_vec(&state2.next_message())
|
||||
.context("Failed to serialize message4")?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(state2)
|
||||
};
|
||||
|
||||
async move { tokio::time::timeout(Duration::from_secs(60), protocol).await? }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OutEvent> for bob::OutEvent {
|
||||
fn from(event: OutEvent) -> Self {
|
||||
match event {
|
||||
OutEvent::Done(res) => Self::ExecutionSetupDone(Box::new(res)),
|
||||
}
|
||||
}
|
||||
}
|
@ -1,86 +0,0 @@
|
||||
use crate::network::cbor_request_response::CborCodec;
|
||||
use crate::network::spot_price;
|
||||
use crate::network::spot_price::SpotPriceProtocol;
|
||||
use crate::protocol::bob::OutEvent;
|
||||
use libp2p::request_response::{ProtocolSupport, RequestResponseConfig};
|
||||
use libp2p::PeerId;
|
||||
|
||||
const PROTOCOL: &str = spot_price::PROTOCOL;
|
||||
pub type SpotPriceOutEvent = spot_price::OutEvent;
|
||||
|
||||
/// Constructs a new instance of the `spot-price` behaviour to be used by Bob.
|
||||
///
|
||||
/// Bob only supports outbound connections, i.e. requesting a spot price for a
|
||||
/// given amount of BTC in XMR.
|
||||
pub fn bob() -> spot_price::Behaviour {
|
||||
spot_price::Behaviour::new(
|
||||
CborCodec::default(),
|
||||
vec![(SpotPriceProtocol, ProtocolSupport::Outbound)],
|
||||
RequestResponseConfig::default(),
|
||||
)
|
||||
}
|
||||
|
||||
impl From<(PeerId, spot_price::Message)> for OutEvent {
|
||||
fn from((peer, message): (PeerId, spot_price::Message)) -> Self {
|
||||
match message {
|
||||
spot_price::Message::Request { .. } => Self::unexpected_request(peer),
|
||||
spot_price::Message::Response {
|
||||
response,
|
||||
request_id,
|
||||
} => Self::SpotPriceReceived {
|
||||
id: request_id,
|
||||
response,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
crate::impl_from_rr_event!(SpotPriceOutEvent, OutEvent, PROTOCOL);
|
||||
|
||||
#[derive(Clone, Debug, thiserror::Error, PartialEq)]
|
||||
pub enum Error {
|
||||
#[error("Seller currently does not accept incoming swap requests, please try again later")]
|
||||
NoSwapsAccepted,
|
||||
#[error("Seller refused to buy {buy} because the minimum configured buy limit is {min}")]
|
||||
AmountBelowMinimum {
|
||||
min: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")]
|
||||
AmountAboveMaximum {
|
||||
max: bitcoin::Amount,
|
||||
buy: bitcoin::Amount,
|
||||
},
|
||||
#[error("Seller's XMR balance is currently too low to fulfill the swap request to buy {buy}, please try again later")]
|
||||
BalanceTooLow { buy: bitcoin::Amount },
|
||||
|
||||
#[error("Seller blockchain network {asb:?} setup did not match your blockchain network setup {cli:?}")]
|
||||
BlockchainNetworkMismatch {
|
||||
cli: spot_price::BlockchainNetwork,
|
||||
asb: spot_price::BlockchainNetwork,
|
||||
},
|
||||
|
||||
/// To be used for errors that cannot be explained on the CLI side (e.g.
|
||||
/// rate update problems on the seller side)
|
||||
#[error("Seller encountered a problem, please try again later.")]
|
||||
Other,
|
||||
}
|
||||
|
||||
impl From<spot_price::Error> for Error {
|
||||
fn from(error: spot_price::Error) -> Self {
|
||||
match error {
|
||||
spot_price::Error::NoSwapsAccepted => Error::NoSwapsAccepted,
|
||||
spot_price::Error::AmountBelowMinimum { min, buy } => {
|
||||
Error::AmountBelowMinimum { min, buy }
|
||||
}
|
||||
spot_price::Error::AmountAboveMaximum { max, buy } => {
|
||||
Error::AmountAboveMaximum { max, buy }
|
||||
}
|
||||
spot_price::Error::BalanceTooLow { buy } => Error::BalanceTooLow { buy },
|
||||
spot_price::Error::BlockchainNetworkMismatch { cli, asb } => {
|
||||
Error::BlockchainNetworkMismatch { cli, asb }
|
||||
}
|
||||
spot_price::Error::Other => Error::Other,
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in new issue