Compare commits

...

2 Commits

Author SHA1 Message Date
moneroexamples f0f2552be1 random tests
5 years ago
moneroexamples 8b339bf7f0 fiber initial trials
5 years ago

@ -90,6 +90,7 @@ find_package(Boost COMPONENTS
date_time date_time
chrono chrono
regex regex
fiber
serialization serialization
program_options program_options
date_time date_time

@ -184,13 +184,24 @@ if (!current_bc_status->init_monero_blockchain())
// by tx searching threads that are launched for each user independently, // by tx searching threads that are launched for each user independently,
// when they log back or create new account. // when they log back or create new account.
std::thread blockchain_monitoring_thread( //std::thread blockchain_monitoring_thread(
[&current_bc_status]() //[&current_bc_status]()
//{
//current_bc_status->monitor_blockchain();
//});
//
//
//boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
boost::fibers::fiber fb([&current_bc_status]()
{ {
current_bc_status->monitor_blockchain(); current_bc_status->monitor_blockchain();
}); });
fb.detach();
OMINFO << "Blockchain monitoring thread started"; OMINFO << "Blockchain monitoring thread started";
// try connecting to the mysql // try connecting to the mysql
@ -256,19 +267,19 @@ MAKE_RESOURCE(get_tx);
MAKE_RESOURCE(get_version); MAKE_RESOURCE(get_version);
// restbed service // restbed service
Service service; auto service = make_unique<Service>();
// Publish the Open Monero API created so that front end can use it // Publish the Open Monero API created so that front end can use it
service.publish(login); service->publish(login);
service.publish(get_address_txs); service->publish(get_address_txs);
service.publish(get_address_info); service->publish(get_address_info);
service.publish(get_unspent_outs); service->publish(get_unspent_outs);
service.publish(get_random_outs); service->publish(get_random_outs);
service.publish(submit_raw_tx); service->publish(submit_raw_tx);
service.publish(import_wallet_request); service->publish(import_wallet_request);
service.publish(import_recent_wallet_request); service->publish(import_recent_wallet_request);
service.publish(get_tx); service->publish(get_tx);
service.publish(get_version); service->publish(get_version);
OMINFO << "JSON API endpoints published"; OMINFO << "JSON API endpoints published";
@ -316,13 +327,18 @@ signal(SIGINT, ExitHandler::exitHandler);
// main restbed thread. this is where // main restbed thread. this is where
// restbed will be running and handling // restbed will be running and handling
// requests // requests
std::thread restbed_service( //std::thread restbed_service(
[&service, &settings]() // [service = std::move(service), &settings]()
{
OMINFO << "Starting restbed service thread.";
service.start(settings);
});
//{
//OMINFO << "Starting restbed service thread.";
//boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
//service->start(settings);
//});
//fb.join();
service->start(settings);
//service.start(settings);
// we are going to whait here for as long // we are going to whait here for as long
// as control+c hasn't been pressed // as control+c hasn't been pressed
@ -332,18 +348,17 @@ std::thread restbed_service(
return exitHandler.shouldExit();}); return exitHandler.shouldExit();});
} }
////////////////////////////////////////////// //////////////////////////////////////////////
// Try to gracefully stop all threads/services // Try to gracefully stop all threads/services
////////////////////////////////////////////// //////////////////////////////////////////////
OMINFO << "Stopping restbed service."; OMINFO << "Stopping restbed service.";
service.stop(); //service.stop();
restbed_service.join(); //restbed_service.join();
OMINFO << "Stopping blockchain_monitoring_thread. Please wait."; //OMINFO << "Stopping blockchain_monitoring_thread. Please wait.";
current_bc_status->stop(); //current_bc_status->stop();
blockchain_monitoring_thread.join(); //blockchain_monitoring_thread.join();
OMINFO << "Stopping mysql_ping. Please wait."; OMINFO << "Stopping mysql_ping. Please wait.";
mysql_ping.stop(); mysql_ping.stop();

