Merge pull request #126 from moneroexamples/exit_handler

Control+C handling
new_rpc
moneroexamples 5 years ago committed by GitHub
commit 9c069e9865
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,7 +1,7 @@
#include "src/om_log.h"
#include "src/CmdLineOptions.h"
#include "src/MicroCore.h"
#include "src/YourMoneroRequests.h"
#include "src/OpenMoneroRequests.h"
#include "src/ThreadRAII.h"
#include "src/db/MysqlPing.h"
@ -15,6 +15,33 @@ using namespace restbed;
using boost::filesystem::path;
// signal exit handler, addpated from aleth
class ExitHandler
{
public:
static std::mutex m;
static std::condition_variable cv;
static void exitHandler(int)
{
std::lock_guard<std::mutex> lk(m);
s_shouldExit = true;
OMINFO << "Request to finish the openmonero received";
cv.notify_one();
}
bool shouldExit() const { return s_shouldExit; }
private:
static bool s_shouldExit;
};
bool ExitHandler::s_shouldExit {false};
std::mutex ExitHandler::m;
std::condition_variable ExitHandler::cv;
int
main(int ac, const char* av[])
{
@ -30,11 +57,20 @@ if (*help_opt)
return EXIT_SUCCESS;
}
auto monero_log_level =
*(opts.get_option<size_t>("monero-log-level"));
if (monero_log_level < 1 || monero_log_level > 4)
{
cerr << "monero-log-level,m option must be between 1 and 4!\n";
return EXIT_SUCCESS;
}
// setup monero logger
mlog_configure(mlog_get_default_log_path(""), true);
mlog_set_log("1");
mlog_set_log(std::to_string(monero_log_level).c_str());
string log_file = *(opts.get_option<string>("log-file"));
auto log_file = *(opts.get_option<string>("log-file"));
// setup a logger for Open Monero
@ -88,7 +124,7 @@ nlohmann::json config_json = bc_setup.get_config();
//cast port number in string to uint16
uint16_t app_port = boost::lexical_cast<uint16_t>(*port_opt);
auto app_port = boost::lexical_cast<uint16_t>(*port_opt);
// set mysql/mariadb connection details
xmreg::MySqlConnector::url = config_json["database"]["url"];
@ -125,10 +161,11 @@ if (!current_bc_status->init_monero_blockchain())
// by tx searching threads that are launched for each user independently,
// when they log back or create new account.
xmreg::ThreadRAII blockchain_monitoring_thread(
std::thread([current_bc_status]
{current_bc_status->monitor_blockchain();}),
xmreg::ThreadRAII::DtorAction::join);
std::thread blockchain_monitoring_thread(
[&current_bc_status]()
{
current_bc_status->monitor_blockchain();
});
OMINFO << "Blockchain monitoring thread started";
@ -171,14 +208,17 @@ xmreg::MysqlPing mysql_ping {
mysql_accounts->get_connection(),
bc_setup.mysql_ping_every};
xmreg::ThreadRAII mysql_ping_thread(
std::thread(std::ref(mysql_ping)),
xmreg::ThreadRAII::DtorAction::join);
std::thread mysql_ping_thread(
[&mysql_ping]()
{
mysql_ping();
});
OMINFO << "MySQL ping thread started";
// create REST JSON API services
xmreg::YourMoneroRequests open_monero(mysql_accounts, current_bc_status);
xmreg::OpenMoneroRequests open_monero(mysql_accounts, current_bc_status);
// create Open Monero APIs
MAKE_RESOURCE(login);
@ -241,8 +281,55 @@ else
OMINFO << "Start the service at http://127.0.0.1:" << app_port;
}
// intercept basic termination requests,
// including Ctrl+c
ExitHandler exitHandler;
signal(SIGABRT, ExitHandler::exitHandler);
signal(SIGTERM, ExitHandler::exitHandler);
signal(SIGINT, ExitHandler::exitHandler);
// main restbed thread. this is where
// restbed will be running and handling
// requests
std::thread restbed_service(
[&service, &settings]()
{
OMINFO << "Starting restbed service thread.";
service.start(settings);
});
// we are going to whait here for as long
// as control+c hasn't been pressed
{
std::unique_lock<std::mutex> lk(ExitHandler::m);
ExitHandler::cv.wait(lk, [&exitHandler]{
return exitHandler.shouldExit();});
}
//////////////////////////////////////////////
// Try to gracefully stop all threads/services
//////////////////////////////////////////////
OMINFO << "Stopping restbed service.";
service.stop();
restbed_service.join();
OMINFO << "Stopping blockchain_monitoring_thread. Please wait.";
current_bc_status->stop();
blockchain_monitoring_thread.join();
OMINFO << "Stopping mysql_ping. Please wait.";
mysql_ping.stop();
mysql_ping_thread.join();
OMINFO << "Disconnecting from database.";
mysql_accounts->disconnect();
service.start(settings);
OMINFO << "All done. Bye.";
return EXIT_SUCCESS;
}

