diff --git a/CMakeLists.txt b/CMakeLists.txt index a8547a5..b4dc426 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -90,6 +90,7 @@ find_package(Boost COMPONENTS date_time chrono regex + fiber serialization program_options date_time diff --git a/main.cpp b/main.cpp index a1b94a7..ecdeb3c 100755 --- a/main.cpp +++ b/main.cpp @@ -316,13 +316,14 @@ signal(SIGINT, ExitHandler::exitHandler); // main restbed thread. this is where // restbed will be running and handling // requests -std::thread restbed_service( - [&service, &settings]() -{ - OMINFO << "Starting restbed service thread."; - service.start(settings); -}); +//std::thread restbed_service( + //[&service, &settings]() +//{ + //OMINFO << "Starting restbed service thread."; + //service.start(settings); +//}); +service.start(settings); // we are going to whait here for as long // as control+c hasn't been pressed @@ -338,8 +339,8 @@ std::thread restbed_service( ////////////////////////////////////////////// OMINFO << "Stopping restbed service."; -service.stop(); -restbed_service.join(); +//service.stop(); +//restbed_service.join(); OMINFO << "Stopping blockchain_monitoring_thread. Please wait."; current_bc_status->stop(); diff --git a/src/CurrentBlockchainStatus.cpp b/src/CurrentBlockchainStatus.cpp index 841d9d2..352a323 100755 --- a/src/CurrentBlockchainStatus.cpp +++ b/src/CurrentBlockchainStatus.cpp @@ -56,10 +56,13 @@ CurrentBlockchainStatus::monitor_blockchain() OMINFO << "Current blockchain height: " << current_height << ", no of mempool txs: " << mempool_txs.size(); - clean_search_thread_map(); + //clean_search_thread_map(); std::this_thread::sleep_for( bc_setup.refresh_block_status_every); + + //boost::this_fiber::sleep_for( + // bc_setup.refresh_block_status_every); } is_running = false; @@ -764,8 +767,11 @@ CurrentBlockchainStatus::start_tx_search_thread( { // launch SearchTx thread for the given xmr account - searching_threads.insert( - {acc.address, ThreadRAII2(std::move(tx_search))}); + //searching_threads.insert( + //{acc.address, ThreadRAII2(std::move(tx_search))}); + + searching_fibers.insert( + {acc.address, FiberRAII(std::move(tx_search))}); OMINFO << acc.address.substr(0,6) + ": TxSearch thread created."; @@ -843,7 +849,8 @@ CurrentBlockchainStatus::search_thread_exist(const string& address) // no mutex here, as this will be executed // from other methods, which do use mutex. // so if you put mutex here, you will get into deadlock. - return searching_threads.count(address) > 0; + //return searching_threads.count(address) > 0; + return searching_fibers.count(address) > 0; } bool @@ -876,7 +883,7 @@ CurrentBlockchainStatus::find_txs_in_mempool( { std::lock_guard lck (getting_mempool_txs); - if (searching_threads.count(address_str) == 0) + if (searching_fibers.count(address_str) == 0) { // thread does not exist OMERROR << "thread for " << address_str << " does not exist"; @@ -1015,7 +1022,7 @@ CurrentBlockchainStatus::set_new_searched_blk_no( { std::lock_guard lck (searching_threads_map_mtx); - if (searching_threads.count(address) == 0) + if (searching_fibers.count(address) == 0) { // thread does not exist OMERROR << address.substr(0,6) @@ -1034,9 +1041,9 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address) { // do not need locking mutex here, as this function should be only // executed from others that do lock the mutex already. - auto it = searching_threads.find(acc_address); + auto it = searching_fibers.find(acc_address); - if (it == searching_threads.end()) + if (it == searching_fibers.end()) { OMERROR << "Search thread does not exisit for addr: " << acc_address; @@ -1044,7 +1051,7 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address) "non-existing search thread"); } - return searching_threads.find(acc_address)->second.get_functor(); + return searching_fibers.find(acc_address)->second.get_functor(); } void @@ -1052,7 +1059,7 @@ CurrentBlockchainStatus::clean_search_thread_map() { std::lock_guard lck (searching_threads_map_mtx); - for (auto& st: searching_threads) + for (auto& st: searching_fibers) { if (search_thread_exist(st.first) && st.second.get_functor().still_searching() == false) @@ -1073,7 +1080,7 @@ CurrentBlockchainStatus::clean_search_thread_map() } OMINFO << "Ereasing a search thread"; - searching_threads.erase(st.first); + searching_fibers.erase(st.first); } } } @@ -1083,7 +1090,7 @@ CurrentBlockchainStatus::stop_search_threads() { std::lock_guard lck (searching_threads_map_mtx); - for (auto& st: searching_threads) + for (auto& st: searching_fibers) { st.second.get_functor().stop(); } diff --git a/src/CurrentBlockchainStatus.h b/src/CurrentBlockchainStatus.h index 06fc756..95d0cc7 100755 --- a/src/CurrentBlockchainStatus.h +++ b/src/CurrentBlockchainStatus.h @@ -16,6 +16,8 @@ #include "../ext/ThreadPool.hpp" +#include + #include #include #include @@ -332,6 +334,8 @@ protected: // map, key is address to which a running thread belongs to. // make it static to guarantee only one such map exist. map> searching_threads; + + map> searching_fibers; // thread that will be dispachaed and will keep monitoring blockchain // and mempool changes @@ -339,7 +343,7 @@ protected: // to synchronize searching access to searching_threads map mutex searching_threads_map_mtx; - + // to synchronize access to mempool_txs vector mutex getting_mempool_txs; diff --git a/src/ThreadRAII.h b/src/ThreadRAII.h index e2b7921..a9a1896 100755 --- a/src/ThreadRAII.h +++ b/src/ThreadRAII.h @@ -8,6 +8,8 @@ #include #include +#include + namespace xmreg { @@ -48,6 +50,32 @@ protected: std::unique_ptr f; }; +template +class FiberRAII +{ + public: + + FiberRAII(std::unique_ptr _functor) + : f {std::move(_functor)}, + fbr {new boost::fibers::fiber(std::ref(*f))} + {} + + FiberRAII(FiberRAII&&) = default; + FiberRAII& operator=(FiberRAII&&) = default; + //virtual ~FiberRAII() { + //std::cout << "virtual ~FiberRAII() " << std::endl; + //if (fbr.joinable()) + //fbr.join(); + //}; + virtual ~FiberRAII() = default; + + T& get_functor() {return *f;} + + protected: + std::unique_ptr f; + std::unique_ptr fbr; +}; + } #endif //OPENMONERO_THREADRAII_H diff --git a/src/TxSearch.cpp b/src/TxSearch.cpp index 58da5cc..4054f63 100755 --- a/src/TxSearch.cpp +++ b/src/TxSearch.cpp @@ -107,7 +107,13 @@ TxSearch::operator()() "Last scanned was " << h2; } - std::this_thread::sleep_for( + //std::this_thread::sleep_for( + //std::chrono::seconds( + //current_bc_status->get_bc_setup() + //.refresh_block_status_every) + //); + + boost::this_fiber::sleep_for( std::chrono::seconds( current_bc_status->get_bc_setup() .refresh_block_status_every) @@ -138,6 +144,9 @@ TxSearch::operator()() continue; } + + boost::this_fiber::yield(); + OMINFO << address_prefix + ": analyzing " << blocks.size() << " blocks from " << h1 << " to " << h2