@ -56,9 +56,13 @@ CurrentBlockchainStatus::monitor_blockchain()
OMINFO << "Current blockchain height: " << current_height OMINFO << "Current blockchain height: " << current_height
<< ", no of mempool txs: " << mempool_txs.size(); << ", no of mempool txs: " << mempool_txs.size();
clean_search_thread_map(); //clean_search_thread_map();
// std::this_thread::sleep_for(
// bc_setup.refresh_block_status_every);
std::this_thread::sleep_for( cout << " monitor threadid " << std::this_thread::get_id() << endl;
boost::this_fiber::sleep_for(
bc_setup.refresh_block_status_every); bc_setup.refresh_block_status_every);
} }
@ -764,11 +768,50 @@ CurrentBlockchainStatus::start_tx_search_thread(
{ {
// launch SearchTx thread for the given xmr account // launch SearchTx thread for the given xmr account
searching_threads.insert( //searching_threads.insert(
{acc.address, ThreadRAII2<TxSearch>(std::move(tx_search))}); //{acc.address, ThreadRAII2<TxSearch>(std::move(tx_search))});
// searching_fibers.insert(
// {acc.address, std::move(tx_search)});
// boost::fibers::fiber(std::ref(*tx_search)).detach();
// boost::fibers::fiber(
// std::ref(*searching_fibers.find(acc.address)->second)
// ).detach();
//boost::fibers::fiber fb(boost::fibers::launch::dispatch,
auto fb = make_unique<boost::fibers::fiber>(
[ts = std::move(tx_search)]()
{
cout << "Fiber starting" << endl;
(*ts)();
});
searching_fibers2.insert(
{acc.address, std::move(fb)});
//searching_fibers2.insert(
//{acc.address, boost::fibers::fiber(
//boost::fibers::launch::post,
//[ts = std::move(tx_search)]() -> void
//{
//cout << "Fiber starting" << endl;
//(*ts)();
//cout << "Fiber finished" << endl;
//})});
cout << "threadid " << std::this_thread::get_id() << endl;
searching_fibers2.find(acc.address)->second->detach();
//fb.detach();
//boost::this_fiber::sleep_for(120s);
OMINFO << acc.address.substr(0,6) OMINFO << acc.address.substr(0,6)
+ ": TxSearch thread created."; + ": TxSearch thread created.";
//boost::this_fiber::yield();
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
@ -843,7 +886,8 @@ CurrentBlockchainStatus::search_thread_exist(const string& address)
// no mutex here, as this will be executed // no mutex here, as this will be executed
// from other methods, which do use mutex. // from other methods, which do use mutex.
// so if you put mutex here, you will get into deadlock. // so if you put mutex here, you will get into deadlock.
return searching_threads.count(address) > 0; //return searching_threads.count(address) > 0;
return searching_fibers.count(address) > 0;
} }
bool bool
@ -876,7 +920,7 @@ CurrentBlockchainStatus::find_txs_in_mempool(
{ {
std::lock_guard<std::mutex> lck (getting_mempool_txs); std::lock_guard<std::mutex> lck (getting_mempool_txs);
if (searching_threads.count(address_str) == 0) if (searching_fibers.count(address_str) == 0)
{ {
// thread does not exist // thread does not exist
OMERROR << "thread for " << address_str << " does not exist"; OMERROR << "thread for " << address_str << " does not exist";
@ -1015,7 +1059,7 @@ CurrentBlockchainStatus::set_new_searched_blk_no(
{ {
std::lock_guard<std::mutex> lck (searching_threads_map_mtx); std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
if (searching_threads.count(address) == 0) if (searching_fibers.count(address) == 0)
{ {
// thread does not exist // thread does not exist
OMERROR << address.substr(0,6) OMERROR << address.substr(0,6)
@ -1034,9 +1078,9 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address)
{ {
// do not need locking mutex here, as this function should be only // do not need locking mutex here, as this function should be only
// executed from others that do lock the mutex already. // executed from others that do lock the mutex already.
auto it = searching_threads.find(acc_address); auto it = searching_fibers.find(acc_address);
if (it == searching_threads.end()) if (it == searching_fibers.end())
{ {
OMERROR << "Search thread does not exisit for addr: " OMERROR << "Search thread does not exisit for addr: "
<< acc_address; << acc_address;
@ -1044,7 +1088,7 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address)
"non-existing search thread"); "non-existing search thread");
} }
return searching_threads.find(acc_address)->second.get_functor(); return *searching_fibers.find(acc_address)->second;
} }
void void
@ -1052,17 +1096,17 @@ CurrentBlockchainStatus::clean_search_thread_map()
{ {
std::lock_guard<std::mutex> lck (searching_threads_map_mtx); std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
for (auto& st: searching_threads) for (auto& st: searching_fibers)
{ {
if (search_thread_exist(st.first) if (search_thread_exist(st.first)
&& st.second.get_functor().still_searching() == false) && st.second->still_searching() == false)
{ {
// before erasing a search thread, check if there was any // before erasing a search thread, check if there was any
// exception thrown by it // exception thrown by it
try try
{ {
auto eptr = st.second.get_functor().get_exception_ptr(); auto eptr = st.second->get_exception_ptr();
if (eptr != nullptr) if (eptr != nullptr)
std::rethrow_exception(eptr); std::rethrow_exception(eptr);
} }
@ -1073,7 +1117,7 @@ CurrentBlockchainStatus::clean_search_thread_map()
} }
OMINFO << "Ereasing a search thread"; OMINFO << "Ereasing a search thread";
searching_threads.erase(st.first); searching_fibers.erase(st.first);
} }
} }
} }
@ -1083,9 +1127,9 @@ CurrentBlockchainStatus::stop_search_threads()
{ {
std::lock_guard<std::mutex> lck (searching_threads_map_mtx); std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
for (auto& st: searching_threads) for (auto& st: searching_fibers)
{ {
st.second.get_functor().stop(); st.second->stop();
} }
} }

@ -16,6 +16,8 @@
#include "../ext/ThreadPool.hpp" #include "../ext/ThreadPool.hpp"
#include <boost/fiber/all.hpp>
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <thread> #include <thread>
@ -333,6 +335,9 @@ protected:
// make it static to guarantee only one such map exist. // make it static to guarantee only one such map exist.
map<string, ThreadRAII2<TxSearch>> searching_threads; map<string, ThreadRAII2<TxSearch>> searching_threads;
map<string, std::unique_ptr<TxSearch>> searching_fibers;
map<string, std::unique_ptr<boost::fibers::fiber>> searching_fibers2;
// thread that will be dispachaed and will keep monitoring blockchain // thread that will be dispachaed and will keep monitoring blockchain
// and mempool changes // and mempool changes
std::thread m_thread; std::thread m_thread;

@ -8,6 +8,8 @@
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#include <boost/fiber/all.hpp>
namespace xmreg namespace xmreg
{ {
@ -48,6 +50,39 @@ protected:
std::unique_ptr<T> f; std::unique_ptr<T> f;
}; };
template <typename T>
class FiberRAII
{
public:
FiberRAII(std::unique_ptr<T> _functor)
: f {std::move(_functor)},
fbr {std::ref(*f)}
{
fbr.detach();
}
FiberRAII(FiberRAII&& other)
: f {std::move(other.f)},
fbr {std::move(other.fbr)}
{
std::cout << "FiberRAII(FiberRAII&& other)" << std::endl;
};
//FiberRAII& operator=(FiberRAII&&) = default;
//virtual ~FiberRAII() {
//std::cout << "virtual ~FiberRAII() " << std::endl;
//if (fbr.joinable())
//fbr.join();
//};
virtual ~FiberRAII() = default;
T& get_functor() {return *f;}
protected:
std::unique_ptr<T> f;
boost::fibers::fiber fbr;
};
} }
#endif //OPENMONERO_THREADRAII_H #endif //OPENMONERO_THREADRAII_H

@ -90,6 +90,8 @@ TxSearch::operator()()
blocks = current_bc_status->get_blocks_range(h1, h2); blocks = current_bc_status->get_blocks_range(h1, h2);
cout << "tx search threadid " << std::this_thread::get_id() << endl;
if (blocks.empty()) if (blocks.empty())
{ {
@ -107,11 +109,16 @@ TxSearch::operator()()
"Last scanned was " << h2; "Last scanned was " << h2;
} }
std::this_thread::sleep_for( //std::this_thread::sleep_for(
std::chrono::seconds( //std::chrono::seconds(
//current_bc_status->get_bc_setup()
//.refresh_block_status_every)
//);
OMINFO << address_prefix + ": sleeping ";
boost::this_fiber::sleep_for(
current_bc_status->get_bc_setup() current_bc_status->get_bc_setup()
.refresh_block_status_every) .refresh_block_status_every);
);
loop_timestamp = get_current_timestamp(); loop_timestamp = get_current_timestamp();
@ -138,11 +145,17 @@ TxSearch::operator()()
continue; continue;
} }
OMINFO << address_prefix + ": analyzing " OMINFO << address_prefix + ": analyzing "
<< blocks.size() << " blocks from " << blocks.size() << " blocks from "
<< h1 << " to " << h2 << h1 << " to " << h2
<< " out of " << last_block_height << " blocks"; << " out of " << last_block_height << " blocks";
OMINFO << address_prefix + ": yielding ";
boost::this_fiber::yield();
OMINFO << address_prefix + ": back from yield ";
vector<crypto::hash> txs_hashes_from_blocks; vector<crypto::hash> txs_hashes_from_blocks;
vector<transaction> txs_in_blocks; vector<transaction> txs_in_blocks;
vector<CurrentBlockchainStatus::txs_tuple_t> txs_data; vector<CurrentBlockchainStatus::txs_tuple_t> txs_data;

Loading…
Cancel
Save