random tests

use_fibres_instead_of_threads
moneroexamples 5 years ago
parent 8b339bf7f0
commit f0f2552be1

@ -184,13 +184,24 @@ if (!current_bc_status->init_monero_blockchain())
// by tx searching threads that are launched for each user independently,
// when they log back or create new account.
std::thread blockchain_monitoring_thread(
[&current_bc_status]()
//std::thread blockchain_monitoring_thread(
//[&current_bc_status]()
//{
//current_bc_status->monitor_blockchain();
//});
//
//
//boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
boost::fibers::fiber fb([&current_bc_status]()
{
current_bc_status->monitor_blockchain();
});
fb.detach();
OMINFO << "Blockchain monitoring thread started";
// try connecting to the mysql
@ -256,19 +267,19 @@ MAKE_RESOURCE(get_tx);
MAKE_RESOURCE(get_version);
// restbed service
Service service;
auto service = make_unique<Service>();
// Publish the Open Monero API created so that front end can use it
service.publish(login);
service.publish(get_address_txs);
service.publish(get_address_info);
service.publish(get_unspent_outs);
service.publish(get_random_outs);
service.publish(submit_raw_tx);
service.publish(import_wallet_request);
service.publish(import_recent_wallet_request);
service.publish(get_tx);
service.publish(get_version);
service->publish(login);
service->publish(get_address_txs);
service->publish(get_address_info);
service->publish(get_unspent_outs);
service->publish(get_random_outs);
service->publish(submit_raw_tx);
service->publish(import_wallet_request);
service->publish(import_recent_wallet_request);
service->publish(get_tx);
service->publish(get_version);
OMINFO << "JSON API endpoints published";
@ -317,13 +328,17 @@ signal(SIGINT, ExitHandler::exitHandler);
// restbed will be running and handling
// requests
//std::thread restbed_service(
//[&service, &settings]()
// [service = std::move(service), &settings]()
//{
//OMINFO << "Starting restbed service thread.";
//service.start(settings);
//boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
//service->start(settings);
//});
//fb.join();
service->start(settings);
service.start(settings);
//service.start(settings);
// we are going to whait here for as long
// as control+c hasn't been pressed
@ -333,7 +348,6 @@ service.start(settings);
return exitHandler.shouldExit();});
}
//////////////////////////////////////////////
// Try to gracefully stop all threads/services
//////////////////////////////////////////////
@ -342,9 +356,9 @@ OMINFO << "Stopping restbed service.";
//service.stop();
//restbed_service.join();
OMINFO << "Stopping blockchain_monitoring_thread. Please wait.";
current_bc_status->stop();
blockchain_monitoring_thread.join();
//OMINFO << "Stopping blockchain_monitoring_thread. Please wait.";
//current_bc_status->stop();
//blockchain_monitoring_thread.join();
OMINFO << "Stopping mysql_ping. Please wait.";
mysql_ping.stop();

