@ -3,10 +3,12 @@ pub use bob::Bob;
use anyhow ::{ anyhow , bail , Context , Result } ;
use itertools ::Itertools ;
use libp2p ::PeerId ;
use serde ::de ::DeserializeOwned ;
use serde ::{ Deserialize , Serialize } ;
use std ::fmt ::Display ;
use std ::path ::Path ;
use std ::str ::FromStr ;
use uuid ::Uuid ;
mod alice ;
@ -63,7 +65,10 @@ impl Swap {
}
}
pub struct Database ( sled ::Db ) ;
pub struct Database {
swaps : sled ::Tree ,
peers : sled ::Tree ,
}
impl Database {
pub fn open ( path : & Path ) -> Result < Self > {
@ -72,22 +77,51 @@ impl Database {
let db =
sled ::open ( path ) . with_context ( | | format! ( "Could not open the DB at {:?}" , path ) ) ? ;
Ok ( Database ( db ) )
let swaps = db . open_tree ( "swaps" ) ? ;
let peers = db . open_tree ( "peers" ) ? ;
Ok ( Database { swaps , peers } )
}
pub async fn insert_peer_id ( & self , swap_id : Uuid , peer_id : PeerId ) -> Result < ( ) > {
let peer_id_str = peer_id . to_string ( ) ;
let key = serialize ( & swap_id ) ? ;
let value = serialize ( & peer_id_str ) . context ( "Could not serialize peer-id" ) ? ;
self . peers . insert ( key , value ) ? ;
self . peers
. flush_async ( )
. await
. map ( | _ | ( ) )
. context ( "Could not flush db" )
}
pub fn get_peer_id ( & self , swap_id : Uuid ) -> Result < PeerId > {
let key = serialize ( & swap_id ) ? ;
let encoded = self
. peers
. get ( & key ) ?
. ok_or_else ( | | anyhow ! ( "No peer-id found for swap id {} in database" , swap_id ) ) ? ;
let peer_id : String = deserialize ( & encoded ) . context ( "Could not deserialize peer-id" ) ? ;
Ok ( PeerId ::from_str ( peer_id . as_str ( ) ) ? )
}
pub async fn insert_latest_state ( & self , swap_id : Uuid , state : Swap ) -> Result < ( ) > {
let key = serialize ( & swap_id ) ? ;
let new_value = serialize ( & state ) . context ( "Could not serialize new state value" ) ? ;
let old_value = self . 0. get ( & key ) ? ;
let old_value = self . swaps .get ( & key ) ? ;
self . 0
self . swaps
. compare_and_swap ( key , old_value , Some ( new_value ) )
. context ( "Could not write in the DB" ) ?
. context ( "Stored swap somehow changed, aborting saving" ) ? ;
// TODO: see if this can be done through sled config
self . 0
self . swaps
. flush_async ( )
. await
. map ( | _ | ( ) )
@ -98,7 +132,7 @@ impl Database {
let key = serialize ( & swap_id ) ? ;
let encoded = self
. 0
. swaps
. get ( & key ) ?
. ok_or_else ( | | anyhow ! ( "Swap with id {} not found in database" , swap_id ) ) ? ;
@ -129,7 +163,7 @@ impl Database {
}
fn all_swaps_iter ( & self ) -> impl Iterator < Item = Result < ( Uuid , Swap ) > > {
self . 0 .iter ( ) . map ( | item | {
self . swaps .iter ( ) . map ( | item | {
let ( key , value ) = item . context ( "Failed to retrieve swap from DB" ) ? ;
let swap_id = deserialize ::< Uuid > ( & key ) ? ;
@ -277,4 +311,52 @@ mod tests {
assert_eq! ( err . downcast_ref ::< NotBob > ( ) . unwrap ( ) , & NotBob ) ;
}
#[ tokio::test ]
async fn can_save_swap_state_and_peer_id_with_same_swap_id ( ) -> Result < ( ) > {
let db_dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let db = Database ::open ( db_dir . path ( ) ) . unwrap ( ) ;
let alice_id = Uuid ::new_v4 ( ) ;
let alice_state = Alice ::Done ( AliceEndState ::BtcPunished ) ;
let alice_swap = Swap ::Alice ( alice_state ) ;
let peer_id = PeerId ::random ( ) ;
db . insert_latest_state ( alice_id , alice_swap . clone ( ) ) . await ? ;
db . insert_peer_id ( alice_id , peer_id ) . await ? ;
let loaded_swap = db . get_state ( alice_id ) ? ;
let loaded_peer_id = db . get_peer_id ( alice_id ) ? ;
assert_eq! ( alice_swap , loaded_swap ) ;
assert_eq! ( peer_id , loaded_peer_id ) ;
Ok ( ( ) )
}
#[ tokio::test ]
async fn test_reopen_db ( ) -> Result < ( ) > {
let db_dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let alice_id = Uuid ::new_v4 ( ) ;
let alice_state = Alice ::Done ( AliceEndState ::BtcPunished ) ;
let alice_swap = Swap ::Alice ( alice_state ) ;
let peer_id = PeerId ::random ( ) ;
{
let db = Database ::open ( db_dir . path ( ) ) . unwrap ( ) ;
db . insert_latest_state ( alice_id , alice_swap . clone ( ) ) . await ? ;
db . insert_peer_id ( alice_id , peer_id ) . await ? ;
}
let db = Database ::open ( db_dir . path ( ) ) . unwrap ( ) ;
let loaded_swap = db . get_state ( alice_id ) ? ;
let loaded_peer_id = db . get_peer_id ( alice_id ) ? ;
assert_eq! ( alice_swap , loaded_swap ) ;
assert_eq! ( peer_id , loaded_peer_id ) ;
Ok ( ( ) )
}
}