You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
560 lines
26 KiB
560 lines
26 KiB
"use strict";
|
|
const debug = require("debug")("worker");
|
|
const async = require("async");
|
|
const sprintf = require("sprintf-js").sprintf;
|
|
|
|
let threadName = "Worker Server ";
|
|
let cycleCount = 0;
|
|
let lastBlockHash = null;
|
|
|
|
function updateShareStats() {
|
|
// This is an omni-worker to deal with all things share-stats related
|
|
// Time based averages are worked out on ring buffers.
|
|
// Buffer lengths? You guessed it, configured in SQL.
|
|
// Stats timeouts are 30 seconds, so everything for buffers should be there.
|
|
let currentTime = Date.now();
|
|
let activeAddresses = [];
|
|
async.waterfall([
|
|
function (callback) {
|
|
global.coinFuncs.getLastBlockHeader(function (err, body) {
|
|
if (err !== null){
|
|
return callback(err, "Invalid block header");
|
|
}
|
|
callback(null, body.height + 1);
|
|
});
|
|
},
|
|
function (height, callback) {
|
|
let locTime = Date.now() - 600000;
|
|
let identifierTime = Date.now() - 1800000;
|
|
let localStats = {pplns: 0, pps: 0, solo: 0, prop: 0, global: 0, miners: {}};
|
|
let localMinerCount = {pplns: 0, pps: 0, solo: 0, prop: 0, global: 0};
|
|
let localTimes = {
|
|
pplns: locTime, pps: locTime, solo: locTime, prop: locTime,
|
|
global: locTime, miners: {}
|
|
};
|
|
let minerList = [];
|
|
let identifiers = {};
|
|
let loopBreakout = 0;
|
|
async.doUntil(function (callback_until) {
|
|
let oldestTime = Date.now();
|
|
let loopCount = 0;
|
|
let txn = global.database.env.beginTxn({readOnly: true});
|
|
let cursor = new global.database.lmdb.Cursor(txn, global.database.shareDB);
|
|
for (let found = (cursor.goToRange(height) === height); found; found = cursor.goToNextDup()) {
|
|
cursor.getCurrentBinary(function (key, share) { // jshint ignore:line
|
|
try {
|
|
share = global.protos.Share.decode(share);
|
|
} catch (e) {
|
|
console.error(share);
|
|
return;
|
|
}
|
|
if (share.timestamp < oldestTime) {
|
|
oldestTime = share.timestamp;
|
|
}
|
|
if (share.timestamp <= identifierTime) {
|
|
return;
|
|
}
|
|
let minerID = share.paymentAddress;
|
|
if (typeof(share.paymentID) !== 'undefined' && share.paymentID.length > 10) {
|
|
minerID = minerID + '.' + share.paymentID;
|
|
}
|
|
if (minerID in identifiers && identifiers[minerID].indexOf(share.identifier) >= 0) {
|
|
loopCount += 1;
|
|
} else if (minerID in identifiers) {
|
|
identifiers[minerID].push(share.identifier);
|
|
} else {
|
|
identifiers[minerID] = [share.identifier];
|
|
}
|
|
if (share.timestamp <= locTime) {
|
|
return;
|
|
}
|
|
let minerIDWithIdentifier = minerID + "_" + share.identifier;
|
|
localStats.global += share.shares;
|
|
if (localTimes.global <= share.timestamp) {
|
|
localTimes.global = share.timestamp;
|
|
}
|
|
let minerType;
|
|
switch (share.poolType) {
|
|
case global.protos.POOLTYPE.PPLNS:
|
|
minerType = 'pplns';
|
|
localStats.pplns += share.shares;
|
|
if (localTimes.pplns <= share.timestamp) {
|
|
localTimes.pplns = share.timestamp;
|
|
}
|
|
break;
|
|
case global.protos.POOLTYPE.PPS:
|
|
localStats.pps += share.shares;
|
|
minerType = 'pps';
|
|
if (localTimes.pps <= share.timestamp) {
|
|
localTimes.pps = share.timestamp;
|
|
}
|
|
break;
|
|
case global.protos.POOLTYPE.SOLO:
|
|
localStats.solo += share.shares;
|
|
minerType = 'solo';
|
|
if (localTimes.solo <= share.timestamp) {
|
|
localTimes.solo = share.timestamp;
|
|
}
|
|
break;
|
|
}
|
|
if (minerList.indexOf(minerID) >= 0) {
|
|
localStats.miners[minerID] += share.shares;
|
|
if (localTimes.miners[minerID] < share.timestamp) {
|
|
localTimes.miners[minerID] = share.timestamp;
|
|
}
|
|
} else {
|
|
localMinerCount[minerType] += 1;
|
|
localMinerCount.global += 1;
|
|
localStats.miners[minerID] = share.shares;
|
|
localTimes.miners[minerID] = share.timestamp;
|
|
minerList.push(minerID);
|
|
}
|
|
if (minerList.indexOf(minerIDWithIdentifier) >= 0) {
|
|
localStats.miners[minerIDWithIdentifier] += share.shares;
|
|
if (localTimes.miners[minerIDWithIdentifier] < share.timestamp) {
|
|
localTimes.miners[minerIDWithIdentifier] = share.timestamp;
|
|
}
|
|
} else {
|
|
localStats.miners[minerIDWithIdentifier] = share.shares;
|
|
localTimes.miners[minerIDWithIdentifier] = share.timestamp;
|
|
minerList.push(minerIDWithIdentifier);
|
|
}
|
|
});
|
|
}
|
|
cursor.close();
|
|
txn.abort();
|
|
return callback_until(null, oldestTime);
|
|
}, function (oldestTime) {
|
|
height -= 1;
|
|
loopBreakout += 1;
|
|
if (loopBreakout > 60 || height < 0) {
|
|
return true;
|
|
}
|
|
return oldestTime <= identifierTime;
|
|
}, function (err) {
|
|
// todo: Need to finish parsing the cached data into caches for caching purproses.
|
|
let globalMinerList = global.database.getCache('minerList');
|
|
if (globalMinerList === false) {
|
|
globalMinerList = [];
|
|
}
|
|
let cache_updates = {};
|
|
// pplns: 0, pps: 0, solo: 0, prop: 0, global: 0
|
|
['pplns', 'pps', 'solo', 'prop', 'global'].forEach(function (key) {
|
|
let cachedData = global.database.getCache(key + "_stats");
|
|
if (cachedData !== false) {
|
|
cachedData.hash = Math.floor(localStats[key] / 600);
|
|
cachedData.lastHash = localTimes[key];
|
|
cachedData.minerCount = localMinerCount[key];
|
|
if (!cachedData.hasOwnProperty("hashHistory")) {
|
|
cachedData.hashHistory = [];
|
|
cachedData.minerHistory = [];
|
|
}
|
|
if (cycleCount === 0) {
|
|
cachedData.hashHistory.unshift({ts: currentTime, hs: cachedData.hash});
|
|
if (cachedData.hashHistory.length > global.config.general.statsBufferLength) {
|
|
while (cachedData.hashHistory.length > global.config.general.statsBufferLength) {
|
|
cachedData.hashHistory.pop();
|
|
}
|
|
}
|
|
cachedData.minerHistory.unshift({ts: currentTime, cn: cachedData.minerCount});
|
|
if (cachedData.minerHistory.length > global.config.general.statsBufferLength) {
|
|
while (cachedData.minerHistory.length > global.config.general.statsBufferLength) {
|
|
cachedData.minerHistory.pop();
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
cachedData = {
|
|
hash: Math.floor(localStats[key] / 600),
|
|
totalHashes: 0,
|
|
lastHash: localTimes[key],
|
|
minerCount: localMinerCount[key],
|
|
hashHistory: [{ts: currentTime, hs: cachedData.hash}],
|
|
minerHistory: [{ts: currentTime, cn: cachedData.hash}]
|
|
};
|
|
}
|
|
cache_updates[key + "_stats"] = cachedData;
|
|
});
|
|
minerList.forEach(function (miner) {
|
|
if (globalMinerList.indexOf(miner) === -1) {
|
|
globalMinerList.push(miner);
|
|
}
|
|
if (miner.indexOf('_') === -1){
|
|
activeAddresses.push(miner);
|
|
}
|
|
let cachedData = global.database.getCache(miner);
|
|
if (cachedData !== false) {
|
|
cachedData.hash = Math.floor(localStats.miners[miner] / 600);
|
|
cachedData.lastHash = localTimes.miners[miner];
|
|
if (!cachedData.hasOwnProperty("hashHistory")) {
|
|
cachedData.hashHistory = [];
|
|
}
|
|
if (cycleCount === 0){
|
|
cachedData.hashHistory.unshift({ts: currentTime, hs: cachedData.hash});
|
|
if (cachedData.hashHistory.length > global.config.general.statsBufferLength) {
|
|
while (cachedData.hashHistory.length > global.config.general.statsBufferLength) {
|
|
cachedData.hashHistory.pop();
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
cachedData = {
|
|
hash: Math.floor(localStats.miners[miner] / 600),
|
|
totalHashes: 0,
|
|
lastHash: localTimes.miners[miner],
|
|
hashHistory: [{ts: currentTime, hs: cachedData.hash}],
|
|
goodShares: 0,
|
|
badShares: 0
|
|
};
|
|
}
|
|
cache_updates[miner] = cachedData;
|
|
});
|
|
globalMinerList.forEach(function (miner) {
|
|
if (minerList.indexOf(miner) === -1) {
|
|
let minerStats = global.database.getCache(miner);
|
|
if (minerStats.hash !== 0) {
|
|
console.log("Removing: " + miner + " as an active miner from the cache.");
|
|
if (miner.indexOf('_') > -1) {
|
|
// This is a worker case.
|
|
let address_parts = miner.split('_');
|
|
let address = address_parts[0];
|
|
let worker = address_parts[1];
|
|
global.mysql.query("SELECT email FROM users WHERE username = ? AND enable_email IS true limit 1", [address]).then(function (rows) {
|
|
if (rows.length === 0) {
|
|
return;
|
|
}
|
|
// toAddress, subject, body
|
|
let emailData = {
|
|
worker: worker,
|
|
timestamp: global.support.formatDate(Date.now()),
|
|
poolEmailSig: global.config.general.emailSig
|
|
};
|
|
global.support.sendEmail(rows[0].email,
|
|
sprintf(global.config.email.workerNotHashingSubject, emailData),
|
|
sprintf(global.config.email.workerNotHashingBody, emailData));
|
|
});
|
|
}
|
|
minerStats.hash = 0;
|
|
cache_updates[miner] = minerStats;
|
|
}
|
|
}
|
|
});
|
|
Object.keys(identifiers).forEach(function (key) {
|
|
cache_updates[key + '_identifiers'] = identifiers[key];
|
|
});
|
|
cache_updates.minerList = globalMinerList;
|
|
global.database.bulkSetCache(cache_updates);
|
|
callback(null);
|
|
});
|
|
}
|
|
], function (err, result) {
|
|
cycleCount += 1;
|
|
if (cycleCount === 6){
|
|
cycleCount = 0;
|
|
}
|
|
});
|
|
setTimeout(updateShareStats, 10000);
|
|
}
|
|
|
|
function updatePoolStats(poolType) {
|
|
let cache;
|
|
if (typeof(poolType) !== 'undefined') {
|
|
cache = global.database.getCache(poolType + "_stats");
|
|
} else {
|
|
cache = global.database.getCache("global_stats");
|
|
}
|
|
async.series([
|
|
function (callback) {
|
|
debug(threadName + "Checking Influx for last 10min avg for pool stats");
|
|
return callback(null, cache.hash || 0);
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking Influx for last 10min avg for miner count for pool stats");
|
|
return callback(null, cache.minerCount || 0);
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking Influx for last 10min avg for miner count for pool stats");
|
|
return callback(null, cache.totalHashes || 0);
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking MySQL for last block find time for pool stats");
|
|
let cacheData = global.database.getBlockList(poolType);
|
|
if (cacheData.length === 0) {
|
|
return callback(null, 0);
|
|
}
|
|
return callback(null, Math.floor(cacheData[0].ts / 1000));
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking MySQL for last block find time for pool stats");
|
|
let cacheData = global.database.getBlockList(poolType);
|
|
if (cacheData.length === 0) {
|
|
return callback(null, 0);
|
|
}
|
|
return callback(null, cacheData[0].height);
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking MySQL for block count for pool stats");
|
|
return callback(null, global.database.getBlockList(poolType).length);
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking MySQL for total miners paid");
|
|
if (typeof(poolType) !== 'undefined') {
|
|
global.mysql.query("SELECT payment_address, payment_id FROM payments WHERE pool_type = ? group by payment_address, payment_id", [poolType]).then(function (rows) {
|
|
return callback(null, rows.length);
|
|
});
|
|
} else {
|
|
global.mysql.query("SELECT payment_address, payment_id FROM payments group by payment_address, payment_id").then(function (rows) {
|
|
return callback(null, rows.length);
|
|
});
|
|
}
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking MySQL for total transactions count");
|
|
if (typeof(poolType) !== 'undefined') {
|
|
global.mysql.query("SELECT distinct(transaction_id) from payments WHERE pool_type = ?", [poolType]).then(function (rows) {
|
|
return callback(null, rows.length);
|
|
});
|
|
} else {
|
|
global.mysql.query("SELECT count(id) as txn_count FROM transactions").then(function (rows) {
|
|
if (typeof(rows[0]) !== 'undefined') {
|
|
return callback(null, rows[0].txn_count);
|
|
} else {
|
|
return callback(null, 0);
|
|
}
|
|
});
|
|
}
|
|
},
|
|
function (callback) {
|
|
debug(threadName + "Checking Influx for last 10min avg for miner count for pool stats");
|
|
return callback(null, cache.roundHashes || 0);
|
|
}
|
|
], function (err, result) {
|
|
if (typeof(poolType) === 'undefined') {
|
|
poolType = 'global';
|
|
}
|
|
global.database.setCache('pool_stats_' + poolType, {
|
|
hashRate: result[0],
|
|
miners: result[1],
|
|
totalHashes: result[2],
|
|
lastBlockFoundTime: result[3] || 0,
|
|
lastBlockFound: result[4] || 0,
|
|
totalBlocksFound: result[5] || 0,
|
|
totalMinersPaid: result[6] || 0,
|
|
totalPayments: result[7] || 0,
|
|
roundHashes: result[8] || 0
|
|
});
|
|
});
|
|
}
|
|
|
|
function updatePoolPorts(poolServers) {
|
|
debug(threadName + "Updating pool ports");
|
|
let local_cache = {global: []};
|
|
let portCount = 0;
|
|
global.mysql.query("select * from ports where hidden = 0 and lastSeen >= NOW() - INTERVAL 10 MINUTE").then(function (rows) {
|
|
rows.forEach(function (row) {
|
|
portCount += 1;
|
|
if (!local_cache.hasOwnProperty(row.port_type)) {
|
|
local_cache[row.port_type] = [];
|
|
}
|
|
local_cache[row.port_type].push({
|
|
host: poolServers[row.pool_id],
|
|
port: row.network_port,
|
|
difficulty: row.starting_diff,
|
|
description: row.description,
|
|
miners: row.miners
|
|
});
|
|
if (portCount === rows.length) {
|
|
let local_counts = {};
|
|
let port_diff = {};
|
|
let port_miners = {};
|
|
let pool_type_count = 0;
|
|
let localPortInfo = {};
|
|
for (let pool_type in local_cache) { // jshint ignore:line
|
|
pool_type_count += 1;
|
|
local_cache[pool_type].forEach(function (portData) { // jshint ignore:line
|
|
if (!local_counts.hasOwnProperty(portData.port)) {
|
|
local_counts[portData.port] = 0;
|
|
}
|
|
if (!port_diff.hasOwnProperty(portData.port)) {
|
|
port_diff[portData.port] = portData.difficulty;
|
|
}
|
|
if (!port_miners.hasOwnProperty(portData.port)) {
|
|
port_miners[portData.port] = 0;
|
|
}
|
|
if (port_diff[portData.port] === portData.difficulty) {
|
|
local_counts[portData.port] += 1;
|
|
port_miners[portData.port] += portData.miners;
|
|
}
|
|
localPortInfo[portData.port] = portData.description;
|
|
if (local_counts[portData.port] === Object.keys(poolServers).length) {
|
|
local_cache.global.push({
|
|
host: {
|
|
blockID: local_cache[pool_type][0].host.blockID,
|
|
blockIDTime: local_cache[pool_type][0].host.blockIDTime,
|
|
hostname: global.config.pool.geoDNS,
|
|
},
|
|
port: portData.port,
|
|
pool_type: pool_type,
|
|
difficulty: portData.difficulty,
|
|
miners: port_miners[portData.port],
|
|
description: localPortInfo[portData.port]
|
|
});
|
|
}
|
|
});
|
|
if (pool_type_count === Object.keys(local_cache).length) {
|
|
debug(threadName + "Sending the following to the workers: " + JSON.stringify(local_cache));
|
|
global.database.setCache('poolPorts', local_cache);
|
|
}
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
function updatePoolInformation() {
|
|
let local_cache = {};
|
|
debug(threadName + "Updating pool information");
|
|
global.mysql.query("select * from pools where last_checkin >= NOW() - INTERVAL 10 MINUTE").then(function (rows) {
|
|
rows.forEach(function (row) {
|
|
local_cache[row.id] = {
|
|
ip: row.ip,
|
|
blockID: row.blockID,
|
|
blockIDTime: global.support.formatDateFromSQL(row.blockIDTime),
|
|
hostname: row.hostname
|
|
};
|
|
if (Object.keys(local_cache).length === rows.length) {
|
|
global.database.setCache('poolServers', local_cache);
|
|
updatePoolPorts(local_cache);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
function updateBlockHeader() {
|
|
// Todo: Implement within the coins/<coin>.js file.
|
|
global.support.rpcDaemon('getlastblockheader', [], function (body) {
|
|
if (typeof body.error !== 'undefined'){
|
|
return console.error(`Issue getting last block header: ${JSON.stringify(body)}`);
|
|
}
|
|
if (body.result && body.result.block_header.hash !== lastBlockHash) {
|
|
lastBlockHash = body.result.block_header.hash;
|
|
global.database.setCache('networkBlockInfo', {
|
|
difficulty: body.result.block_header.difficulty,
|
|
hash: body.result.block_header.hash,
|
|
height: body.result.block_header.height,
|
|
value: body.result.block_header.reward,
|
|
ts: body.result.block_header.timestamp
|
|
});
|
|
} else if (body.result.block_header.hash === lastBlockHash) {
|
|
console.log("Block headers identical to historical header. Ignoring");
|
|
} else {
|
|
console.error("GetLastBlockHeader Error during block header update");
|
|
}
|
|
});
|
|
}
|
|
|
|
function updateWalletStats() {
|
|
async.waterfall([
|
|
function (callback) {
|
|
// Todo: Implement within the coins/<coin>.js file.
|
|
global.support.rpcWallet('getbalance', [], function (body) {
|
|
if (body.result) {
|
|
return callback(null, {
|
|
balance: body.result.balance,
|
|
unlocked: body.result.unlocked_balance,
|
|
ts: Date.now()
|
|
});
|
|
} else {
|
|
return callback(true, "Unable to process balance");
|
|
}
|
|
});
|
|
},
|
|
function (state, callback) {
|
|
// Todo: Implement within the coins/<coin>.js file.
|
|
global.support.rpcWallet('getheight', [], function (body) {
|
|
if (body.result) {
|
|
state.height = body.result.height;
|
|
return callback(null, state);
|
|
} else if (typeof body.error !== 'undefined' && body.error.message === 'Method not found') {
|
|
state.height = 0;
|
|
return callback(null, state);
|
|
} else {
|
|
return callback(true, "Unable to get current wallet height");
|
|
}
|
|
});
|
|
}
|
|
], function (err, results) {
|
|
if (err) {
|
|
return console.error("Unable to get wallet stats: " + results);
|
|
}
|
|
global.database.setCache('walletStateInfo', results);
|
|
let history = global.database.getCache('walletHistory');
|
|
if (history === false) {
|
|
history = [];
|
|
}
|
|
history.unshift(results);
|
|
history = history.sort(global.support.tsCompare);
|
|
if (history.length > global.config.general.statsBufferLength) {
|
|
while (history.length > global.config.general.statsBufferLength) {
|
|
history.pop();
|
|
}
|
|
}
|
|
global.database.setCache('walletHistory', history);
|
|
});
|
|
|
|
}
|
|
|
|
let lastBlockCheckIsOk = true;
|
|
function monitorNodes() {
|
|
global.coinFuncs.getLastBlockHeader((err, block) => {
|
|
if (err !== null) {
|
|
if (lastBlockCheckIsOk) {
|
|
lastBlockCheckIsOk = false;
|
|
global.support.sendEmail(
|
|
global.config.general.adminEmail,
|
|
'Failed to query daemon for last block header',
|
|
`The worker failed to return last block header. Please verify if the daemon is running properly.`
|
|
);
|
|
}
|
|
return
|
|
}
|
|
if (!lastBlockCheckIsOk) {
|
|
lastBlockCheckIsOk = true;
|
|
global.support.sendEmail(
|
|
global.config.general.adminEmail,
|
|
'Quering daemon for last block header is back to normal',
|
|
`An warning was sent to you indicating that the the worker failed to return the last block header.
|
|
The issue seems to be solved now.`
|
|
);
|
|
}
|
|
const sql = 'SELECT blockID, hostname, ip FROM pools WHERE last_checkin > DATE_SUB(NOW(), INTERVAL 30 MINUTE)';
|
|
global.mysql.query(sql).then(pools => {
|
|
pools.forEach(({ blockID, hostname, ip }) => {
|
|
if (blockID < block.height - 3) {
|
|
global.support.sendEmail(
|
|
global.config.general.adminEmail,
|
|
'Pool server is behind in blocks',
|
|
`The pool server: ${hostname} with IP: ${ip} is ${(block.height - blockID)} blocks behind.`
|
|
);
|
|
}
|
|
})
|
|
});
|
|
|
|
});
|
|
}
|
|
|
|
updateShareStats();
|
|
updateBlockHeader();
|
|
updatePoolStats();
|
|
updatePoolInformation();
|
|
updateWalletStats();
|
|
monitorNodes();
|
|
setInterval(updateBlockHeader, 10000);
|
|
setInterval(updatePoolStats, 5000);
|
|
setInterval(updatePoolStats, 5000, 'pplns');
|
|
setInterval(updatePoolStats, 5000, 'pps');
|
|
setInterval(updatePoolStats, 5000, 'solo');
|
|
setInterval(updatePoolInformation, 5000);
|
|
setInterval(updateWalletStats, 60000);
|
|
setInterval(monitorNodes, 300000);
|