Compare commits

...

2 Commits

Author SHA1 Message Date
moneroexamples f0f2552be1 random tests
5 years ago
moneroexamples 8b339bf7f0 fiber initial trials
5 years ago

@ -90,6 +90,7 @@ find_package(Boost COMPONENTS
date_time
chrono
regex
fiber
serialization
program_options
date_time

@ -184,13 +184,24 @@ if (!current_bc_status->init_monero_blockchain())
// by tx searching threads that are launched for each user independently,
// when they log back or create new account.
std::thread blockchain_monitoring_thread(
[&current_bc_status]()
//std::thread blockchain_monitoring_thread(
//[&current_bc_status]()
//{
//current_bc_status->monitor_blockchain();
//});
//
//
//boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
boost::fibers::fiber fb([&current_bc_status]()
{
current_bc_status->monitor_blockchain();
});
fb.detach();
OMINFO << "Blockchain monitoring thread started";
// try connecting to the mysql
@ -256,19 +267,19 @@ MAKE_RESOURCE(get_tx);
MAKE_RESOURCE(get_version);
// restbed service
Service service;
auto service = make_unique<Service>();
// Publish the Open Monero API created so that front end can use it
service.publish(login);
service.publish(get_address_txs);
service.publish(get_address_info);
service.publish(get_unspent_outs);
service.publish(get_random_outs);
service.publish(submit_raw_tx);
service.publish(import_wallet_request);
service.publish(import_recent_wallet_request);
service.publish(get_tx);
service.publish(get_version);
service->publish(login);
service->publish(get_address_txs);
service->publish(get_address_info);
service->publish(get_unspent_outs);
service->publish(get_random_outs);
service->publish(submit_raw_tx);
service->publish(import_wallet_request);
service->publish(import_recent_wallet_request);
service->publish(get_tx);
service->publish(get_version);
OMINFO << "JSON API endpoints published";
@ -316,13 +327,18 @@ signal(SIGINT, ExitHandler::exitHandler);
// main restbed thread. this is where
// restbed will be running and handling
// requests
std::thread restbed_service(
[&service, &settings]()
{
OMINFO << "Starting restbed service thread.";
service.start(settings);
});
//std::thread restbed_service(
// [service = std::move(service), &settings]()
//{
//OMINFO << "Starting restbed service thread.";
//boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
//service->start(settings);
//});
//fb.join();
service->start(settings);
//service.start(settings);
// we are going to whait here for as long
// as control+c hasn't been pressed
@ -332,18 +348,17 @@ std::thread restbed_service(
return exitHandler.shouldExit();});
}
//////////////////////////////////////////////
// Try to gracefully stop all threads/services
//////////////////////////////////////////////
OMINFO << "Stopping restbed service.";
service.stop();
restbed_service.join();
//service.stop();
//restbed_service.join();
OMINFO << "Stopping blockchain_monitoring_thread. Please wait.";
current_bc_status->stop();
blockchain_monitoring_thread.join();
//OMINFO << "Stopping blockchain_monitoring_thread. Please wait.";
//current_bc_status->stop();
//blockchain_monitoring_thread.join();
OMINFO << "Stopping mysql_ping. Please wait.";
mysql_ping.stop();

