474: Add more log details r=bonomat a=bonomat

Resolves #448 

1. The first commit adds an additional log statement of the exchange rate for each state-update. This is useful because it allows us to measure profitability easily, i.e. by knowing what was the exchange rate when the swap was started and what was it when it was finalized.
2. The second commit changes a bunch of log messages. 
3. The third commit is adds a new commandline flag to toggle json format.




Co-authored-by: Philipp Hoenisch <philipp@hoenisch.at>
Co-authored-by: Philipp Hoenisch <philipp@coblox.tech>
pull/483/head
bors[bot] 3 years ago committed by GitHub
commit f03e8fa5af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
When started with `--resume-only` the ASB does not accept new, incoming swap requests but only finishes swaps that are resumed upon startup.
- A minimum accepted Bitcoin amount for the ASB similar to the maximum amount already present.
For the CLI the minimum amount is enforced by waiting until at least the minimum is available as max-giveable amount.
- Added a new argument to ASB: `--json` or `-j`. If set, log messages will be printed in JSON format.
### Fixed

13
Cargo.lock generated

@ -4289,6 +4289,16 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.18"
@ -4300,11 +4310,14 @@ dependencies = [
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]

@ -61,7 +61,7 @@ torut = { version = "0.1", default-features = false, features = [ "v3", "control
tracing = { version = "0.1", features = [ "attributes" ] }
tracing-appender = "0.1"
tracing-futures = { version = "0.2", features = [ "std-future", "futures-03" ] }
tracing-subscriber = { version = "0.2", default-features = false, features = [ "fmt", "ansi", "env-filter", "chrono", "tracing-log" ] }
tracing-subscriber = { version = "0.2", default-features = false, features = [ "fmt", "ansi", "env-filter", "chrono", "tracing-log", "json" ] }
url = { version = "2", features = [ "serde" ] }
uuid = { version = "0.8", features = [ "serde", "v4" ] }
void = "1"

@ -10,6 +10,13 @@ use uuid::Uuid;
author
)]
pub struct Arguments {
#[structopt(
short,
long = "json",
help = "Changes the log messages to json vs plain-text. If you run ASB as a service, it is recommended to set this to true to simplify log analyses."
)]
pub json: bool,
#[structopt(
long = "config",
help = "Provide a custom path to the configuration file. The configuration file must be a toml file.",