@ -58,11 +58,12 @@ CurrentBlockchainStatus::monitor_blockchain()
//clean_search_thread_map();
std::this_thread::sleep_for(
bc_setup.refresh_block_status_every);
// std::this_thread::sleep_for(
// bc_setup.refresh_block_status_every);
//boost::this_fiber::sleep_for(
// bc_setup.refresh_block_status_every);
cout << " monitor threadid " << std::this_thread::get_id() << endl;
boost::this_fiber::sleep_for(
bc_setup.refresh_block_status_every);
}
is_running = false;
@ -770,11 +771,47 @@ CurrentBlockchainStatus::start_tx_search_thread(
//searching_threads.insert(
//{acc.address, ThreadRAII2<TxSearch>(std::move(tx_search))});
searching_fibers.insert(
{acc.address, FiberRAII<TxSearch>(std::move(tx_search))});
// searching_fibers.insert(
// {acc.address, std::move(tx_search)});
// boost::fibers::fiber(std::ref(*tx_search)).detach();
// boost::fibers::fiber(
// std::ref(*searching_fibers.find(acc.address)->second)
// ).detach();
//boost::fibers::fiber fb(boost::fibers::launch::dispatch,
auto fb = make_unique<boost::fibers::fiber>(
[ts = std::move(tx_search)]()
{
cout << "Fiber starting" << endl;
(*ts)();
});
searching_fibers2.insert(
{acc.address, std::move(fb)});
//searching_fibers2.insert(
//{acc.address, boost::fibers::fiber(
//boost::fibers::launch::post,
//[ts = std::move(tx_search)]() -> void
//{
//cout << "Fiber starting" << endl;
//(*ts)();
//cout << "Fiber finished" << endl;
//})});
cout << "threadid " << std::this_thread::get_id() << endl;
searching_fibers2.find(acc.address)->second->detach();
//fb.detach();
//boost::this_fiber::sleep_for(120s);
OMINFO << acc.address.substr(0,6)
+ ": TxSearch thread created.";
//boost::this_fiber::yield();
}
catch (const std::exception& e)
{
@ -1051,7 +1088,7 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address)
"non-existing search thread");
}
return searching_fibers.find(acc_address)->second.get_functor();
return *searching_fibers.find(acc_address)->second;
}
void
@ -1062,14 +1099,14 @@ CurrentBlockchainStatus::clean_search_thread_map()
for (auto& st: searching_fibers)
{
if (search_thread_exist(st.first)
&& st.second.get_functor().still_searching() == false)
&& st.second->still_searching() == false)
{
// before erasing a search thread, check if there was any
// exception thrown by it
try
{
auto eptr = st.second.get_functor().get_exception_ptr();
auto eptr = st.second->get_exception_ptr();
if (eptr != nullptr)
std::rethrow_exception(eptr);
}
@ -1092,7 +1129,7 @@ CurrentBlockchainStatus::stop_search_threads()
for (auto& st: searching_fibers)
{
st.second.get_functor().stop();
st.second->stop();
}
}

@ -335,7 +335,8 @@ protected:
// make it static to guarantee only one such map exist.
map<string, ThreadRAII2<TxSearch>> searching_threads;
map<string, FiberRAII<TxSearch>> searching_fibers;
map<string, std::unique_ptr<TxSearch>> searching_fibers;
map<string, std::unique_ptr<boost::fibers::fiber>> searching_fibers2;
// thread that will be dispachaed and will keep monitoring blockchain
// and mempool changes

@ -57,11 +57,18 @@ class FiberRAII
FiberRAII(std::unique_ptr<T> _functor)
: f {std::move(_functor)},
fbr {new boost::fibers::fiber(std::ref(*f))}
{}
FiberRAII(FiberRAII&&) = default;
FiberRAII& operator=(FiberRAII&&) = default;
fbr {std::ref(*f)}
{
fbr.detach();
}
FiberRAII(FiberRAII&& other)
: f {std::move(other.f)},
fbr {std::move(other.fbr)}
{
std::cout << "FiberRAII(FiberRAII&& other)" << std::endl;
};
//FiberRAII& operator=(FiberRAII&&) = default;
//virtual ~FiberRAII() {
//std::cout << "virtual ~FiberRAII() " << std::endl;
//if (fbr.joinable())
@ -73,7 +80,7 @@ class FiberRAII
protected:
std::unique_ptr<T> f;
std::unique_ptr<boost::fibers::fiber> fbr;
boost::fibers::fiber fbr;
};
}

@ -89,6 +89,8 @@ TxSearch::operator()()
vector<block> blocks;
blocks = current_bc_status->get_blocks_range(h1, h2);
cout << "tx search threadid " << std::this_thread::get_id() << endl;
if (blocks.empty())
{
@ -113,11 +115,10 @@ TxSearch::operator()()
//.refresh_block_status_every)
//);
OMINFO << address_prefix + ": sleeping ";
boost::this_fiber::sleep_for(
std::chrono::seconds(
current_bc_status->get_bc_setup()
.refresh_block_status_every)
);
.refresh_block_status_every);
loop_timestamp = get_current_timestamp();
@ -145,12 +146,15 @@ TxSearch::operator()()
}
boost::this_fiber::yield();
OMINFO << address_prefix + ": analyzing "
<< blocks.size() << " blocks from "
<< h1 << " to " << h2
<< " out of " << last_block_height << " blocks";
OMINFO << address_prefix + ": yielding ";
boost::this_fiber::yield();
OMINFO << address_prefix + ": back from yield ";
vector<crypto::hash> txs_hashes_from_blocks;
vector<transaction> txs_in_blocks;

Loading…
Cancel
Save