@ -56,9 +56,13 @@ CurrentBlockchainStatus::monitor_blockchain()
OMINFO << "Current blockchain height: " << current_height
<< ", no of mempool txs: " << mempool_txs.size();
clean_search_thread_map();
//clean_search_thread_map();
std::this_thread::sleep_for(
// std::this_thread::sleep_for(
// bc_setup.refresh_block_status_every);
cout << " monitor threadid " << std::this_thread::get_id() << endl;
boost::this_fiber::sleep_for(
bc_setup.refresh_block_status_every);
}
@ -764,11 +768,50 @@ CurrentBlockchainStatus::start_tx_search_thread(
{
// launch SearchTx thread for the given xmr account
searching_threads.insert(
{acc.address, ThreadRAII2<TxSearch>(std::move(tx_search))});
//searching_threads.insert(
//{acc.address, ThreadRAII2<TxSearch>(std::move(tx_search))});
// searching_fibers.insert(
// {acc.address, std::move(tx_search)});
// boost::fibers::fiber(std::ref(*tx_search)).detach();
// boost::fibers::fiber(
// std::ref(*searching_fibers.find(acc.address)->second)
// ).detach();
//boost::fibers::fiber fb(boost::fibers::launch::dispatch,
auto fb = make_unique<boost::fibers::fiber>(
[ts = std::move(tx_search)]()
{
cout << "Fiber starting" << endl;
(*ts)();
});
searching_fibers2.insert(
{acc.address, std::move(fb)});
//searching_fibers2.insert(
//{acc.address, boost::fibers::fiber(
//boost::fibers::launch::post,
//[ts = std::move(tx_search)]() -> void
//{
//cout << "Fiber starting" << endl;
//(*ts)();
//cout << "Fiber finished" << endl;
//})});
cout << "threadid " << std::this_thread::get_id() << endl;
searching_fibers2.find(acc.address)->second->detach();
//fb.detach();
//boost::this_fiber::sleep_for(120s);
OMINFO << acc.address.substr(0,6)
+ ": TxSearch thread created.";
//boost::this_fiber::yield();
}
catch (const std::exception& e)
{
@ -843,7 +886,8 @@ CurrentBlockchainStatus::search_thread_exist(const string& address)
// no mutex here, as this will be executed
// from other methods, which do use mutex.
// so if you put mutex here, you will get into deadlock.
return searching_threads.count(address) > 0;
//return searching_threads.count(address) > 0;
return searching_fibers.count(address) > 0;
}
bool
@ -876,7 +920,7 @@ CurrentBlockchainStatus::find_txs_in_mempool(
{
std::lock_guard<std::mutex> lck (getting_mempool_txs);
if (searching_threads.count(address_str) == 0)
if (searching_fibers.count(address_str) == 0)
{
// thread does not exist
OMERROR << "thread for " << address_str << " does not exist";
@ -1015,7 +1059,7 @@ CurrentBlockchainStatus::set_new_searched_blk_no(
{
std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
if (searching_threads.count(address) == 0)
if (searching_fibers.count(address) == 0)
{
// thread does not exist
OMERROR << address.substr(0,6)
@ -1034,9 +1078,9 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address)
{
// do not need locking mutex here, as this function should be only
// executed from others that do lock the mutex already.
auto it = searching_threads.find(acc_address);
auto it = searching_fibers.find(acc_address);
if (it == searching_threads.end())
if (it == searching_fibers.end())
{
OMERROR << "Search thread does not exisit for addr: "
<< acc_address;
@ -1044,7 +1088,7 @@ CurrentBlockchainStatus::get_search_thread(string const& acc_address)
"non-existing search thread");
}
return searching_threads.find(acc_address)->second.get_functor();
return *searching_fibers.find(acc_address)->second;
}
void
@ -1052,17 +1096,17 @@ CurrentBlockchainStatus::clean_search_thread_map()
{
std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
for (auto& st: searching_threads)
for (auto& st: searching_fibers)
{
if (search_thread_exist(st.first)
&& st.second.get_functor().still_searching() == false)
&& st.second->still_searching() == false)
{
// before erasing a search thread, check if there was any
// exception thrown by it
try
{
auto eptr = st.second.get_functor().get_exception_ptr();
auto eptr = st.second->get_exception_ptr();
if (eptr != nullptr)
std::rethrow_exception(eptr);
}
@ -1073,7 +1117,7 @@ CurrentBlockchainStatus::clean_search_thread_map()
}
OMINFO << "Ereasing a search thread";
searching_threads.erase(st.first);
searching_fibers.erase(st.first);
}
}
}
@ -1083,9 +1127,9 @@ CurrentBlockchainStatus::stop_search_threads()
{
std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
for (auto& st: searching_threads)
for (auto& st: searching_fibers)
{
st.second.get_functor().stop();
st.second->stop();
}
}

@ -16,6 +16,8 @@
#include "../ext/ThreadPool.hpp"
#include <boost/fiber/all.hpp>
#include <iostream>
#include <memory>
#include <thread>
@ -332,6 +334,9 @@ protected:
// map, key is address to which a running thread belongs to.
// make it static to guarantee only one such map exist.
map<string, ThreadRAII2<TxSearch>> searching_threads;
map<string, std::unique_ptr<TxSearch>> searching_fibers;
map<string, std::unique_ptr<boost::fibers::fiber>> searching_fibers2;
// thread that will be dispachaed and will keep monitoring blockchain
// and mempool changes
@ -339,7 +344,7 @@ protected:
// to synchronize searching access to searching_threads map
mutex searching_threads_map_mtx;
// to synchronize access to mempool_txs vector
mutex getting_mempool_txs;

@ -8,6 +8,8 @@
#include <thread>
#include <iostream>
#include <boost/fiber/all.hpp>
namespace xmreg
{
@ -48,6 +50,39 @@ protected:
std::unique_ptr<T> f;
};
template <typename T>
class FiberRAII
{
public:
FiberRAII(std::unique_ptr<T> _functor)
: f {std::move(_functor)},
fbr {std::ref(*f)}
{
fbr.detach();
}
FiberRAII(FiberRAII&& other)
: f {std::move(other.f)},
fbr {std::move(other.fbr)}
{
std::cout << "FiberRAII(FiberRAII&& other)" << std::endl;
};
//FiberRAII& operator=(FiberRAII&&) = default;
//virtual ~FiberRAII() {
//std::cout << "virtual ~FiberRAII() " << std::endl;
//if (fbr.joinable())
//fbr.join();
//};
virtual ~FiberRAII() = default;
T& get_functor() {return *f;}
protected:
std::unique_ptr<T> f;
boost::fibers::fiber fbr;
};
}
#endif //OPENMONERO_THREADRAII_H

@ -89,6 +89,8 @@ TxSearch::operator()()
vector<block> blocks;
blocks = current_bc_status->get_blocks_range(h1, h2);
cout << "tx search threadid " << std::this_thread::get_id() << endl;
if (blocks.empty())
{
@ -107,11 +109,16 @@ TxSearch::operator()()
"Last scanned was " << h2;
}
std::this_thread::sleep_for(
std::chrono::seconds(
//std::this_thread::sleep_for(
//std::chrono::seconds(
//current_bc_status->get_bc_setup()
//.refresh_block_status_every)
//);
OMINFO << address_prefix + ": sleeping ";
boost::this_fiber::sleep_for(
current_bc_status->get_bc_setup()
.refresh_block_status_every)
);
.refresh_block_status_every);
loop_timestamp = get_current_timestamp();
@ -138,10 +145,16 @@ TxSearch::operator()()
continue;
}
OMINFO << address_prefix + ": analyzing "
<< blocks.size() << " blocks from "
<< h1 << " to " << h2
<< " out of " << last_block_height << " blocks";
OMINFO << address_prefix + ": yielding ";
boost::this_fiber::yield();
OMINFO << address_prefix + ": back from yield ";
vector<crypto::hash> txs_hashes_from_blocks;
vector<transaction> txs_in_blocks;

Loading…
Cancel
Save