Alice's spot price logic into dedicated behaviour

Move Alice's spot price logic into a dedicated network behaviour that handles all the logic.
The new behaviour encapsulates the complete state necessary for spot price request decision making.
The network behaviour cannot handle asynchronous calls, thus the balance is managed inside the spot price and has to updated regularly from the outside to ensure the spot price balance check has up to date data.
At the moment the balance is updated upon an incoming quote requests.

Code that is relevant for both ASB and CLI remains in the `network::spot_price` module (e.g. `network::spot_price::Error`).
pull/461/head
Daniel Karzel 3 years ago
parent ea76ae5821
commit 52f648e1de
No known key found for this signature in database
GPG Key ID: 30C3FC2E438ADB6E

@ -119,7 +119,17 @@ async fn main() -> Result<()> {
}
};
let mut swarm = swarm::alice(&seed)?;
let current_balance = monero_wallet.get_balance().await?;
let lock_fee = monero_wallet.static_tx_fee_estimate();
let kraken_rate = KrakenRate::new(ask_spread, kraken_price_updates);
let mut swarm = swarm::alice(
&seed,
current_balance,
lock_fee,
max_buy,
kraken_rate.clone(),
resume_only,
)?;
for listen in config.network.listen {
Swarm::listen_on(&mut swarm, listen.clone())
@ -132,9 +142,8 @@ async fn main() -> Result<()> {
Arc::new(bitcoin_wallet),
Arc::new(monero_wallet),
Arc::new(db),
KrakenRate::new(ask_spread, kraken_price_updates),
kraken_rate,
max_buy,
resume_only,
)
.unwrap();

@ -1,6 +1,6 @@
use crate::monero;
use crate::network::cbor_request_response::CborCodec;
use crate::protocol::{alice, bob};
use crate::protocol::bob;
use libp2p::core::ProtocolName;
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
@ -10,7 +10,7 @@ use libp2p::PeerId;
use serde::{Deserialize, Serialize};
const PROTOCOL: &str = "/comit/xmr/btc/spot-price/1.0.0";
type OutEvent = RequestResponseEvent<Request, Response>;
pub type OutEvent = RequestResponseEvent<Request, Response>;
type Message = RequestResponseMessage<Request, Response>;
pub type Behaviour = RequestResponse<CborCodec<SpotPriceProtocol, Request, Response>>;
@ -50,7 +50,7 @@ pub enum Error {
#[error(
"This seller currently does not accept incoming swap requests, please try again later"
)]
MaintenanceMode,
NoSwapsAccepted,
#[error("Seller refused to buy {buy} because the maximum configured buy limit is {max}")]
MaxBuyAmountExceeded {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
@ -63,18 +63,11 @@ pub enum Error {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
}
/// Constructs a new instance of the `spot-price` behaviour to be used by Alice.
///
/// Alice only supports inbound connections, i.e. providing spot prices for BTC
/// in XMR.
pub fn alice() -> Behaviour {
Behaviour::new(
CborCodec::default(),
vec![(SpotPriceProtocol, ProtocolSupport::Inbound)],
RequestResponseConfig::default(),
)
/// To be used for errors that cannot be explained on the CLI side (e.g.
/// rate update problems on the seller side)
#[error("The seller encountered a problem, please try again later.")]
Other,
}
/// Constructs a new instance of the `spot-price` behaviour to be used by Bob.
@ -89,22 +82,6 @@ pub fn bob() -> Behaviour {
)
}
impl From<(PeerId, Message)> for alice::OutEvent {
fn from((peer, message): (PeerId, Message)) -> Self {
match message {
Message::Request {
request, channel, ..
} => Self::SpotPriceRequested {
request,
channel,
peer,
},
Message::Response { .. } => Self::unexpected_response(peer),
}
}
}
crate::impl_from_rr_event!(OutEvent, alice::OutEvent, PROTOCOL);
impl From<(PeerId, Message)> for bob::OutEvent {
fn from((peer, message): (PeerId, Message)) -> Self {
match message {

@ -1,13 +1,27 @@
use crate::network::transport;
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::{alice, bob};
use crate::seed::Seed;
use crate::tor;
use crate::{monero, tor};
use anyhow::Result;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::{PeerId, Swarm};
pub fn alice(seed: &Seed) -> Result<Swarm<alice::Behaviour>> {
with_clear_net(seed, alice::Behaviour::default())
pub fn alice<LR>(
seed: &Seed,
balance: monero::Amount,
lock_fee: monero::Amount,
max_buy: bitcoin::Amount,
latest_rate: LR,
resume_only: bool,
) -> Result<Swarm<alice::Behaviour<LR>>>
where
LR: LatestRate + Send + 'static,
{
with_clear_net(
seed,
alice::Behaviour::new(balance, lock_fee, max_buy, latest_rate, resume_only),
)
}
pub async fn bob(

@ -14,6 +14,7 @@ pub use self::swap::{run, run_until};
mod behaviour;
pub mod event_loop;
mod execution_setup;
mod spot_price;
pub mod state;
pub mod swap;

@ -1,6 +1,8 @@
use crate::monero;
use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, quote, spot_price, transfer_proof};
use crate::protocol::alice::{execution_setup, State3};
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::alice::{execution_setup, spot_price, State3};
use anyhow::{anyhow, Error};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::{NetworkBehaviour, PeerId};
@ -8,10 +10,10 @@ use uuid::Uuid;
#[derive(Debug)]
pub enum OutEvent {
SpotPriceRequested {
request: spot_price::Request,
channel: ResponseChannel<spot_price::Response>,
ExecutionSetupStart {
peer: PeerId,
btc: bitcoin::Amount,
xmr: monero::Amount,
},
QuoteRequested {
channel: ResponseChannel<BidQuote>,
@ -60,19 +62,34 @@ impl OutEvent {
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
pub struct Behaviour<LR: LatestRate + Send + 'static> {
pub quote: quote::Behaviour,
pub spot_price: spot_price::Behaviour,
pub spot_price: spot_price::Behaviour<LR>,
pub execution_setup: execution_setup::Behaviour,
pub transfer_proof: transfer_proof::Behaviour,
pub encrypted_signature: encrypted_signature::Behaviour,
}
impl Default for Behaviour {
fn default() -> Self {
impl<LR> Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
pub fn new(
balance: monero::Amount,
lock_fee: monero::Amount,
max_buy: bitcoin::Amount,
latest_rate: LR,
resume_only: bool,
) -> Self {
Self {
quote: quote::alice(),
spot_price: spot_price::alice(),
spot_price: spot_price::Behaviour::new(
balance,
lock_fee,
max_buy,
latest_rate,
resume_only,
),
execution_setup: Default::default(),
transfer_proof: transfer_proof::alice(),
encrypted_signature: encrypted_signature::alice(),

@ -2,7 +2,7 @@ use crate::asb::Rate;
use crate::database::Database;
use crate::env::Config;
use crate::network::quote::BidQuote;
use crate::network::{spot_price, transfer_proof};
use crate::network::transfer_proof;
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap};
use crate::{bitcoin, kraken, monero};
use anyhow::{Context, Result};
@ -31,17 +31,16 @@ type OutgoingTransferProof =
BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>;
#[allow(missing_debug_implementations)]
pub struct EventLoop<RS> {
swarm: libp2p::Swarm<Behaviour>,
pub struct EventLoop<LR: LatestRate + Send + 'static> {
swarm: libp2p::Swarm<Behaviour<LR>>,
env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
latest_rate: RS,
latest_rate: LR,
max_buy: bitcoin::Amount,
swap_sender: mpsc::Sender<Swap>,
resume_only: bool,
/// Stores incoming [`EncryptedSignature`]s per swap.
recv_encrypted_signature: HashMap<Uuid, bmrng::RequestSender<bitcoin::EncryptedSignature, ()>>,
@ -60,18 +59,17 @@ pub struct EventLoop<RS> {
impl<LR> EventLoop<LR>
where
LR: LatestRate,
LR: LatestRate + Send + 'static,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
swarm: Swarm<Behaviour>,
swarm: Swarm<Behaviour<LR>>,
env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
latest_rate: LR,
max_buy: bitcoin::Amount,
resume_only: bool,
) -> Result<(Self, mpsc::Receiver<Swap>)> {
let swap_channel = MpscChannels::default();
@ -83,7 +81,6 @@ where
db,
latest_rate,
swap_sender: swap_channel.sender,
resume_only,
max_buy,
recv_encrypted_signature: Default::default(),
inflight_encrypted_signatures: Default::default(),
@ -146,38 +143,8 @@ where
tokio::select! {
swarm_event = self.swarm.next_event() => {
match swarm_event {
SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => {
let btc = request.btc;
let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await {
Ok(xmr) => match xmr {
Ok(xmr) => xmr,
Err(e) => {
tracing::warn!(%peer, "Ignoring spot price request from {} because: {:#}", peer, e);
match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Error(e)) {
Ok(_) => {
continue;
},
Err(_) => {
tracing::debug!(%peer, "Failed to respond with error to spot price request");
continue;
}
}
}
},
Err(e) => {
tracing::error!(%peer, "Unrecoverable error while producing spot price for {}: {:#}", btc, e);
continue;
}
};
SwarmEvent::Behaviour(OutEvent::ExecutionSetupStart { peer, btc, xmr }) => {
match self.swarm.behaviour_mut().spot_price.send_response(channel, spot_price::Response::Xmr(xmr)) {
Ok(_) => {},
Err(_) => {
// if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug
tracing::debug!(%peer, "Failed to respond with spot price");
continue;
}
}
let tx_redeem_fee = self.bitcoin_wallet
.estimate_fee(bitcoin::TxRedeem::weight(), btc)
.await;
@ -195,7 +162,7 @@ where
(redeem_address, punish_address)
}
_ => {
tracing::error!("Could not get new address.");
tracing::error!(%peer, "Failed to get new address during execution setup.");
continue;
}
};
@ -208,7 +175,7 @@ where
(tx_redeem_fee, tx_punish_fee)
}
_ => {
tracing::error!("Could not calculate transaction fees.");
tracing::error!(%peer, "Failed to calculate transaction fees during execution setup.");
continue;
}
};
@ -233,6 +200,17 @@ where
self.swarm.behaviour_mut().execution_setup.run(peer, state0);
}
SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => {
// TODO: Move the spot-price update into dedicated update stream to decouple it from quote requests
let current_balance = self.monero_wallet.get_balance().await;
match current_balance {
Ok(balance) => {
self.swarm.behaviour_mut().spot_price.update_balance(balance);
}
Err(e) => {
tracing::error!("Failed to fetch Monero balance: {:#}", e);
}
}
let quote = match self.make_quote(self.max_buy).await {
Ok(quote) => quote,
Err(e) => {
@ -360,38 +338,6 @@ where
}
}
async fn handle_spot_price_request(
&mut self,
btc: bitcoin::Amount,
monero_wallet: Arc<monero::Wallet>,
) -> Result<Result<monero::Amount, spot_price::Error>> {
if self.resume_only {
return Ok(Err(spot_price::Error::MaintenanceMode));
}
let rate = self
.latest_rate
.latest_rate()
.context("Failed to get latest rate")?;
if btc > self.max_buy {
return Ok(Err(spot_price::Error::MaxBuyAmountExceeded {
buy: btc,
max: self.max_buy,
}));
}
let xmr_balance = monero_wallet.get_balance().await?;
let xmr_lock_fees = monero_wallet.static_tx_fee_estimate();
let xmr = rate.sell_quote(btc)?;
if xmr_balance < xmr + xmr_lock_fees {
return Ok(Err(spot_price::Error::BalanceTooLow { buy: btc }));
}
Ok(Ok(xmr))
}
async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result<BidQuote> {
let rate = self
.latest_rate
@ -510,7 +456,7 @@ impl LatestRate for FixedRate {
/// Produces [`Rate`]s based on [`PriceUpdate`]s from kraken and a configured
/// spread.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct KrakenRate {
ask_spread: Decimal,
price_updates: kraken::PriceUpdates,

@ -0,0 +1,199 @@
use crate::monero;
use crate::network::cbor_request_response::CborCodec;
use crate::network::spot_price;
use crate::network::spot_price::SpotPriceProtocol;
use crate::protocol::alice;
use crate::protocol::alice::event_loop::LatestRate;
use libp2p::request_response::{
ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage,
ResponseChannel,
};
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use libp2p::{NetworkBehaviour, PeerId};
use std::collections::VecDeque;
use std::task::{Context, Poll};
pub struct OutEvent {
peer: PeerId,
btc: bitcoin::Amount,
xmr: monero::Amount,
}
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll", event_process = true)]
#[allow(missing_debug_implementations)]
pub struct Behaviour<LR: LatestRate + Send + 'static> {
behaviour: spot_price::Behaviour,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
#[behaviour(ignore)]
balance: monero::Amount,
#[behaviour(ignore)]
lock_fee: monero::Amount,
#[behaviour(ignore)]
max_buy: bitcoin::Amount,
#[behaviour(ignore)]
latest_rate: LR,
#[behaviour(ignore)]
resume_only: bool,
}
/// Behaviour that handles spot prices.
/// All the logic how to react to a spot price request is contained here, events
/// reporting the successful handling of a spot price request or a failure are
/// bubbled up to the parent behaviour.
impl<LR> Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
pub fn new(
balance: monero::Amount,
lock_fee: monero::Amount,
max_buy: bitcoin::Amount,
latest_rate: LR,
resume_only: bool,
) -> Self {
Self {
behaviour: spot_price::Behaviour::new(
CborCodec::default(),
vec![(SpotPriceProtocol, ProtocolSupport::Inbound)],
RequestResponseConfig::default(),
),
events: Default::default(),
balance,
lock_fee,
max_buy,
latest_rate,
resume_only,
}
}
pub fn update_balance(&mut self, balance: monero::Amount) {
self.balance = balance;
}
fn send_error_response(
&mut self,
peer: PeerId,
channel: ResponseChannel<spot_price::Response>,
error: spot_price::Error,
) {
if self
.behaviour
.send_response(channel, spot_price::Response::Error(error))
.is_err()
{
tracing::debug!(%peer, "Unable to send error response for spot price request");
}
}
fn poll<BIE>(
&mut self,
_cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<BIE, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
// We trust in libp2p to poll us.
Poll::Pending
}
}
impl<LR> NetworkBehaviourEventProcess<spot_price::OutEvent> for Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
fn inject_event(&mut self, event: spot_price::OutEvent) {
let (peer, message) = match event {
RequestResponseEvent::Message { peer, message } => (peer, message),
RequestResponseEvent::OutboundFailure { peer, error, .. } => {
tracing::error!(%peer, "Failure sending spot price response: {:#}", error);
return;
}
RequestResponseEvent::InboundFailure { peer, error, .. } => {
tracing::warn!(%peer, "Inbound failure when handling spot price request: {:#}", error);
return;
}
RequestResponseEvent::ResponseSent { peer, .. } => {
tracing::debug!(%peer, "Spot price response sent");
return;
}
};
let (request, channel) = match message {
RequestResponseMessage::Request {
request, channel, ..
} => (request, channel),
RequestResponseMessage::Response { .. } => {
tracing::error!("Unexpected message");
return;
}
};
if self.resume_only {
tracing::warn!(%peer, "Ignoring spot price request from {} because ASB is running in resume-only mode", peer);
self.send_error_response(peer, channel, spot_price::Error::NoSwapsAccepted);
return;
}
let btc = request.btc;
if btc > self.max_buy {
tracing::warn!(%peer, "Ignoring spot price request from {} because max muy amount exceeded", peer);
self.send_error_response(peer, channel, spot_price::Error::MaxBuyAmountExceeded {
max: self.max_buy,
buy: btc,
});
return;
}
let rate = match self.latest_rate.latest_rate() {
Ok(rate) => rate,
Err(e) => {
tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with fetching the latest rate: {:#}", peer, e);
self.send_error_response(peer, channel, spot_price::Error::Other);
return;
}
};
let xmr = match rate.sell_quote(btc) {
Ok(xmr) => xmr,
Err(e) => {
tracing::error!(%peer, "Ignoring spot price request from {} because we encountered a problem with calculating the amount from rate: {:#}", peer, e);
self.send_error_response(peer, channel, spot_price::Error::Other);
return;
}
};
let xmr_balance = self.balance;
let xmr_lock_fees = self.lock_fee;
if xmr_balance < xmr + xmr_lock_fees {
tracing::error!(%peer, "Ignoring spot price request from {} because the XMR balance is too low to fulfill the swap: {}", peer, xmr_balance);
self.send_error_response(peer, channel, spot_price::Error::BalanceTooLow { buy: btc });
return;
}
if self
.behaviour
.send_response(channel, spot_price::Response::Xmr(xmr))
.is_err()
{
tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc)
}
self.events.push_back(OutEvent { peer, btc, xmr });
}
}
impl From<OutEvent> for alice::OutEvent {
fn from(event: OutEvent) -> Self {
Self::ExecutionSetupStart {
peer: event.peer,
btc: event.btc,
xmr: event.xmr,
}
}
}

@ -90,7 +90,8 @@ where
env_config,
alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(),
);
)
.await;
let bob_seed = Seed::random().unwrap();
let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None);
@ -213,7 +214,7 @@ pub async fn init_electrs_container(
Ok(docker)
}
fn start_alice(
async fn start_alice(
seed: &Seed,
db_path: PathBuf,
listen_address: Multiaddr,
@ -223,7 +224,21 @@ fn start_alice(
) -> (AliceApplicationHandle, Receiver<alice::Swap>) {
let db = Arc::new(Database::open(db_path.as_path()).unwrap());
let mut swarm = swarm::alice(&seed).unwrap();
let current_balance = monero_wallet.get_balance().await.unwrap();
let lock_fee = monero_wallet.static_tx_fee_estimate();
let max_buy = bitcoin::Amount::from_sat(u64::MAX);
let latest_rate = FixedRate::default();
let resume_only = false;
let mut swarm = swarm::alice(
&seed,
current_balance,
lock_fee,
max_buy,
latest_rate,
resume_only,
)
.unwrap();
swarm.listen_on(listen_address).unwrap();
let (event_loop, swap_handle) = alice::EventLoop::new(
@ -234,7 +249,6 @@ fn start_alice(
db,
FixedRate::default(),
bitcoin::Amount::ONE_BTC,
false,
)
.unwrap();
@ -497,7 +511,8 @@ impl TestContext {
self.env_config,
self.alice_bitcoin_wallet.clone(),
self.alice_monero_wallet.clone(),
);
)
.await;
self.alice_handle = alice_handle;
self.alice_swap_handle = alice_swap_handle;

Loading…
Cancel
Save