@ -105,8 +105,8 @@ pub struct ConfigNotInitialized {}
pub fn read_config(config_path: PathBuf) -> Result<Result<Config, ConfigNotInitialized>> {
if config_path.exists() {
info!(
"Using config file at default path: {}",
config_path.display()
path = %config_path.display(),
"Using config file at",
);
} else {
return Ok(Err(ConfigNotInitialized {}));
@ -148,8 +148,8 @@ where
fs::write(&config_path, toml)?;
info!(
"Initial setup complete, config file created at {} ",
config_path.as_path().display()
path = %config_path.as_path().display(),
"Initial setup complete, config file created",
);
Ok(())
}

@ -1,8 +1,9 @@
use anyhow::Result;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt::time::ChronoLocal;
use tracing_subscriber::FmtSubscriber;
pub fn init(level: LevelFilter) -> Result<()> {
pub fn init(level: LevelFilter, json_format: bool) -> Result<()> {
if level == LevelFilter::OFF {
return Ok(());
}
@ -13,15 +14,18 @@ pub fn init(level: LevelFilter) -> Result<()> {
.with_env_filter(format!("asb={},swap={}", level, level))
.with_writer(std::io::stderr)
.with_ansi(is_terminal)
.with_timer(ChronoLocal::with_format("%F %T".to_owned()))
.with_target(false);
if !is_terminal {
builder.without_time().init();
} else {
if json_format {
builder.json().init();
} else if is_terminal {
builder.init();
} else {
builder.without_time().init();
}
tracing::info!("Initialized tracing with level: {}", level);
tracing::info!(%level, "Initialized tracing");
Ok(())
}

@ -35,7 +35,7 @@ use swap::protocol::alice::{redeem, run, EventLoop};
use swap::seed::Seed;
use swap::tor::AuthenticatedClient;
use swap::{asb, bitcoin, env, kraken, monero, tor};
use tracing::{info, warn};
use tracing::{debug, info, warn};
use tracing_subscriber::filter::LevelFilter;
#[macro_use]
@ -45,9 +45,8 @@ const DEFAULT_WALLET_NAME: &str = "asb-wallet";
#[tokio::main]
async fn main() -> Result<()> {
asb::tracing::init(LevelFilter::DEBUG).expect("initialize tracing");
let opt = Arguments::from_args();
asb::tracing::init(LevelFilter::DEBUG, opt.json).expect("initialize tracing");
let config_path = if let Some(config_path) = opt.config {
config_path
@ -64,8 +63,8 @@ async fn main() -> Result<()> {
};
info!(
"Database and Seed will be stored in directory: {}",
config.data.dir.display()
db_folder = %config.data.dir.display(),
"Database and Seed will be stored in",
);
let db_path = config.data.dir.join("database");
@ -81,20 +80,21 @@ async fn main() -> Result<()> {
match opt.cmd {
Command::Start { resume_only } => {
let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?;
let monero_wallet = init_monero_wallet(&config, env_config).await?;
let bitcoin_balance = bitcoin_wallet.balance().await?;
info!("Bitcoin balance: {}", bitcoin_balance);
info!(%bitcoin_balance, "Initialized Bitcoin wallet");
let monero_balance = monero_wallet.get_balance().await?;
if monero_balance == Amount::ZERO {
let deposit_address = monero_wallet.get_main_address();
let monero_address = monero_wallet.get_main_address();
warn!(
"The Monero balance is 0, make sure to deposit funds at: {}",
deposit_address
%monero_address,
"The Monero balance is 0, make sure to deposit funds at",
)
} else {
info!("Monero balance: {}", monero_balance);
info!(%monero_balance, "Initialized Monero wallet");
}
let kraken_price_updates = kraken::connect()?;
@ -104,14 +104,14 @@ async fn main() -> Result<()> {
tor::Client::new(config.tor.socks5_port).with_control_port(config.tor.control_port);
let _ac = match tor_client.assert_tor_running().await {
Ok(_) => {
tracing::info!("Tor found. Setting up hidden service. ");
tracing::info!("Tor found. Setting up hidden service");
let ac =
register_tor_services(config.network.clone().listen, tor_client, &seed)
.await?;
Some(ac)
}
Err(_) => {
tracing::warn!("Tor not found. Running on clear net. ");
tracing::warn!("Tor not found. Running on clear net");
None
}
};
@ -140,7 +140,7 @@ async fn main() -> Result<()> {
Arc::new(bitcoin_wallet),
Arc::new(monero_wallet),
Arc::new(db),
kraken_rate,
kraken_rate.clone(),
config.maker.min_buy_btc,
config.maker.max_buy_btc,
)
@ -148,21 +148,22 @@ async fn main() -> Result<()> {
tokio::spawn(async move {
while let Some(swap) = swap_receiver.recv().await {
let rate = kraken_rate.clone();
tokio::spawn(async move {
let swap_id = swap.swap_id;
match run(swap).await {
match run(swap, rate).await {
Ok(state) => {
tracing::debug!(%swap_id, "Swap finished with state {}", state)
tracing::debug!(%swap_id, %state, "Swap finished with state")
}
Err(e) => {
tracing::error!(%swap_id, "Swap failed with {:#}", e)
Err(error) => {
tracing::error!(%swap_id, "Swap failed. Error {:#}", error)
}
}
});
}
});
info!("Our peer id is {}", event_loop.peer_id());
info!(perr_id = %event_loop.peer_id(), "Our peer id");
event_loop.run().await;
}
@ -202,7 +203,10 @@ async fn main() -> Result<()> {
let bitcoin_balance = bitcoin_wallet.balance().await?;
let monero_balance = monero_wallet.get_balance().await?;
tracing::info!("Current balance: {}, {}", bitcoin_balance, monero_balance);
tracing::info!(
%bitcoin_balance,
%monero_balance,
"Current balance");
}
Command::ManualRecovery(ManualRecovery::Cancel {
cancel_params: RecoverCommandParams { swap_id, force },
@ -273,6 +277,7 @@ async fn init_bitcoin_wallet(
seed: &Seed,
env_config: swap::env::Config,
) -> Result<bitcoin::Wallet> {
debug!("Opening Bitcoin wallet");
let wallet_dir = config.data.dir.join("wallet");
let wallet = bitcoin::Wallet::new(
@ -294,6 +299,7 @@ async fn init_monero_wallet(
config: &Config,
env_config: swap::env::Config,
) -> Result<monero::Wallet> {
debug!("Opening Monero wallet");
let wallet = monero::Wallet::open_or_create(
config.monero.wallet_rpc_url.clone(),
DEFAULT_WALLET_NAME.to_string(),
@ -340,7 +346,8 @@ async fn register_tor_services(
.get_address_without_dot_onion();
hidden_services_details.iter().for_each(|(port, _)| {
tracing::info!("/onion3/{}:{}", onion_address, port);
let onion_address = format!("/onion3/{}:{}", onion_address, port);
tracing::info!(%onion_address);
});
Ok(ac)

@ -245,8 +245,7 @@ async fn main() -> Result<()> {
debug!("Cancel transaction successfully published with id {}", txid)
}
Err(bob::cancel::Error::CancelTimelockNotExpiredYet) => error!(
"The Cancel Transaction cannot be published yet, \
because the timelock has not expired. Please try again later."
"The Cancel Transaction cannot be published yet, because the timelock has not expired. Please try again later"
),
}
}

@ -102,7 +102,7 @@ impl Wallet {
format!("Failed to broadcast Bitcoin {} transaction {}", kind, txid)
})?;
tracing::info!(%txid, "Published Bitcoin {} transaction", kind);
tracing::info!(%txid, %kind, "Published Bitcoin transaction");
Ok((txid, subscription))
}
@ -154,15 +154,16 @@ impl Wallet {
let new_status = match client.lock().await.status_of_script(&tx) {
Ok(new_status) => new_status,
Err(e) => {
tracing::warn!(%txid, "Failed to get status of script: {:#}", e);
Err(error) => {
tracing::warn!(%txid, "Failed to get status of script. Error {:#}", error);
return;
}
};
if Some(new_status) != last_status {
tracing::debug!(%txid, "Transaction is {}", new_status);
tracing::debug!(%txid, status = %new_status, "Transaction");
}
last_status = Some(new_status);
let all_receivers_gone = sender.send(new_status).is_err();
@ -200,7 +201,7 @@ impl Subscription {
let conf_target = self.finality_confirmations;
let txid = self.txid;
tracing::info!(%txid, "Waiting for {} confirmation{} of Bitcoin transaction", conf_target, if conf_target > 1 { "s" } else { "" });
tracing::info!(%txid, required_confirmation=%conf_target, "Waiting for Bitcoin transaction finality");
let mut seen_confirmations = 0;
@ -209,15 +210,18 @@ impl Subscription {
let confirmations = inner.confirmations();
if confirmations > seen_confirmations {
tracing::info!(%txid, "Bitcoin tx has {} out of {} confirmation{}", confirmations, conf_target, if conf_target > 1 { "s" } else { "" });
tracing::info!(%txid,
seen_confirmations = %confirmations,
needed_confirmations = %conf_target,
"Waiting for Bitcoin transaction finality");
seen_confirmations = confirmations;
}
inner.meets_target(conf_target)
},
_ => false
}
_ => false,
})
.await
.await
}
pub async fn wait_until_seen(&self) -> Result<()> {
@ -589,7 +593,7 @@ impl Client {
[] => Ok(ScriptStatus::Unseen),
[remaining @ .., last] => {
if !remaining.is_empty() {
tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored.")
tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored")
}
if last.height <= 0 {
@ -614,8 +618,8 @@ impl Client {
if let Some(new_block) = latest_block {
tracing::debug!(
"Got notification for new block at height {}",
new_block.height
block_height = new_block.height,
"Got notification for new block"
);
self.latest_block = BlockHeight::try_from(new_block)?;
}

@ -24,8 +24,8 @@ pub fn ensure_directory_exists(file: &Path) -> Result<(), std::io::Error> {
if let Some(path) = file.parent() {
if !path.exists() {
tracing::info!(
"Parent directory does not exist, creating recursively: {}",
file.display()
directory = %file.display(),
"Parent directory does not exist, creating recursively",
);
return std::fs::create_dir_all(path);
}

@ -43,8 +43,12 @@ pub fn connect() -> Result<PriceUpdates> {
}
},
|error, next: Duration| {
tracing::info!(%error, "Kraken websocket connection failed, retrying in {}ms", next.as_millis());
}
tracing::info!(
"Kraken websocket connection failed, retrying in {}ms. Error {:#}",
next.as_millis(),
error
);
},
)
.await;
@ -183,9 +187,8 @@ mod connection {
// if the message is not an event, it is a ticker update or an unknown event
Err(_) => match serde_json::from_str::<wire::PriceUpdate>(&msg) {
Ok(ticker) => ticker,
Err(e) => {
tracing::warn!(%e, "Failed to deserialize message '{}' as ticker update", msg);
Err(error) => {
tracing::warn!(%msg, "Failed to deserialize message as ticker update. Error {:#}", error);
return Ok(None);
}
},

@ -33,9 +33,9 @@ impl Wallet {
"Unable to create Monero wallet, please ensure that the monero-wallet-rpc is available",
)?;
tracing::debug!("Created Monero wallet {}", name);
tracing::debug!(monero_wallet_name = %name, "Created Monero wallet");
} else {
tracing::debug!("Opened Monero wallet {}", name);
tracing::debug!(monero_wallet_name = %name, "Opened Monero wallet");
}
Self::connect(client, name, env_config).await
@ -148,19 +148,21 @@ impl Wallet {
Ok(_) => match wallet.sweep_all(self.main_address.to_string()).await {
Ok(sweep_all) => {
for tx in sweep_all.tx_hash_list {
tracing::info!(%tx, "Monero transferred back to default wallet {}", self.main_address);
tracing::info!(
%tx,
monero_address = %self.main_address,
"Monero transferred back to default wallet");
}
}
Err(e) => {
Err(error) => {
tracing::warn!(
"Transferring Monero back to default wallet {} failed with {:#}",
self.main_address,
e
address = %self.main_address,
"Transferring Monero back to default wallet failed. Error {:#}", error
);
}
},
Err(e) => {
tracing::warn!("Refreshing the generated wallet failed with {:#}", e);
Err(error) => {
tracing::warn!("Refreshing the generated wallet failed. Error {:#}", error);
}
}
@ -187,10 +189,10 @@ impl Wallet {
.await?;
tracing::debug!(
"sent transfer of {} to {} in {}",
amount,
public_spend_key,
res.tx_hash
%amount,
to = %public_spend_key,
tx_id = %res.tx_hash,
"Sent transfer"
);
Ok(TransferProof::new(
@ -211,7 +213,11 @@ impl Wallet {
let txid = transfer_proof.tx_hash();
tracing::info!(%txid, "Waiting for {} confirmation{} of Monero transaction", conf_target, if conf_target > 1 { "s" } else { "" });
tracing::info!(
%txid,
target_confirmations = %conf_target,
"Waiting for Monero transaction finality"
);
let address = Address::standard(self.network, public_spend_key, public_view_key.into());
@ -311,7 +317,10 @@ where
let tx = match fetch_tx(txid.clone()).await {
Ok(proof) => proof,
Err(error) => {
tracing::debug!(%txid, "Failed to retrieve tx from blockchain: {:#}", error);
tracing::debug!(
%txid,
"Failed to retrieve tx from blockchain. Error {:#}", error
);
continue; // treating every error as transient and retrying
// is obviously wrong but the jsonrpc client is
// too primitive to differentiate between all the
@ -330,7 +339,12 @@ where
if tx.confirmations > seen_confirmations {
seen_confirmations = tx.confirmations;
tracing::info!(%txid, "Monero lock tx has {} out of {} confirmations", tx.confirmations, conf_target);
tracing::info!(
%txid,
%seen_confirmations,
needed_confirmations = %conf_target,
"Received new confirmation for Monero lock tx"
);
}
}

@ -121,7 +121,10 @@ impl WalletRpc {
.local_addr()?
.port();
tracing::debug!("Starting monero-wallet-rpc on port {}", port);
tracing::debug!(
%port,
"Starting monero-wallet-rpc on"
);
let mut child = Command::new(self.exec_path())
.env("LANG", "en_AU.UTF-8")

@ -45,10 +45,8 @@ where
e => io::Error::new(io::ErrorKind::Other, e),
})?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Req::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_request error: {:?}", e);
io::Error::new(io::ErrorKind::Other, e)
})?;
let msg = Req::deserialize(&mut de)
.map_err(|error| io::Error::new(io::ErrorKind::Other, error))?;
Ok(msg)
}
@ -65,10 +63,8 @@ where
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Res::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
let msg = Res::deserialize(&mut de)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
Ok(msg)
}
@ -99,10 +95,8 @@ where
where
T: AsyncWrite + Unpin + Send,
{
let bytes = serde_cbor::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
let bytes = serde_cbor::to_vec(&res)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
upgrade::write_one(io, &bytes).await?;
Ok(())

@ -59,10 +59,8 @@ where
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_json::Deserializer::from_slice(&message);
let msg = Res::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
let msg = Res::deserialize(&mut de)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
Ok(msg)
}
@ -88,10 +86,8 @@ where
where
T: AsyncWrite + Unpin + Send,
{
let bytes = serde_json::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
let bytes = serde_json::to_vec(&res)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
upgrade::write_one(io, &bytes).await?;
Ok(())

@ -49,11 +49,11 @@ fn with_clear_net<B>(seed: &Seed, behaviour: B) -> Result<Swarm<B>>
where
B: NetworkBehaviour,
{
tracing::info!("All connections will go through clear net.");
tracing::info!("All connections will go through clear net");
let identity = seed.derive_libp2p_identity();
let transport = transport::build_clear_net(&identity)?;
let peer_id = identity.public().into_peer_id();
tracing::debug!("Our peer-id: {}", peer_id);
tracing::debug!(%peer_id, "Our peer-id");
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(|f| {
@ -68,11 +68,11 @@ async fn with_tor<B>(seed: &Seed, behaviour: B, tor_socks5_port: u16) -> Result<
where
B: NetworkBehaviour,
{
tracing::info!("All connections will go through Tor socks5 proxy.");
tracing::info!("All connections will go through Tor socks5 proxy");
let identity = seed.derive_libp2p_identity();
let transport = transport::build_tor(&identity, tor_socks5_port)?;
let peer_id = identity.public().into_peer_id();
tracing::debug!("Our peer-id: {}", peer_id);
tracing::debug!(%peer_id, "Our peer-id");
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(|f| {

@ -58,11 +58,10 @@ impl Transport for TorTcpConfig {
Ok(tor_address_string) => {
Ok(Box::pin(do_tor_dial(self.socks_port, tor_address_string)))
}
Err(e) => {
Err(error) => {
tracing::warn!(
"Address {} could not be formatted. Dialling via clear net. Details: {}",
addr,
e
address = %addr,
"Address could not be formatted. Dialling via clear net. Error {:#}", error,
);
self.inner.dial(addr)
}

@ -170,7 +170,7 @@ where
(redeem_address, punish_address)
}
_ => {
tracing::error!(%peer, "Failed to get new address during execution setup.");
tracing::error!(%peer, "Failed to get new address during execution setup");
continue;
}
};
@ -183,7 +183,7 @@ where
(tx_redeem_fee, tx_punish_fee)
}
_ => {
tracing::error!(%peer, "Failed to calculate transaction fees during execution setup.");
tracing::error!(%peer, "Failed to calculate transaction fees during execution setup");
continue;
}
};
@ -199,8 +199,8 @@ where
&mut OsRng
) {
Ok(state) => state,
Err(e) => {
tracing::warn!(%peer, "Failed to make State0 for execution setup: {:#}", e);
Err(error) => {
tracing::warn!(%peer, "Failed to make State0 for execution setup. Error {:#}", error);
continue;
}
};
@ -235,8 +235,8 @@ where
let quote = match self.make_quote(self.min_buy, self.max_buy).await {
Ok(quote) => quote,
Err(e) => {
tracing::warn!(%peer, "Failed to make quote: {:#}", e);
Err(error) => {
tracing::warn!(%peer, "Failed to make quote. Error {:#}", error);
continue;
}
};
@ -262,7 +262,10 @@ where
let swap_peer = match swap_peer {
Ok(swap_peer) => swap_peer,
Err(_) => {
tracing::warn!("Ignoring encrypted signature for unknown swap {} from {}", swap_id, peer);
tracing::warn!(
unknown_swap_id = %swap_id,
from = %peer,
"Ignoring encrypted signature for unknown swap");
continue;
}
};
@ -270,9 +273,10 @@ where
if swap_peer != peer {
tracing::warn!(
%swap_id,
"Ignoring malicious encrypted signature from {}, expected to receive it from {}",
peer,
swap_peer);
received_from = %peer,
expected_from = %swap_peer,
"Ignoring malicious encrypted signature which was not expected from this peer",
);
continue;
}
@ -300,7 +304,9 @@ where
}.boxed());
}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
tracing::error!(%peer, "Communication error: {:#}", error);
tracing::error!(
%peer,
"Communication error. Error {:#}", error);
}
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
@ -315,20 +321,20 @@ where
}
}
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
tracing::warn!(%address, "Failed to set up connection with peer: {}", error);
tracing::warn!(%address, "Failed to set up connection with peer. Error {:#}", error);
}
SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause } if num_established == 0 => {
match cause {
Some(error) => {
tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection: {}", error);
tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection. Error {:#}", error);
},
None => {
tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection");
}
}
}
SwarmEvent::NewListenAddr(addr) => {
tracing::info!("Listening on {}", addr);
SwarmEvent::NewListenAddr(address) => {
tracing::info!(%address, "Listening on");
}
_ => {}
}
@ -345,8 +351,8 @@ where
let id = self.swarm.behaviour_mut().transfer_proof.send_request(&peer, transfer_proof);
self.inflight_transfer_proofs.insert(id, responder);
},
Some(Err(e)) => {
tracing::debug!("A swap stopped without sending a transfer proof: {:#}", e);
Some(Err(error)) => {
tracing::debug!("A swap stopped without sending a transfer proof. Error {:#}", error);
}
None => {
unreachable!("stream of transfer proof receivers must never terminate")

@ -2,21 +2,31 @@
//! Alice holds XMR and wishes receive BTC.
use crate::bitcoin::ExpiredTimelocks;
use crate::env::Config;
use crate::protocol::alice::event_loop::EventLoopHandle;
use crate::protocol::alice::event_loop::{EventLoopHandle, LatestRate};
use crate::protocol::alice::{AliceState, Swap};
use crate::{bitcoin, database, monero};
use anyhow::{bail, Context, Result};
use tokio::select;
use tokio::time::timeout;
use tracing::{error, info};
use tracing::{error, info, warn};
use uuid::Uuid;
pub async fn run(swap: Swap) -> Result<AliceState> {
run_until(swap, |_| false).await
pub async fn run<LR>(swap: Swap, rate_service: LR) -> Result<AliceState>
where
LR: LatestRate + Clone,
{
run_until(swap, |_| false, rate_service).await
}
#[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id), err)]
pub async fn run_until(mut swap: Swap, exit_early: fn(&AliceState) -> bool) -> Result<AliceState> {
#[tracing::instrument(name = "swap", skip(swap,exit_early,rate_service), fields(id = %swap.swap_id), err)]
pub async fn run_until<LR>(
mut swap: Swap,
exit_early: fn(&AliceState) -> bool,
rate_service: LR,
) -> Result<AliceState>
where
LR: LatestRate + Clone,
{
let mut current_state = swap.state;
while !is_complete(&current_state) && !exit_early(&current_state) {
@ -27,6 +37,7 @@ pub async fn run_until(mut swap: Swap, exit_early: fn(&AliceState) -> bool) -> R
swap.bitcoin_wallet.as_ref(),
swap.monero_wallet.as_ref(),
&swap.env_config,
rate_service.clone(),
)
.await?;
@ -39,15 +50,23 @@ pub async fn run_until(mut swap: Swap, exit_early: fn(&AliceState) -> bool) -> R
Ok(current_state)
}
async fn next_state(
async fn next_state<LR>(
swap_id: Uuid,
state: AliceState,
event_loop_handle: &mut EventLoopHandle,
bitcoin_wallet: &bitcoin::Wallet,
monero_wallet: &monero::Wallet,
env_config: &Config,
) -> Result<AliceState> {
info!("Current state: {}", state);
mut rate_service: LR,
) -> Result<AliceState>
where
LR: LatestRate,
{
let rate = rate_service
.latest_rate()
.map_or("NaN".to_string(), |rate| format!("{}", rate));
info!(%state, %rate, "Advancing state");
Ok(match state {
AliceState::Started { state3 } => {
@ -59,10 +78,10 @@ async fn next_state(
.await
{
Err(_) => {
tracing::info!(
"TxLock lock did not get {} confirmations in {} minutes",
env_config.bitcoin_finality_confirmations,
env_config.bitcoin_lock_confirmed_timeout.as_secs_f64() / 60.0
info!(
confirmations_needed = %env_config.bitcoin_finality_confirmations,
minutes = %env_config.bitcoin_lock_confirmed_timeout.as_secs_f64() / 60.0,
"TxLock lock did not get enough confirmations in time",
);
AliceState::SafelyAborted
}
@ -164,7 +183,7 @@ async fn next_state(
}
}
enc_sig = event_loop_handle.recv_encrypted_signature() => {
tracing::info!("Received encrypted signature");
info!("Received encrypted signature");
AliceState::EncSigLearned {
monero_wallet_restore_blockheight,
@ -191,8 +210,11 @@ async fn next_state(
bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e)
}
},
Err(e) => {
error!("Publishing the redeem transaction failed with {}, attempting to wait for cancellation now. If you restart the application before the timelock is expired publishing the redeem transaction will be retried.", e);
Err(error) => {
error!(
"Publishing the redeem transaction failed. Error {:#}",
error
);
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
@ -204,8 +226,9 @@ async fn next_state(
}
}
},
Err(e) => {
error!("Constructing the redeem transaction failed with {}, attempting to wait for cancellation now.", e);
Err(error) => {
error!(
"Constructing the redeem transaction failed. Attempting to wait for cancellation now. Error {:#}", error);
tx_lock_status
.wait_until_confirmed_with(state3.cancel_timelock)
.await?;
@ -306,10 +329,10 @@ async fn next_state(
match punish {
Ok(_) => AliceState::BtcPunished,
Err(e) => {
tracing::warn!(
"Falling back to refund because punish transaction failed with {:#}",
e
Err(error) => {
warn!(
"Falling back to refund because punish transaction failed. Error {:#}",
error
);
// Upon punish failure we assume that the refund tx was included but we

@ -134,7 +134,7 @@ impl EventLoop {
if swap_id != self.swap_id {
// TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps
tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored.", swap_id, self.swap_id);
tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored", swap_id, self.swap_id);
// When receiving a transfer proof that is unexpected we still have to acknowledge that it was received
let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ());

@ -3,6 +3,7 @@ pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::bob_run_until::is_btc_locked;
use harness::FastCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -15,7 +16,11 @@ async fn given_alice_and_bob_manually_refund_after_funds_locked_both_refund() {
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));

@ -3,6 +3,7 @@ pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::bob_run_until::is_btc_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -15,7 +16,11 @@ async fn given_alice_and_bob_manually_cancel_when_timelock_not_expired_errors()
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));

@ -3,6 +3,7 @@ pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::bob_run_until::is_btc_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -15,7 +16,11 @@ async fn given_alice_and_bob_manually_force_cancel_when_timelock_not_expired_err
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));

@ -3,6 +3,7 @@ pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::bob_run_until::is_btc_locked;
use harness::FastPunishConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -20,7 +21,11 @@ async fn alice_manually_punishes_after_bob_dead() {
let alice_swap = ctx.alice_next_swap().await;
let alice_bitcoin_wallet = alice_swap.bitcoin_wallet.clone();
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));

@ -2,6 +2,7 @@ pub mod harness;
use harness::alice_run_until::is_encsig_learned;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::redeem::Finality;
use swap::protocol::alice::AliceState;
use swap::protocol::{alice, bob};
@ -15,7 +16,11 @@ async fn alice_manually_redeems_after_enc_sig_learned() {
let bob_swap = tokio::spawn(bob::run(bob_swap));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_encsig_learned));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_encsig_learned,
FixedRate::default(),
));
let alice_state = alice_swap.await??;
assert!(matches!(alice_state, AliceState::EncSigLearned { .. }));

@ -3,6 +3,7 @@ pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::bob_run_until::is_btc_locked;
use harness::FastPunishConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -19,7 +20,11 @@ async fn alice_punishes_after_restart_if_bob_dead() {
let alice_swap = ctx.alice_next_swap().await;
let alice_bitcoin_wallet = alice_swap.bitcoin_wallet.clone();
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));
@ -40,7 +45,7 @@ async fn alice_punishes_after_restart_if_bob_dead() {
ctx.restart_alice().await;
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let alice_swap = tokio::spawn(alice::run(alice_swap, FixedRate::default()));
let alice_state = alice_swap.await??;
ctx.assert_alice_punished(alice_state).await;

@ -2,6 +2,7 @@ pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::FastCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::{alice, bob};
@ -14,7 +15,11 @@ async fn alice_refunds_after_restart_if_bob_already_refunded() {
let bob_swap = tokio::spawn(bob::run(bob_swap));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
));
let bob_state = bob_swap.await??;
ctx.assert_bob_refunded(bob_state).await;
@ -27,7 +32,7 @@ async fn alice_refunds_after_restart_if_bob_already_refunded() {
ctx.restart_alice().await;
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let alice_swap = tokio::spawn(alice::run(alice_swap, FixedRate::default()));
let alice_state = alice_swap.await??;
ctx.assert_alice_refunded(alice_state).await;

@ -2,6 +2,7 @@ pub mod harness;
use harness::bob_run_until::is_xmr_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -16,7 +17,7 @@ async fn concurrent_bobs_after_xmr_lock_proof_sent() {
let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_xmr_locked));
let alice_swap_1 = ctx.alice_next_swap().await;
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1));
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1, FixedRate::default()));
let bob_state_1 = bob_swap_1.await??;
assert!(matches!(bob_state_1, BobState::XmrLocked { .. }));
@ -28,7 +29,7 @@ async fn concurrent_bobs_after_xmr_lock_proof_sent() {
let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2));
let alice_swap_2 = ctx.alice_next_swap().await;
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2));
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2, FixedRate::default()));
// The 2nd swap should ALWAYS finish successfully in this
// scenario

@ -2,6 +2,7 @@ pub mod harness;
use harness::bob_run_until::is_btc_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -16,7 +17,7 @@ async fn concurrent_bobs_before_xmr_lock_proof_sent() {
let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_btc_locked));
let alice_swap_1 = ctx.alice_next_swap().await;
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1));
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1, FixedRate::default()));
let bob_state_1 = bob_swap_1.await??;
assert!(matches!(bob_state_1, BobState::BtcLocked(_)));
@ -28,7 +29,7 @@ async fn concurrent_bobs_before_xmr_lock_proof_sent() {
let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2));
let alice_swap_2 = ctx.alice_next_swap().await;
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2));
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2, FixedRate::default()));
// The 2nd swap ALWAYS finish successfully in this
// scenario, but will receive an "unwanted" transfer proof that is ignored in

@ -1,6 +1,7 @@
pub mod harness;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::{alice, bob};
use tokio::join;
@ -11,7 +12,7 @@ async fn happy_path() {
let bob_swap = tokio::spawn(bob::run(bob_swap));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let alice_swap = tokio::spawn(alice::run(alice_swap, FixedRate::default()));
let (bob_state, alice_state) = join!(bob_swap, alice_swap);

@ -2,6 +2,7 @@ pub mod harness;
use harness::alice_run_until::is_xmr_lock_transaction_sent;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::alice::AliceState;
use swap::protocol::{alice, bob};
@ -12,7 +13,11 @@ async fn given_alice_restarts_after_xmr_is_locked_resume_swap() {
let bob_swap = tokio::spawn(bob::run(bob_swap));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run_until(alice_swap, is_xmr_lock_transaction_sent));
let alice_swap = tokio::spawn(alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
));
let alice_state = alice_swap.await??;
@ -28,7 +33,7 @@ async fn given_alice_restarts_after_xmr_is_locked_resume_swap() {
AliceState::XmrLockTransactionSent { .. }
));
let alice_state = alice::run(alice_swap).await?;
let alice_state = alice::run(alice_swap, FixedRate::default()).await?;
ctx.assert_alice_redeemed(alice_state).await;
let bob_state = bob_swap.await??;

@ -2,6 +2,7 @@ pub mod harness;
use harness::bob_run_until::is_xmr_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -13,7 +14,7 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let alice_swap = tokio::spawn(alice::run(alice_swap, FixedRate::default()));
let bob_state = bob_swap.await??;

@ -2,6 +2,7 @@ pub mod harness;
use harness::bob_run_until::is_xmr_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -13,7 +14,7 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let alice_swap = tokio::spawn(alice::run(alice_swap, FixedRate::default()));
let bob_state = bob_swap.await??;

@ -2,6 +2,7 @@ pub mod harness;
use harness::bob_run_until::is_btc_locked;
use harness::FastPunishConfig;
use swap::protocol::alice::event_loop::FixedRate;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -15,7 +16,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let alice_swap = tokio::spawn(alice::run(alice_swap, FixedRate::default()));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));

Loading…
Cancel
Save