@ -7,10 +7,10 @@ set(SOURCE_FILES
tools.cpp
CmdLineOptions.cpp
CurrentBlockchainStatus.cpp
db/MySqlConnector.cpp
db/MySqlAccounts.cpp
db/ssqlses.cpp
YourMoneroRequests.cpp
db/MySqlConnector.cpp
db/MySqlAccounts.cpp
db/ssqlses.cpp
OpenMoneroRequests.cpp
TxSearch.cpp
RPCCalls.cpp
OutputInputIdentification.cpp

@ -37,6 +37,9 @@ namespace xmreg
("config-file,c", value<string>()
->default_value("./config/config.json"),
"Config file path.")
("monero-log-level,m", value<size_t>()
->default_value(1),
"Monero log level 1-4, default is 1.")
("log-file,l", value<string>()
->default_value("./openmonero.log"),
"Name and path to log file. -l \"\" to disable log file.");

@ -35,9 +35,14 @@ CurrentBlockchainStatus::monitor_blockchain()
is_running = true;
while (true)
{
{
if (stop_blockchain_monitor_loop)
{
stop_search_threads();
clean_search_thread_map();
OMINFO << "Breaking monitor_blockchain thread loop.";
break;
}
update_current_blockchain_height();
@ -54,6 +59,8 @@ CurrentBlockchainStatus::monitor_blockchain()
is_running = false;
}
OMINFO << "Exiting monitor_blockchain thread loop.";
}
uint64_t
@ -897,6 +904,17 @@ CurrentBlockchainStatus::clean_search_thread_map()
}
}
void
CurrentBlockchainStatus::stop_search_threads()
{
std::lock_guard<std::mutex> lck (searching_threads_map_mtx);
for (auto& st: searching_threads)
{
st.second.get_functor().stop();
}
}
tuple<string, string, string>
CurrentBlockchainStatus::construct_output_rct_field(
const uint64_t global_amount_index,

@ -228,6 +228,9 @@ public:
virtual void
clean_search_thread_map();
virtual void
stop_search_threads();
/*
* The frontend requires rct field to work
* the filed consisitct of rct_pk, mask, and amount.
@ -264,6 +267,9 @@ public:
virtual TxSearch&
get_search_thread(string const& acc_address);
inline virtual void
stop() {stop_blockchain_monitor_loop = true;}
// default destructor is fine
virtual ~CurrentBlockchainStatus() = default;

@ -224,18 +224,18 @@ MicroCore::init_success() const
return initialization_succeded;
}
MicroCore::~MicroCore()
{
//cout << "\n\nMicroCore::~MicroCore()\n\n";
if (initialization_succeded)
{
//core_storage.get_db().safesyncmode(true);
if (core_storage.get_db().is_open())
core_storage.get_db().close();
//cout << "\n\n core_storage.get_db().close();;\n\n";
}
}
//MicroCore::~MicroCore()
//{
// //cout << "\n\nMicroCore::~MicroCore()\n\n";
// if (initialization_succeded)
// {
// //core_storage.get_db().safesyncmode(true);
// if (core_storage.get_db().is_open())
// core_storage.get_db().close();
// //cout << "\n\n core_storage.get_db().close();;\n\n";
// }
//}
}

@ -212,11 +212,10 @@ public:
secret_key);
}
virtual bool
init_success() const;
virtual ~MicroCore();
virtual ~MicroCore() = default;
};
}

@ -5,7 +5,7 @@
#define MYSQLPP_SSQLS_NO_STATICS 1
#include "YourMoneroRequests.h"
#include "OpenMoneroRequests.h"
#include "db/ssqlses.h"
#include "OutputInputIdentification.h"
@ -28,7 +28,7 @@ handel_::operator()(const shared_ptr< Session > session)
YourMoneroRequests::YourMoneroRequests(
OpenMoneroRequests::OpenMoneroRequests(
shared_ptr<MySqlAccounts> _acc,
shared_ptr<CurrentBlockchainStatus> _current_bc_status):
xmr_accounts {_acc}, current_bc_status {_current_bc_status}
@ -38,7 +38,7 @@ YourMoneroRequests::YourMoneroRequests(
void
YourMoneroRequests::login(const shared_ptr<Session> session, const Bytes & body)
OpenMoneroRequests::login(const shared_ptr<Session> session, const Bytes & body)
{
json j_response;
json j_request;
@ -157,7 +157,7 @@ YourMoneroRequests::login(const shared_ptr<Session> session, const Bytes & body)
}
void
YourMoneroRequests::get_address_txs(
OpenMoneroRequests::get_address_txs(
const shared_ptr< Session > session, const Bytes & body)
{
json j_response;
@ -389,7 +389,7 @@ YourMoneroRequests::get_address_txs(
}
void
YourMoneroRequests::get_address_info(
OpenMoneroRequests::get_address_info(
const shared_ptr< Session > session, const Bytes & body)
{
json j_response;
@ -555,7 +555,7 @@ YourMoneroRequests::get_address_info(
void
YourMoneroRequests::get_unspent_outs(
OpenMoneroRequests::get_unspent_outs(
const shared_ptr< Session > session,
const Bytes & body)
{
@ -774,7 +774,7 @@ YourMoneroRequests::get_unspent_outs(
}
void
YourMoneroRequests::get_random_outs(
OpenMoneroRequests::get_random_outs(
const shared_ptr< Session > session, const Bytes & body)
{
json j_request;
@ -888,7 +888,7 @@ YourMoneroRequests::get_random_outs(
void
YourMoneroRequests::submit_raw_tx(
OpenMoneroRequests::submit_raw_tx(
const shared_ptr< Session > session, const Bytes & body)
{
json j_request = body_to_json(body);
@ -982,7 +982,7 @@ YourMoneroRequests::submit_raw_tx(
}
void
YourMoneroRequests::import_wallet_request(
OpenMoneroRequests::import_wallet_request(
const shared_ptr< Session > session, const Bytes & body)
{
@ -1241,7 +1241,7 @@ YourMoneroRequests::import_wallet_request(
void
YourMoneroRequests::import_recent_wallet_request(
OpenMoneroRequests::import_recent_wallet_request(
const shared_ptr< Session > session, const Bytes & body)
{
json j_response;
@ -1373,7 +1373,7 @@ YourMoneroRequests::import_recent_wallet_request(
void
YourMoneroRequests::get_tx(
OpenMoneroRequests::get_tx(
const shared_ptr< Session > session, const Bytes & body)
{
json j_response;
@ -1738,7 +1738,7 @@ YourMoneroRequests::get_tx(
void
YourMoneroRequests::get_version(
OpenMoneroRequests::get_version(
const shared_ptr< Session > session,
const Bytes & body)
{
@ -1767,8 +1767,8 @@ YourMoneroRequests::get_version(
shared_ptr<Resource>
YourMoneroRequests::make_resource(
function< void (YourMoneroRequests&, const shared_ptr< Session >,
OpenMoneroRequests::make_resource(
function< void (OpenMoneroRequests&, const shared_ptr< Session >,
const Bytes& ) > handle_func,
const string& path)
{
@ -1787,7 +1787,7 @@ YourMoneroRequests::make_resource(
void
YourMoneroRequests::generic_options_handler(
OpenMoneroRequests::generic_options_handler(
const shared_ptr< Session > session )
{
const auto request = session->get_request( );
@ -1804,7 +1804,7 @@ YourMoneroRequests::generic_options_handler(
multimap<string, string>
YourMoneroRequests::make_headers(
OpenMoneroRequests::make_headers(
const multimap<string, string>& extra_headers)
{
multimap<string, string> headers {
@ -1819,20 +1819,20 @@ YourMoneroRequests::make_headers(
};
void
YourMoneroRequests::print_json_log(const string& text, const json& j)
OpenMoneroRequests::print_json_log(const string& text, const json& j)
{
cout << text << '\n' << j.dump(4) << endl;
}
string
YourMoneroRequests::body_to_string(const Bytes & body)
OpenMoneroRequests::body_to_string(const Bytes & body)
{
return string(reinterpret_cast<const char *>(body.data()), body.size());
}
json
YourMoneroRequests::body_to_json(const Bytes & body)
OpenMoneroRequests::body_to_json(const Bytes & body)
{
json j = json::parse(body_to_string(body));
return j;
@ -1840,13 +1840,13 @@ YourMoneroRequests::body_to_json(const Bytes & body)
uint64_t
YourMoneroRequests::get_current_blockchain_height()
OpenMoneroRequests::get_current_blockchain_height()
{
return current_bc_status->get_current_blockchain_height();
}
bool
YourMoneroRequests::login_and_start_search_thread(
OpenMoneroRequests::login_and_start_search_thread(
const string& xmr_address,
const string& view_key,
XmrAccount& acc,
@ -1946,7 +1946,7 @@ YourMoneroRequests::login_and_start_search_thread(
bool
YourMoneroRequests::parse_request(
OpenMoneroRequests::parse_request(
const Bytes& body,
vector<string>& values_map,
json& j_request,
@ -1979,7 +1979,7 @@ YourMoneroRequests::parse_request(
}
catch (std::exception& e)
{
cerr << "YourMoneroRequests::parse_request: " << e.what() << endl;
cerr << "OpenMoneroRequests::parse_request: " << e.what() << endl;
j_response["status"] = "error";
j_response["reason"] = "reqest json parsing failed";
@ -1989,7 +1989,7 @@ YourMoneroRequests::parse_request(
}
boost::optional<XmrAccount>
YourMoneroRequests::select_account(
OpenMoneroRequests::select_account(
string const& xmr_address) const
{
boost::optional<XmrAccount> acc = XmrAccount{};
@ -2006,7 +2006,7 @@ YourMoneroRequests::select_account(
}
boost::optional<XmrPayment>
YourMoneroRequests::select_payment(
OpenMoneroRequests::select_payment(
XmrAccount const& xmr_account) const
{
vector<XmrPayment> xmr_payments;
@ -2053,7 +2053,7 @@ YourMoneroRequests::select_payment(
}
void
YourMoneroRequests::session_close(
OpenMoneroRequests::session_close(
const shared_ptr< Session > session,
json& j_response,
int return_code,

@ -19,7 +19,7 @@
#ifndef MAKE_RESOURCE
#define MAKE_RESOURCE(name) auto name = open_monero.make_resource( \
&xmreg::YourMoneroRequests::name, "/" + string(#name));
&xmreg::OpenMoneroRequests::name, "/" + string(#name));
#endif
@ -58,7 +58,7 @@ struct handel_
};
class YourMoneroRequests
class OpenMoneroRequests
{
// this manages all mysql queries
@ -67,7 +67,7 @@ class YourMoneroRequests
public:
YourMoneroRequests(shared_ptr<MySqlAccounts> _acc,
OpenMoneroRequests(shared_ptr<MySqlAccounts> _acc,
shared_ptr<CurrentBlockchainStatus> _current_bc_status);
/**
@ -114,7 +114,7 @@ public:
get_version(const shared_ptr< Session > session, const Bytes & body);
shared_ptr<Resource>
make_resource(function< void (YourMoneroRequests&, const shared_ptr< Session >, const Bytes& ) > handle_func,
make_resource(function< void (OpenMoneroRequests&, const shared_ptr< Session >, const Bytes& ) > handle_func,
const string& path);
static void

@ -17,13 +17,15 @@ ThreadRAII::~ThreadRAII()
{
if (action == DtorAction::join)
{
//std::cout << "\nThreadRAII::~ThreadRAII() t.join()\n";
//std::cout << "\nThreadRAII::~ThreadRAII() t.join()\n"
// << std::endl;
t.join();
}
else
{
t.detach();
//std::cout << "\nThreadRAII::~ThreadRAII() t.detach()\n";
//std::cout << "\nThreadRAII::~ThreadRAII() t.detach()\n"
// << std::endl;
t.detach();
}
}
}

@ -15,7 +15,7 @@ namespace xmreg
class ThreadRAII
{
public:
enum class DtorAction { join, detach};
enum class DtorAction {join, detach};
ThreadRAII(std::thread&& _t, DtorAction _action);

@ -67,6 +67,8 @@ TxSearch::operator()()
auto current_bc_status_ptr = current_bc_status.get();
searching_is_ongoing = true;
// we put everything in massive catch, as there are plenty ways in which
// an exceptions can be thrown here. Mostly from mysql.
// but because this is detatch thread, we cant catch them in main thread.
@ -75,7 +77,8 @@ TxSearch::operator()()
try
{
while(continue_search)
{
{
seconds loop_timestamp {current_timestamp};
uint64_t last_block_height = current_bc_status->current_height;
@ -604,16 +607,18 @@ TxSearch::operator()()
set_exception_ptr();
}
searching_is_ongoing = false;
// it will stop anyway, but just call it so we get info message pritened out
stop();
}
void
TxSearch::stop()
{
OMINFO << address_prefix + ": stopping the thread "
"by setting "
"continue_search=false";
OMINFO << address_prefix + ": stopping the thread";
continue_search = false;
}
@ -655,7 +660,7 @@ TxSearch::ping()
bool
TxSearch::still_searching() const
{
return continue_search;
return searching_is_ongoing;
}
void

@ -45,8 +45,13 @@ private:
// using the service.
static seconds thread_search_life;
// indicate that a thread loop should keep running
bool continue_search {true};
// this acctually indicates whether thread loop finished
// its execution
bool searching_is_ongoing {false};
// to store last exception thrown in the search thread
// using this, a main thread can get info what went wrong here
std::exception_ptr eptr;

@ -7,22 +7,51 @@
#include "MysqlPing.h"
#include <algorithm>
namespace xmreg
{
MysqlPing::MysqlPing(
std::shared_ptr<MySqlConnector> _conn,
seconds _ping_time)
: conn {_conn}, ping_time {_ping_time}
seconds _ping_time, seconds _sleep_time)
: conn {_conn}, ping_time {_ping_time},
thread_sleep_time {_sleep_time}
{}
void
MysqlPing::operator()()
{
// the while loop below is going to execute
// faster than ping_time specified. The reason
// is so that we can exit it in a timely manner
// when keep_looping becomes false
uint64_t between_ping_counter {0};
// if ping_time lower than thread_sleep_time,
// use ping_time for thread_sleep_time
thread_sleep_time = std::min(thread_sleep_time, ping_time);
// we are going to ping mysql every
// no_of_loops_between_pings iterations
// of the while loop
uint64_t no_of_loops_between_pings
= std::max<uint64_t>(1,
ping_time.count()/thread_sleep_time.count()) - 1;
while (keep_looping)
{
std::this_thread::sleep_for(chrono::seconds(ping_time));
std::this_thread::sleep_for(thread_sleep_time);
if (++between_ping_counter <= no_of_loops_between_pings)
{
continue;
}
between_ping_counter = 0;
if (auto c = conn.lock())
{
@ -44,6 +73,8 @@ MysqlPing::operator()()
++counter;
}
OMINFO << "Exiting Mysql ping thread loop.";
}
}

@ -24,7 +24,8 @@ public:
enum class StopReason {NotYetStopped, PingFailed, PointerExpired};
MysqlPing(std::shared_ptr<MySqlConnector> _conn,
seconds _ping_time = 300s);
seconds _ping_time = 300s,
seconds _sleep_time = 5s);
void operator()();
void stop() {keep_looping = false;}
@ -35,9 +36,12 @@ public:
MysqlPing(MysqlPing&&) = default;
MysqlPing& operator=(MysqlPing&&) = default;
~MysqlPing() = default;
private:
std::weak_ptr<MySqlConnector> conn;
seconds ping_time; // in seconds
seconds ping_time {300};
seconds thread_sleep_time {5};
atomic<bool> keep_looping {true};
atomic<uint64_t> counter {0};
atomic<StopReason> why_stoped {StopReason::NotYetStopped};

@ -3,7 +3,7 @@
//
#include "../src/MicroCore.h"
#include "../src/YourMoneroRequests.h"
#include "../src/OpenMoneroRequests.h"
#include "../src/db/MysqlPing.h"
//#include "chaingen.h"

Loading…
Cancel
Save