diff --git a/CMakeLists.txt b/CMakeLists.txt index 964d3d4..9516f3a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,6 +85,7 @@ set(HEADERS src/log.h src/mempool.h src/merge_mining_client.h + src/merge_mining_client_json_rpc.h src/merkle.h src/p2p_server.h src/p2pool.h @@ -118,6 +119,7 @@ set(SOURCES src/memory_leak_debug.cpp src/mempool.cpp src/merge_mining_client.cpp + src/merge_mining_client_json_rpc.cpp src/merkle.cpp src/p2p_server.cpp src/p2pool.cpp diff --git a/src/merge_mining_client.cpp b/src/merge_mining_client.cpp index e369841..b389245 100644 --- a/src/merge_mining_client.cpp +++ b/src/merge_mining_client.cpp @@ -17,361 +17,18 @@ #include "common.h" #include "merge_mining_client.h" -#include "p2pool.h" -#include "params.h" -#include "json_rpc_request.h" -#include "json_parsers.h" -#include - -LOG_CATEGORY(MergeMiningClient) +#include "merge_mining_client_json_rpc.h" namespace p2pool { -MergeMiningClient::MergeMiningClient(p2pool* pool, const std::string& host, const std::string& wallet) - : m_host(host) - , m_port(80) - , m_auxWallet(wallet) - , m_ping(0.0) - , m_pool(pool) - , m_loop{} - , m_loopThread{} - , m_timer{} - , m_getJobRunning(false) - , m_shutdownAsync{} -{ - const size_t k = host.find_last_of(':'); - if (k != std::string::npos) { - m_host = host.substr(0, k); - m_port = std::stoul(host.substr(k + 1), nullptr, 10); - } - - if (m_host.empty() || (m_port == 0) || (m_port >= 65536)) { - LOGERR(1, "Invalid host " << host); - throw std::exception(); - } - - int err = uv_loop_init(&m_loop); - if (err) { - LOGERR(1, "failed to create event loop, error " << uv_err_name(err)); - throw std::exception(); - } - - // Init loop user data before running it - GetLoopUserData(&m_loop); - - err = uv_async_init(&m_loop, &m_shutdownAsync, on_shutdown); - if (err) { - LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); - uv_loop_close(&m_loop); - throw std::exception(); - } - m_shutdownAsync.data = this; - - err = uv_timer_init(&m_loop, &m_timer); - if (err) { - LOGERR(1, "failed to create timer, error " << uv_err_name(err)); - uv_loop_close(&m_loop); - throw std::exception(); - } - m_timer.data = this; - - uv_rwlock_init_checked(&m_lock); - - err = uv_thread_create(&m_loopThread, loop, this); - if (err) { - LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err)); - uv_loop_close(&m_loop); - throw std::exception(); - } - - merge_mining_get_chain_id(); -} - -MergeMiningClient::~MergeMiningClient() -{ - uv_async_send(&m_shutdownAsync); - uv_thread_join(&m_loopThread); - - uv_rwlock_destroy(&m_lock); - - LOGINFO(1, "stopped"); -} - -void MergeMiningClient::on_timer() -{ - MinerData data = m_pool->miner_data(); - merge_mining_get_job(data.height, data.prev_id, m_auxWallet, aux_data()); -} - -void MergeMiningClient::merge_mining_get_chain_id() -{ - const std::string req = "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"merge_mining_get_chain_id\"}"; - - JSONRPCRequest::call(m_host, m_port, req, std::string(), m_pool->params().m_socks5Proxy, - [this](const char* data, size_t size, double ping) { - WriteLock lock(m_lock); - - if (parse_merge_mining_get_chain_id(data, size)) { - if (ping > 0.0) { - m_ping = ping; - } - - LOGINFO(1, m_host << ':' << m_port << " uses chain_id " << log::LightCyan() << m_chainID); - LOGINFO(1, m_host << ':' << m_port << " ping is " << m_ping << " ms"); - - // Chain ID received successfully, we can start polling for new mining jobs now - const int err = uv_timer_start(&m_timer, on_timer, 0, 500); - if (err) { - LOGERR(1, "failed to start timer, error " << uv_err_name(err)); - } - } - }, - [this](const char* data, size_t size, double) { - if (size > 0) { - LOGERR(1, "couldn't get merge mining id from " << m_host << ':' << m_port << ", error " << log::const_buf(data, size)); - } - }, &m_loop); -} - -bool MergeMiningClient::parse_merge_mining_get_chain_id(const char* data, size_t size) -{ - auto err = [this](const char* msg) { - LOGWARN(3, "merge_mining_get_chain_id RPC call failed: " << msg << ". Trying again in 1 second."); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - merge_mining_get_chain_id(); - return false; - }; - - rapidjson::Document doc; - - if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { - return err("parsing failed"); - } - - if (doc.HasMember("error")) { - return err(doc["error"].IsString() ? doc["error"].GetString() : "an unknown error occurred"); - } - - if (!doc.HasMember("result")) { - return err("\"result\" field not found"); - } - - const auto& result = doc["result"]; - - if (!result.IsObject() || !result.HasMember("chain_id")) { - return err("couldn't parse result"); - } - - const auto& chain_id = result["chain_id"]; - - if (!chain_id.IsString() || !from_hex(chain_id.GetString(), chain_id.GetStringLength(), m_chainID)) { - return err("invalid chain_id"); - } - - return true; -} - -void MergeMiningClient::merge_mining_get_job(uint64_t height, const hash& prev_id, const std::string& wallet, const hash& aux_hash) -{ - if (m_getJobRunning) { - return; - } - - m_getJobRunning = true; - - char buf[log::Stream::BUF_SIZE + 1]; - // cppcheck-suppress uninitvar - log::Stream s(buf); - - s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"merge_mining_get_job\",\"params\":{" - << "\"address\":\"" << wallet << '"' - << ",\"aux_hash\":\"" << aux_hash << '"' - << ",\"height\":" << height - << ",\"prev_id\":\"" << prev_id << '"' - << "}}"; - - JSONRPCRequest::call(m_host, m_port, std::string(buf, s.m_pos), std::string(), m_pool->params().m_socks5Proxy, - [this](const char* data, size_t size, double) { - bool changed = false; - hash chain_id; - - { - WriteLock lock(m_lock); - if (parse_merge_mining_get_job(data, size, changed)) { - chain_id = m_chainID; - } - } - - if (changed && !chain_id.empty()) { - m_pool->update_aux_data(chain_id); - } - }, - [this](const char* data, size_t size, double) { - if (size > 0) { - LOGERR(3, "couldn't get merge mining job from " << m_host << ':' << m_port << ", error " << log::const_buf(data, size)); - } - m_getJobRunning = false; - }, &m_loop); -} - -bool MergeMiningClient::parse_merge_mining_get_job(const char* data, size_t size, bool& changed) -{ - auto err = [](const char* msg) { - LOGWARN(3, "merge_mining_get_job RPC call failed: " << msg); - return false; - }; - - rapidjson::Document doc; - - if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { - return err("parsing failed"); - } - - if (doc.HasMember("error")) { - return err(doc["error"].IsString() ? doc["error"].GetString() : "an unknown error occurred"); - } - - if (!doc.HasMember("result")) { - return err("\"result\" field not found"); - } - - const auto& result = doc["result"]; - - if (!result.IsObject()) { - return err("couldn't parse result"); - } - - if (!result.HasMember("aux_hash")) { - return true; - } - - const auto& aux_hash = result["aux_hash"]; - - hash h; - if (!aux_hash.IsString() || !from_hex(aux_hash.GetString(), aux_hash.GetStringLength(), h)) { - return err("invalid aux_hash"); - } - - if (h == m_auxHash) { - return true; - } - - std::vector aux_blob; - - if (!result.HasMember("aux_blob") || !result["aux_blob"].IsString() || !from_hex(result["aux_blob"].GetString(), result["aux_blob"].GetStringLength(), aux_blob)) { - return err("invalid aux_blob"); - } - - if (!result.HasMember("aux_diff") || !result["aux_diff"].IsUint64()) { - return err("invalid aux_diff"); - } - - m_auxBlob = std::move(aux_blob); - m_auxHash = h; - m_auxDiff.lo = result["aux_diff"].GetUint64(); - m_auxDiff.hi = 0; - - changed = true; - - return true; -} - -void MergeMiningClient::merge_mining_submit_solution(const std::vector& blob, const std::vector& merkle_proof) +IMergeMiningClient* IMergeMiningClient::create(p2pool* pool, const std::string& host, const std::string& wallet) noexcept { - ReadLock lock(m_lock); - - std::vector buf((m_auxBlob.size() + HASH_SIZE + blob.size()) * 2 + merkle_proof.size() * (HASH_SIZE * 2 + 3) + 256); - log::Stream s(buf.data(), buf.size()); - - s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"merge_mining_submit_solution\",\"params\":{" - << "\"aux_blob\":\"" << log::hex_buf(m_auxBlob.data(), m_auxBlob.size()) << '"' - << ",\"aux_hash\":\"" << m_auxHash << '"' - << ",\"blob\":\"" << log::hex_buf(blob.data(), blob.size()) << '"' - << ",\"merkle_proof\":["; - - for (size_t i = 0, n = merkle_proof.size(); i < n; ++i) { - if (i > 0) { - s << ','; - } - s << '"' << merkle_proof[i] << '"'; + try { + return new MergeMiningClientJSON_RPC(pool, host, wallet); } - - s << "]}}"; - - JSONRPCRequest::call(m_host, m_port, std::string(buf.data(), s.m_pos), std::string(), m_pool->params().m_socks5Proxy, - [this](const char* data, size_t size, double) { - parse_merge_mining_submit_solution(data, size); - }, - [this](const char* data, size_t size, double) { - if (size > 0) { - LOGERR(3, "couldn't submit merge mining solution to " << m_host << ':' << m_port << ", error " << log::const_buf(data, size)); - } - }, &m_loop); -} - -bool MergeMiningClient::parse_merge_mining_submit_solution(const char* data, size_t size) -{ - auto err = [this](const char* msg) { - LOGWARN(3, "merge_mining_submit_solution to " << m_host << ':' << m_port << " failed: " << msg); - return false; - }; - - rapidjson::Document doc; - - if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { - return err("parsing failed"); - } - - if (doc.HasMember("error")) { - return err(doc["error"].IsString() ? doc["error"].GetString() : "an unknown error occurred"); - } - - if (!doc.HasMember("result")) { - return err("\"result\" field not found"); - } - - const auto& result = doc["result"]; - - if (!result.IsObject()) { - return err("couldn't parse result"); - } - - if (!result.HasMember("status") || !result["status"].IsString()) { - return err("invalid status"); + catch (...) { } - - const char* status = result["status"].GetString(); - LOGINFO(0, log::LightGreen() << "merge_mining_submit_solution to " << m_host << ':' << m_port << ": " << status); - - // Get new mining job - on_timer(); - - return true; -} - -void MergeMiningClient::loop(void* data) -{ - LOGINFO(1, "event loop started"); - - MergeMiningClient* client = static_cast(data); - - int err = uv_run(&client->m_loop, UV_RUN_DEFAULT); - if (err) { - LOGWARN(1, "uv_run returned " << err); - } - - err = uv_loop_close(&client->m_loop); - if (err) { - LOGWARN(1, "uv_loop_close returned error " << uv_err_name(err)); - } - - LOGINFO(1, "event loop stopped"); -} - -void MergeMiningClient::on_shutdown() -{ - uv_timer_stop(&m_timer); - uv_close(reinterpret_cast(&m_timer), nullptr); + return nullptr; } } // namespace p2pool diff --git a/src/merge_mining_client.h b/src/merge_mining_client.h index bcd020b..fd0646b 100644 --- a/src/merge_mining_client.h +++ b/src/merge_mining_client.h @@ -17,73 +17,27 @@ #pragma once -#include "uv_util.h" - namespace p2pool { class p2pool; -class MergeMiningClient +class IMergeMiningClient { public: - MergeMiningClient(p2pool* pool, const std::string& host, const std::string& wallet); - ~MergeMiningClient(); - - void merge_mining_submit_solution(const std::vector& blob, const std::vector& merkle_proof); - - FORCEINLINE hash aux_id() const { ReadLock lock(m_lock); return m_chainID; } - FORCEINLINE hash aux_data() const { ReadLock lock(m_lock); return m_auxHash; } - FORCEINLINE difficulty_type aux_diff() const { ReadLock lock(m_lock); return m_auxDiff; } - -private: - static void loop(void* data); - - static void on_timer(uv_timer_t* timer) { reinterpret_cast(timer->data)->on_timer(); } - void on_timer(); - - void merge_mining_get_chain_id(); - bool parse_merge_mining_get_chain_id(const char* data, size_t size); - - void merge_mining_get_job(uint64_t height, const hash& prev_id, const std::string& wallet, const hash& aux_hash); - bool parse_merge_mining_get_job(const char* data, size_t size, bool& changed); - - bool parse_merge_mining_submit_solution(const char* data, size_t size); - - std::string m_host; - uint32_t m_port; - - mutable uv_rwlock_t m_lock; - - std::string m_auxWallet; - std::vector m_auxBlob; - hash m_auxHash; - difficulty_type m_auxDiff; - - hash m_chainID; - double m_ping; - - p2pool* m_pool; - - uv_loop_t m_loop; - uv_thread_t m_loopThread; - - uv_timer_t m_timer; - - bool m_getJobRunning; - - uv_async_t m_shutdownAsync; - - static void on_shutdown(uv_async_t* async) + struct ChainParameters { - MergeMiningClient* client = reinterpret_cast(async->data); - client->on_shutdown(); + hash aux_id; + hash aux_hash; + std::vector aux_blob; + difficulty_type aux_diff; + }; - uv_close(reinterpret_cast(&client->m_shutdownAsync), nullptr); - - delete GetLoopUserData(&client->m_loop, false); - } +public: + static IMergeMiningClient* create(p2pool* pool, const std::string& host, const std::string& wallet) noexcept; + virtual ~IMergeMiningClient() {} - void on_shutdown(); + virtual bool get_params(ChainParameters& out_params) const = 0; + virtual void submit_solution(const std::vector& blob, const std::vector& merkle_proof) = 0; }; } // namespace p2pool diff --git a/src/merge_mining_client_json_rpc.cpp b/src/merge_mining_client_json_rpc.cpp new file mode 100644 index 0000000..a4c5368 --- /dev/null +++ b/src/merge_mining_client_json_rpc.cpp @@ -0,0 +1,396 @@ +/* + * This file is part of the Monero P2Pool + * Copyright (c) 2021-2024 SChernykh + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "common.h" +#include "merge_mining_client.h" +#include "merge_mining_client_json_rpc.h" +#include "p2pool.h" +#include "params.h" +#include "json_rpc_request.h" +#include "json_parsers.h" +#include + +LOG_CATEGORY(MergeMiningClientJSON_RPC) + +namespace p2pool { + +MergeMiningClientJSON_RPC::MergeMiningClientJSON_RPC(p2pool* pool, const std::string& host, const std::string& wallet) + : m_host(host) + , m_port(80) + , m_auxWallet(wallet) + , m_ping(0.0) + , m_pool(pool) + , m_loop{} + , m_loopThread{} + , m_timer{} + , m_getJobRunning(false) + , m_shutdownAsync{} +{ + const size_t k = host.find_last_of(':'); + if (k != std::string::npos) { + m_host = host.substr(0, k); + m_port = std::stoul(host.substr(k + 1), nullptr, 10); + } + + if (m_host.empty() || (m_port == 0) || (m_port >= 65536)) { + LOGERR(1, "Invalid host " << host); + throw std::exception(); + } + + int err = uv_loop_init(&m_loop); + if (err) { + LOGERR(1, "failed to create event loop, error " << uv_err_name(err)); + throw std::exception(); + } + + // Init loop user data before running it + GetLoopUserData(&m_loop); + + err = uv_async_init(&m_loop, &m_shutdownAsync, on_shutdown); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + uv_loop_close(&m_loop); + throw std::exception(); + } + m_shutdownAsync.data = this; + + err = uv_timer_init(&m_loop, &m_timer); + if (err) { + LOGERR(1, "failed to create timer, error " << uv_err_name(err)); + uv_loop_close(&m_loop); + throw std::exception(); + } + m_timer.data = this; + + uv_rwlock_init_checked(&m_lock); + + err = uv_thread_create(&m_loopThread, loop, this); + if (err) { + LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err)); + uv_loop_close(&m_loop); + throw std::exception(); + } + + merge_mining_get_chain_id(); +} + +MergeMiningClientJSON_RPC::~MergeMiningClientJSON_RPC() +{ + uv_async_send(&m_shutdownAsync); + uv_thread_join(&m_loopThread); + + uv_rwlock_destroy(&m_lock); + + LOGINFO(1, "stopped"); +} + +void MergeMiningClientJSON_RPC::on_timer() +{ + MinerData data = m_pool->miner_data(); + merge_mining_get_job(data.height, data.prev_id, m_auxWallet); +} + +void MergeMiningClientJSON_RPC::merge_mining_get_chain_id() +{ + const std::string req = "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"merge_mining_get_chain_id\"}"; + + JSONRPCRequest::call(m_host, m_port, req, std::string(), m_pool->params().m_socks5Proxy, + [this](const char* data, size_t size, double ping) { + WriteLock lock(m_lock); + + if (parse_merge_mining_get_chain_id(data, size)) { + if (ping > 0.0) { + m_ping = ping; + } + + LOGINFO(1, m_host << ':' << m_port << " uses chain_id " << log::LightCyan() << m_chainParams.aux_id); + LOGINFO(1, m_host << ':' << m_port << " ping is " << m_ping << " ms"); + + // Chain ID received successfully, we can start polling for new mining jobs now + const int err = uv_timer_start(&m_timer, on_timer, 0, 500); + if (err) { + LOGERR(1, "failed to start timer, error " << uv_err_name(err)); + } + } + }, + [this](const char* data, size_t size, double) { + if (size > 0) { + LOGERR(1, "couldn't get merge mining id from " << m_host << ':' << m_port << ", error " << log::const_buf(data, size)); + } + }, &m_loop); +} + +bool MergeMiningClientJSON_RPC::parse_merge_mining_get_chain_id(const char* data, size_t size) +{ + auto err = [this](const char* msg) { + LOGWARN(3, "merge_mining_get_chain_id RPC call failed: " << msg << ". Trying again in 1 second."); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + merge_mining_get_chain_id(); + return false; + }; + + rapidjson::Document doc; + + if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { + return err("parsing failed"); + } + + if (doc.HasMember("error")) { + return err(doc["error"].IsString() ? doc["error"].GetString() : "an unknown error occurred"); + } + + if (!doc.HasMember("result")) { + return err("\"result\" field not found"); + } + + const auto& result = doc["result"]; + + if (!result.IsObject() || !result.HasMember("chain_id")) { + return err("couldn't parse result"); + } + + const auto& chain_id = result["chain_id"]; + + if (!chain_id.IsString() || !from_hex(chain_id.GetString(), chain_id.GetStringLength(), m_chainParams.aux_id)) { + return err("invalid chain_id"); + } + + return true; +} + +void MergeMiningClientJSON_RPC::merge_mining_get_job(uint64_t height, const hash& prev_id, const std::string& wallet) +{ + if (m_getJobRunning) { + return; + } + + m_getJobRunning = true; + + char buf[log::Stream::BUF_SIZE + 1]; + // cppcheck-suppress uninitvar + log::Stream s(buf); + + hash aux_hash; + { + ReadLock lock(m_lock); + aux_hash = m_chainParams.aux_hash; + } + + s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"merge_mining_get_job\",\"params\":{" + << "\"address\":\"" << wallet << '"' + << ",\"aux_hash\":\"" << aux_hash << '"' + << ",\"height\":" << height + << ",\"prev_id\":\"" << prev_id << '"' + << "}}"; + + JSONRPCRequest::call(m_host, m_port, std::string(buf, s.m_pos), std::string(), m_pool->params().m_socks5Proxy, + [this](const char* data, size_t size, double) { + bool changed = false; + hash chain_id; + + { + WriteLock lock(m_lock); + if (parse_merge_mining_get_job(data, size, changed)) { + chain_id = m_chainParams.aux_id; + } + } + + if (changed && !chain_id.empty()) { + m_pool->update_aux_data(chain_id); + } + }, + [this](const char* data, size_t size, double) { + if (size > 0) { + LOGERR(3, "couldn't get merge mining job from " << m_host << ':' << m_port << ", error " << log::const_buf(data, size)); + } + m_getJobRunning = false; + }, &m_loop); +} + +bool MergeMiningClientJSON_RPC::parse_merge_mining_get_job(const char* data, size_t size, bool& changed) +{ + auto err = [](const char* msg) { + LOGWARN(3, "merge_mining_get_job RPC call failed: " << msg); + return false; + }; + + rapidjson::Document doc; + + if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { + return err("parsing failed"); + } + + if (doc.HasMember("error")) { + return err(doc["error"].IsString() ? doc["error"].GetString() : "an unknown error occurred"); + } + + if (!doc.HasMember("result")) { + return err("\"result\" field not found"); + } + + const auto& result = doc["result"]; + + if (!result.IsObject()) { + return err("couldn't parse result"); + } + + if (!result.HasMember("aux_hash")) { + return true; + } + + const auto& aux_hash = result["aux_hash"]; + + hash h; + if (!aux_hash.IsString() || !from_hex(aux_hash.GetString(), aux_hash.GetStringLength(), h)) { + return err("invalid aux_hash"); + } + + if (h == m_chainParams.aux_hash) { + return true; + } + + std::vector aux_blob; + + if (!result.HasMember("aux_blob") || !result["aux_blob"].IsString() || !from_hex(result["aux_blob"].GetString(), result["aux_blob"].GetStringLength(), aux_blob)) { + return err("invalid aux_blob"); + } + + if (!result.HasMember("aux_diff") || !result["aux_diff"].IsUint64()) { + return err("invalid aux_diff"); + } + + m_chainParams.aux_blob = std::move(aux_blob); + m_chainParams.aux_hash = h; + m_chainParams.aux_diff.lo = result["aux_diff"].GetUint64(); + m_chainParams.aux_diff.hi = 0; + + changed = true; + + return true; +} + +void MergeMiningClientJSON_RPC::submit_solution(const std::vector& blob, const std::vector& merkle_proof) +{ + ReadLock lock(m_lock); + + std::vector buf((m_chainParams.aux_blob.size() + HASH_SIZE + blob.size()) * 2 + merkle_proof.size() * (HASH_SIZE * 2 + 3) + 256); + log::Stream s(buf.data(), buf.size()); + + s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"merge_mining_submit_solution\",\"params\":{" + << "\"aux_blob\":\"" << log::hex_buf(m_chainParams.aux_blob.data(), m_chainParams.aux_blob.size()) << '"' + << ",\"aux_hash\":\"" << m_chainParams.aux_hash << '"' + << ",\"blob\":\"" << log::hex_buf(blob.data(), blob.size()) << '"' + << ",\"merkle_proof\":["; + + for (size_t i = 0, n = merkle_proof.size(); i < n; ++i) { + if (i > 0) { + s << ','; + } + s << '"' << merkle_proof[i] << '"'; + } + + s << "]}}"; + + JSONRPCRequest::call(m_host, m_port, std::string(buf.data(), s.m_pos), std::string(), m_pool->params().m_socks5Proxy, + [this](const char* data, size_t size, double) { + parse_merge_mining_submit_solution(data, size); + }, + [this](const char* data, size_t size, double) { + if (size > 0) { + LOGERR(3, "couldn't submit merge mining solution to " << m_host << ':' << m_port << ", error " << log::const_buf(data, size)); + } + }, &m_loop); +} + +bool MergeMiningClientJSON_RPC::get_params(ChainParameters& out_params) const +{ + ReadLock lock(m_lock); + + if (m_chainParams.aux_id.empty() || m_chainParams.aux_diff.empty()) { + return false; + } + + out_params = m_chainParams; + return true; +} + +bool MergeMiningClientJSON_RPC::parse_merge_mining_submit_solution(const char* data, size_t size) +{ + auto err = [this](const char* msg) { + LOGWARN(3, "merge_mining_submit_solution to " << m_host << ':' << m_port << " failed: " << msg); + return false; + }; + + rapidjson::Document doc; + + if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { + return err("parsing failed"); + } + + if (doc.HasMember("error")) { + return err(doc["error"].IsString() ? doc["error"].GetString() : "an unknown error occurred"); + } + + if (!doc.HasMember("result")) { + return err("\"result\" field not found"); + } + + const auto& result = doc["result"]; + + if (!result.IsObject()) { + return err("couldn't parse result"); + } + + if (!result.HasMember("status") || !result["status"].IsString()) { + return err("invalid status"); + } + + const char* status = result["status"].GetString(); + LOGINFO(0, log::LightGreen() << "merge_mining_submit_solution to " << m_host << ':' << m_port << ": " << status); + + // Get new mining job + on_timer(); + + return true; +} + +void MergeMiningClientJSON_RPC::loop(void* data) +{ + LOGINFO(1, "event loop started"); + + MergeMiningClientJSON_RPC* client = static_cast(data); + + int err = uv_run(&client->m_loop, UV_RUN_DEFAULT); + if (err) { + LOGWARN(1, "uv_run returned " << err); + } + + err = uv_loop_close(&client->m_loop); + if (err) { + LOGWARN(1, "uv_loop_close returned error " << uv_err_name(err)); + } + + LOGINFO(1, "event loop stopped"); +} + +void MergeMiningClientJSON_RPC::on_shutdown() +{ + uv_timer_stop(&m_timer); + uv_close(reinterpret_cast(&m_timer), nullptr); +} + +} // namespace p2pool diff --git a/src/merge_mining_client_json_rpc.h b/src/merge_mining_client_json_rpc.h new file mode 100644 index 0000000..206759f --- /dev/null +++ b/src/merge_mining_client_json_rpc.h @@ -0,0 +1,83 @@ +/* + * This file is part of the Monero P2Pool + * Copyright (c) 2021-2024 SChernykh + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#pragma once + +#include "uv_util.h" + +namespace p2pool { + +class p2pool; + +class MergeMiningClientJSON_RPC : public IMergeMiningClient +{ +public: + MergeMiningClientJSON_RPC(p2pool* pool, const std::string& host, const std::string& wallet); + ~MergeMiningClientJSON_RPC(); + + bool get_params(ChainParameters& out_params) const override; + void submit_solution(const std::vector& blob, const std::vector& merkle_proof) override; + +private: + static void loop(void* data); + + static void on_timer(uv_timer_t* timer) { reinterpret_cast(timer->data)->on_timer(); } + void on_timer(); + + void merge_mining_get_chain_id(); + bool parse_merge_mining_get_chain_id(const char* data, size_t size); + + void merge_mining_get_job(uint64_t height, const hash& prev_id, const std::string& wallet); + bool parse_merge_mining_get_job(const char* data, size_t size, bool& changed); + + bool parse_merge_mining_submit_solution(const char* data, size_t size); + + std::string m_host; + uint32_t m_port; + + mutable uv_rwlock_t m_lock; + ChainParameters m_chainParams; + + std::string m_auxWallet; + + double m_ping; + + p2pool* m_pool; + + uv_loop_t m_loop; + uv_thread_t m_loopThread; + + uv_timer_t m_timer; + + bool m_getJobRunning; + + uv_async_t m_shutdownAsync; + + static void on_shutdown(uv_async_t* async) + { + MergeMiningClientJSON_RPC* client = reinterpret_cast(async->data); + client->on_shutdown(); + + uv_close(reinterpret_cast(&client->m_shutdownAsync), nullptr); + + delete GetLoopUserData(&client->m_loop, false); + } + + void on_shutdown(); +}; + +} // namespace p2pool diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 0690f00..23aec70 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -199,7 +199,7 @@ p2pool::~p2pool() { WriteLock lock(m_mergeMiningClientsLock); - for (const MergeMiningClient* c : m_mergeMiningClients) { + for (const IMergeMiningClient* c : m_mergeMiningClients) { delete c; } m_mergeMiningClients.clear(); @@ -527,9 +527,13 @@ void p2pool::update_aux_data(const hash& chain_id) aux_id.reserve(m_mergeMiningClients.size() + 1); - for (const MergeMiningClient* c : m_mergeMiningClients) { - data.aux_chains.emplace_back(c->aux_id(), c->aux_data(), c->aux_diff()); - aux_id.emplace_back(c->aux_id()); + IMergeMiningClient::ChainParameters params; + + for (const IMergeMiningClient* c : m_mergeMiningClients) { + if (c->get_params(params)) { + data.aux_chains.emplace_back(params.aux_id, params.aux_hash, params.aux_diff); + aux_id.emplace_back(params.aux_id); + } } aux_id.emplace_back(m_sideChain->consensus_hash()); } @@ -646,23 +650,28 @@ void p2pool::submit_aux_block(const hash& chain_id, uint32_t template_id, uint32 ReadLock lock(m_mergeMiningClientsLock); - for (MergeMiningClient* c : m_mergeMiningClients) { - if (chain_id == c->aux_id()) { + IMergeMiningClient::ChainParameters params; + + for (IMergeMiningClient* c : m_mergeMiningClients) { + if (!c->get_params(params)) { + continue; + } + + if (chain_id == params.aux_id) { std::vector proof; - const hash aux_hash = c->aux_data(); - if (m_blockTemplate->get_aux_proof(template_id, extra_nonce, aux_hash, proof)) { + if (m_blockTemplate->get_aux_proof(template_id, extra_nonce, params.aux_hash, proof)) { if (pool_block_debug()) { const MinerData data = miner_data(); const uint32_t n_aux_chains = static_cast(data.aux_chains.size() + 1); - const uint32_t index = get_aux_slot(c->aux_id(), data.aux_nonce, n_aux_chains); + const uint32_t index = get_aux_slot(params.aux_id, data.aux_nonce, n_aux_chains); - if (!verify_merkle_proof(aux_hash, proof, index, n_aux_chains, merge_mining_root)) { + if (!verify_merkle_proof(params.aux_hash, proof, index, n_aux_chains, merge_mining_root)) { LOGERR(0, "submit_aux_block: verify_merkle_proof failed for chain_id " << chain_id); } } - c->merge_mining_submit_solution(blob, proof); + c->submit_solution(blob, proof); } else { LOGWARN(3, "submit_aux_block: failed to get merkle proof for chain_id " << chain_id); @@ -958,7 +967,10 @@ void p2pool::download_block_headers(uint64_t current_height) m_mergeMiningClients.clear(); for (const auto& h : m_params->m_mergeMiningHosts) { - m_mergeMiningClients.push_back(new MergeMiningClient(this, h.m_host, h.m_wallet)); + IMergeMiningClient* c = IMergeMiningClient::create(this, h.m_host, h.m_wallet); + if (c) { + m_mergeMiningClients.push_back(c); + } } } diff --git a/src/p2pool.h b/src/p2pool.h index f6cd0a7..4bee95b 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -33,7 +33,7 @@ class Miner; class ConsoleCommands; class p2pool_api; class ZMQReader; -class MergeMiningClient; +class IMergeMiningClient; struct PoolBlock; class p2pool : public MinerCallbackHandler, public nocopy_nomove @@ -233,7 +233,7 @@ private: ZMQReader* m_ZMQReader = nullptr; mutable uv_rwlock_t m_mergeMiningClientsLock; - std::vector m_mergeMiningClients; + std::vector m_mergeMiningClients; mutable uv_rwlock_t m_auxIdLock; std::vector m_auxId; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6cc9880..15cb60a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -54,6 +54,7 @@ set(SOURCES ../src/memory_leak_debug.cpp ../src/mempool.cpp ../src/merge_mining_client.cpp + ../src/merge_mining_client_json_rpc.cpp ../src/merkle.cpp ../src/miner.cpp ../src/p2p_server.cpp