fiber initial trials

use_fibres_instead_of_threads
moneroexamples 5 years ago
parent 857c7c9e95
commit 8b339bf7f0

@ -90,6 +90,7 @@ find_package(Boost COMPONENTS
date_time date_time
chrono chrono
regex regex
fiber
serialization serialization
program_options program_options
date_time date_time

@ -316,13 +316,14 @@ signal(SIGINT, ExitHandler::exitHandler);
// main restbed thread. this is where // main restbed thread. this is where
// restbed will be running and handling // restbed will be running and handling
// requests // requests
std::thread restbed_service( //std::thread restbed_service(
[&service, &settings]() //[&service, &settings]()
{ //{
OMINFO << "Starting restbed service thread."; //OMINFO << "Starting restbed service thread.";
service.start(settings); //service.start(settings);
}); //});
service.start(settings);
// we are going to whait here for as long // we are going to whait here for as long
// as control+c hasn't been pressed // as control+c hasn't been pressed
@ -338,8 +339,8 @@ std::thread restbed_service(
////////////////////////////////////////////// //////////////////////////////////////////////
OMINFO << "Stopping restbed service."; OMINFO << "Stopping restbed service.";
service.stop(); //service.stop();
restbed_service.join(); //restbed_service.join();
OMINFO << "Stopping blockchain_monitoring_thread. Please wait."; OMINFO << "Stopping blockchain_monitoring_thread. Please wait.";
current_bc_status->stop(); current_bc_status->stop();

@ -56,10 +56,13 @@ CurrentBlockchainStatus::monitor_blockchain()
OMINFO << "Current blockchain height: " << current_height OMINFO << "Current blockchain height: " << current_height
<< ", no of mempool txs: " << mempool_txs.size(); << ", no of mempool txs: " << mempool_txs.size();
clean_search_thread_map(); //clean_search_thread_map();
std::this_thread::sleep_for( std::this_thread::sleep_for(
bc_setup.refresh_block_status_every); bc_setup.refresh_block_status_every);
//boost::this_fiber::sleep_for(
// bc_setup.refresh_block_status_every);
} }
is_running = false; is_running = false;
@ -764,8 +767,11 @@ CurrentBlockchainStatus::start_tx_search_thread(
{ {
// launch SearchTx thread for the given xmr account // launch SearchTx thread for the given xmr account
searching_threads.insert( //searching_threads.insert(
{acc.address, ThreadRAII2<TxSearch>(std::move(tx_search))}); //{acc.address, ThreadRAII2<TxSearch>(std::move(tx_search))});
searching_fibers.insert(
{acc.address, FiberRAII<TxSearch>(std::move(tx_search))});
OMINFO << acc.address.substr(0,6) OMINFO << acc.address.substr(0,6)
+ ": TxSearch thread created."; + ": TxSearch thread created.";
@ -843,7 +849,8 @@ CurrentBlockchainStatus::search_thread_exist(const string& address)
// no mutex here, as this will be executed // no mutex here, as this will be executed
// from other methods, which do use mutex. // from other methods, which do use mutex.
// so if you put mutex here, you will get into deadlock. // so if you put mutex here, you will get into deadlock.
return searching_threads.count(address) > 0; //return searching_threads.count(address) > 0;
return searching_fibers.count(address) > 0;
} }
bool bool
@ -876,7 +883,7 @@ CurrentBlockchainStatus::find_txs_in_mempool(
{ {
std::lock_guard<std::mutex> lck (getting_mempool_txs); std::lock_guard<std::mutex> lck (getting_mempool_txs);
if (searching_threads.count(address_str) == 0) if (searching_fibers.count(address_str) == 0)
{ {
// thread does not exist // thread does not exist
OMERROR << "thread for " << address_str << " does not exist"; OMERROR << "thread for " << address_str << " does not exist";
@ -1015,7 +1022,7 @@ CurrentBlockchainStatus::set_new_searched_blk_no(
{ {
std::lock_guard<std::mutex> lck (searching_threads_map_mtx); std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
if (searching_threads.count(address) == 0) if (searching_fibers.count(address) == 0)
{ {
// thread does not exist // thread does not exist
OMERROR << address.substr(0,6) OMERROR << address.substr(0,6)
@ -1034,9 +1041,9 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address)
{ {
// do not need locking mutex here, as this function should be only // do not need locking mutex here, as this function should be only
// executed from others that do lock the mutex already. // executed from others that do lock the mutex already.
auto it = searching_threads.find(acc_address); auto it = searching_fibers.find(acc_address);
if (it == searching_threads.end()) if (it == searching_fibers.end())
{ {
OMERROR << "Search thread does not exisit for addr: " OMERROR << "Search thread does not exisit for addr: "
<< acc_address; << acc_address;
@ -1044,7 +1051,7 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address)
"non-existing search thread"); "non-existing search thread");
} }
return searching_threads.find(acc_address)->second.get_functor(); return searching_fibers.find(acc_address)->second.get_functor();
} }
void void
@ -1052,7 +1059,7 @@ CurrentBlockchainStatus::clean_search_thread_map()
{ {
std::lock_guard<std::mutex> lck (searching_threads_map_mtx); std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
for (auto& st: searching_threads) for (auto& st: searching_fibers)
{ {
if (search_thread_exist(st.first) if (search_thread_exist(st.first)
&& st.second.get_functor().still_searching() == false) && st.second.get_functor().still_searching() == false)
@ -1073,7 +1080,7 @@ CurrentBlockchainStatus::clean_search_thread_map()
} }
OMINFO << "Ereasing a search thread"; OMINFO << "Ereasing a search thread";
searching_threads.erase(st.first); searching_fibers.erase(st.first);
} }
} }
} }
@ -1083,7 +1090,7 @@ CurrentBlockchainStatus::stop_search_threads()
{ {
std::lock_guard<std::mutex> lck (searching_threads_map_mtx); std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
for (auto& st: searching_threads) for (auto& st: searching_fibers)
{ {
st.second.get_functor().stop(); st.second.get_functor().stop();
} }

@ -16,6 +16,8 @@
#include "../ext/ThreadPool.hpp" #include "../ext/ThreadPool.hpp"
#include <boost/fiber/all.hpp>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <thread> #include <thread>
@ -333,6 +335,8 @@ protected:
// make it static to guarantee only one such map exist. // make it static to guarantee only one such map exist.
map<string, ThreadRAII2<TxSearch>> searching_threads; map<string, ThreadRAII2<TxSearch>> searching_threads;
map<string, FiberRAII<TxSearch>> searching_fibers;
// thread that will be dispachaed and will keep monitoring blockchain // thread that will be dispachaed and will keep monitoring blockchain
// and mempool changes // and mempool changes
std::thread m_thread; std::thread m_thread;

@ -8,6 +8,8 @@
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#include <boost/fiber/all.hpp>
namespace xmreg namespace xmreg
{ {
@ -48,6 +50,32 @@ protected:
std::unique_ptr<T> f; std::unique_ptr<T> f;
}; };
template <typename T>
class FiberRAII
{
public:
FiberRAII(std::unique_ptr<T> _functor)
: f {std::move(_functor)},
fbr {new boost::fibers::fiber(std::ref(*f))}
{}
FiberRAII(FiberRAII&&) = default;
FiberRAII& operator=(FiberRAII&&) = default;
//virtual ~FiberRAII() {
//std::cout << "virtual ~FiberRAII() " << std::endl;
//if (fbr.joinable())
//fbr.join();
//};
virtual ~FiberRAII() = default;
T& get_functor() {return *f;}
protected:
std::unique_ptr<T> f;
std::unique_ptr<boost::fibers::fiber> fbr;
};
} }
#endif //OPENMONERO_THREADRAII_H #endif //OPENMONERO_THREADRAII_H

@ -107,7 +107,13 @@ TxSearch::operator()()
"Last scanned was " << h2; "Last scanned was " << h2;
} }
std::this_thread::sleep_for( //std::this_thread::sleep_for(
//std::chrono::seconds(
//current_bc_status->get_bc_setup()
//.refresh_block_status_every)
//);
boost::this_fiber::sleep_for(
std::chrono::seconds( std::chrono::seconds(
current_bc_status->get_bc_setup() current_bc_status->get_bc_setup()
.refresh_block_status_every) .refresh_block_status_every)
@ -138,6 +144,9 @@ TxSearch::operator()()
continue; continue;
} }
boost::this_fiber::yield();
OMINFO << address_prefix + ": analyzing " OMINFO << address_prefix + ": analyzing "
<< blocks.size() << " blocks from " << blocks.size() << " blocks from "
<< h1 << " to " << h2 << h1 << " to " << h2

Loading…
Cancel
Save