You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

560 lines
26 KiB

"use strict";
const debug = require("debug")("worker");
const async = require("async");
const sprintf = require("sprintf-js").sprintf;
// Prefix prepended to the debug() messages emitted by this module.
let threadName = "Worker Server ";
// Counts updateShareStats runs; it wraps back to 0 once it reaches 6, and
// history samples are only appended to existing cache entries on runs where it is 0.
let cycleCount = 0;
// Hash of the last block header that updateBlockHeader wrote to the cache
// (null until the first header has been seen).
let lastBlockHash = null;
/**
 * Prepends a sample to a newest-first stats history buffer, then drops the
 * oldest samples until the buffer fits global.config.general.statsBufferLength.
 *
 * @param {Array<Object>} history - History buffer, mutated in place.
 * @param {Object} sample - Entry to record at the front of the buffer.
 */
function pushStatsSample(history, sample) {
    history.unshift(sample);
    while (history.length > global.config.general.statsBufferLength) {
        history.pop();
    }
}

/**
 * Omni-worker for everything share-stats related.
 *
 * Scans the share database backwards from the height after the last block
 * header, aggregates the shares of the last 10 minutes per pool type, per
 * miner and per miner+identifier, and writes the resulting hash rates, miner
 * counts, history buffers and identifier lists to the cache in one
 * bulkSetCache call. Miners that were listed before but produced no recent
 * share get their cached hash rate reset to 0, and the owner of a silent
 * worker is e-mailed when notifications are enabled for the address.
 *
 * The function re-arms itself with setTimeout every 10 seconds; the timer is
 * scheduled right away and does not wait for the current run to finish.
 */
function updateShareStats() {
    const currentTime = Date.now();
    async.waterfall([
        function (callback) {
            global.coinFuncs.getLastBlockHeader(function (err, body) {
                if (err !== null) {
                    return callback(err, "Invalid block header");
                }
                callback(null, body.height + 1);
            });
        },
        function (height, callback) {
            // Shares newer than locTime (10 min) feed the hash rates; shares
            // newer than identifierTime (30 min) feed the identifier lists.
            const locTime = Date.now() - 600000;
            const identifierTime = Date.now() - 1800000;
            const localStats = {pplns: 0, pps: 0, solo: 0, prop: 0, global: 0, miners: {}};
            const localMinerCount = {pplns: 0, pps: 0, solo: 0, prop: 0, global: 0};
            const localTimes = {
                pplns: locTime, pps: locTime, solo: locTime, prop: locTime,
                global: locTime, miners: {}
            };
            // minerList keeps first-seen order for the cache; seenMiners gives
            // constant-time membership checks inside the share loop.
            const minerList = [];
            const seenMiners = new Set();
            const identifiers = {};
            let loopBreakout = 0;

            // Adds a share to the running totals of one miner key.
            // Returns true when the key was seen for the first time.
            const addMinerShares = function (id, share) {
                if (seenMiners.has(id)) {
                    localStats.miners[id] += share.shares;
                    if (localTimes.miners[id] < share.timestamp) {
                        localTimes.miners[id] = share.timestamp;
                    }
                    return false;
                }
                localStats.miners[id] = share.shares;
                localTimes.miners[id] = share.timestamp;
                seenMiners.add(id);
                minerList.push(id);
                return true;
            };

            // Folds one decoded share into the identifier lists and totals.
            const processShare = function (share) {
                if (share.timestamp <= identifierTime) {
                    return;
                }
                let minerID = share.paymentAddress;
                if (typeof(share.paymentID) !== 'undefined' && share.paymentID.length > 10) {
                    minerID = minerID + '.' + share.paymentID;
                }
                if (!identifiers.hasOwnProperty(minerID)) {
                    identifiers[minerID] = [share.identifier];
                } else if (identifiers[minerID].indexOf(share.identifier) === -1) {
                    identifiers[minerID].push(share.identifier);
                }
                if (share.timestamp <= locTime) {
                    return;
                }
                localStats.global += share.shares;
                if (localTimes.global <= share.timestamp) {
                    localTimes.global = share.timestamp;
                }
                let minerType;
                switch (share.poolType) {
                    case global.protos.POOLTYPE.PPLNS:
                        minerType = 'pplns';
                        break;
                    case global.protos.POOLTYPE.PPS:
                        minerType = 'pps';
                        break;
                    case global.protos.POOLTYPE.SOLO:
                        minerType = 'solo';
                        break;
                }
                // Shares of a pool type not handled above only count towards
                // the global totals; without this guard they would create a
                // NaN entry under the key "undefined".
                if (typeof minerType !== 'undefined') {
                    localStats[minerType] += share.shares;
                    if (localTimes[minerType] <= share.timestamp) {
                        localTimes[minerType] = share.timestamp;
                    }
                }
                if (addMinerShares(minerID, share)) {
                    if (typeof minerType !== 'undefined') {
                        localMinerCount[minerType] += 1;
                    }
                    localMinerCount.global += 1;
                }
                addMinerShares(minerID + "_" + share.identifier, share);
            };

            // NOTE(review): the second argument to doUntil is written as a
            // synchronous predicate; confirm the installed async version
            // still invokes the test that way.
            async.doUntil(function (callback_until) {
                let oldestTime = Date.now();
                const txn = global.database.env.beginTxn({readOnly: true});
                let cursor = null;
                try {
                    cursor = new global.database.lmdb.Cursor(txn, global.database.shareDB);
                    for (let found = (cursor.goToRange(height) === height); found; found = cursor.goToNextDup()) {
                        cursor.getCurrentBinary(function (key, rawShare) { // jshint ignore:line
                            let share;
                            try {
                                share = global.protos.Share.decode(rawShare);
                            } catch (e) {
                                console.error(rawShare);
                                return;
                            }
                            if (share.timestamp < oldestTime) {
                                oldestTime = share.timestamp;
                            }
                            processShare(share);
                        });
                    }
                } finally {
                    // Release the cursor and read transaction even when the scan throws.
                    if (cursor !== null) {
                        cursor.close();
                    }
                    txn.abort();
                }
                return callback_until(null, oldestTime);
            }, function (oldestTime) {
                // Step one height back; stop after 61 heights, below height 0,
                // or once a share older than the identifier window was seen.
                height -= 1;
                loopBreakout += 1;
                if (loopBreakout > 60 || height < 0) {
                    return true;
                }
                return oldestTime <= identifierTime;
            }, function (err) {
                if (err) {
                    return callback(err);
                }
                // todo: Need to finish parsing the cached data into caches for caching purposes.
                let globalMinerList = global.database.getCache('minerList');
                if (globalMinerList === false) {
                    globalMinerList = [];
                }
                const cache_updates = {};
                ['pplns', 'pps', 'solo', 'prop', 'global'].forEach(function (key) {
                    const hash = Math.floor(localStats[key] / 600);
                    let cachedData = global.database.getCache(key + "_stats");
                    if (cachedData !== false) {
                        cachedData.hash = hash;
                        cachedData.lastHash = localTimes[key];
                        cachedData.minerCount = localMinerCount[key];
                        if (!cachedData.hasOwnProperty("hashHistory")) {
                            cachedData.hashHistory = [];
                            cachedData.minerHistory = [];
                        } else if (!cachedData.hasOwnProperty("minerHistory")) {
                            cachedData.minerHistory = [];
                        }
                        if (cycleCount === 0) {
                            pushStatsSample(cachedData.hashHistory, {ts: currentTime, hs: cachedData.hash});
                            pushStatsSample(cachedData.minerHistory, {ts: currentTime, cn: cachedData.minerCount});
                        }
                    } else {
                        // Seed the histories from the freshly computed values;
                        // there is no previous cache entry to read them from.
                        cachedData = {
                            hash: hash,
                            totalHashes: 0,
                            lastHash: localTimes[key],
                            minerCount: localMinerCount[key],
                            hashHistory: [{ts: currentTime, hs: hash}],
                            minerHistory: [{ts: currentTime, cn: localMinerCount[key]}]
                        };
                    }
                    cache_updates[key + "_stats"] = cachedData;
                });
                const knownGlobalMiners = new Set(globalMinerList);
                minerList.forEach(function (miner) {
                    if (!knownGlobalMiners.has(miner)) {
                        knownGlobalMiners.add(miner);
                        globalMinerList.push(miner);
                    }
                    const hash = Math.floor(localStats.miners[miner] / 600);
                    let cachedData = global.database.getCache(miner);
                    if (cachedData !== false) {
                        cachedData.hash = hash;
                        cachedData.lastHash = localTimes.miners[miner];
                        if (!cachedData.hasOwnProperty("hashHistory")) {
                            cachedData.hashHistory = [];
                        }
                        if (cycleCount === 0) {
                            pushStatsSample(cachedData.hashHistory, {ts: currentTime, hs: cachedData.hash});
                        }
                    } else {
                        cachedData = {
                            hash: hash,
                            totalHashes: 0,
                            lastHash: localTimes.miners[miner],
                            hashHistory: [{ts: currentTime, hs: hash}],
                            goodShares: 0,
                            badShares: 0
                        };
                    }
                    cache_updates[miner] = cachedData;
                });
                globalMinerList.forEach(function (miner) {
                    if (seenMiners.has(miner)) {
                        return;
                    }
                    const minerStats = global.database.getCache(miner);
                    // A listed miner without a cache entry has nothing to reset;
                    // writing a property on the `false` miss value would throw
                    // in strict mode.
                    if (!minerStats || minerStats.hash === 0) {
                        return;
                    }
                    console.log("Removing: " + miner + " as an active miner from the cache.");
                    if (miner.indexOf('_') > -1) {
                        // This is a worker case.
                        const address_parts = miner.split('_');
                        const address = address_parts[0];
                        const worker = address_parts[1];
                        global.mysql.query("SELECT email FROM users WHERE username = ? AND enable_email IS true limit 1", [address]).then(function (rows) {
                            if (rows.length === 0) {
                                return;
                            }
                            // toAddress, subject, body
                            const emailData = {
                                worker: worker,
                                timestamp: global.support.formatDate(Date.now()),
                                poolEmailSig: global.config.general.emailSig
                            };
                            global.support.sendEmail(rows[0].email,
                                sprintf(global.config.email.workerNotHashingSubject, emailData),
                                sprintf(global.config.email.workerNotHashingBody, emailData));
                        }, function (queryErr) {
                            console.error(threadName + "Unable to look up e-mail for " + address + ": " + queryErr);
                        });
                    }
                    minerStats.hash = 0;
                    cache_updates[miner] = minerStats;
                });
                Object.keys(identifiers).forEach(function (key) {
                    cache_updates[key + '_identifiers'] = identifiers[key];
                });
                cache_updates.minerList = globalMinerList;
                global.database.bulkSetCache(cache_updates);
                callback(null);
            });
        }
    ], function (err, result) {
        if (err) {
            console.error(threadName + "Unable to update share stats: " + (result || err));
        }
        // History samples are only recorded on every sixth run.
        cycleCount += 1;
        if (cycleCount === 6) {
            cycleCount = 0;
        }
    });
    setTimeout(updateShareStats, 10000);
}
/**
 * Collects the headline statistics of one pool type (or of the whole pool
 * when poolType is omitted) and stores them in the cache under
 * 'pool_stats_<poolType|global>'.
 *
 * Hash rate, miner count, total hashes and round hashes are read from the
 * '<poolType|global>_stats' cache entry, block figures from
 * global.database.getBlockList, and payment figures from MySQL. When a MySQL
 * query fails the error is logged and the cached pool stats are left
 * untouched for this run.
 *
 * @param {string} [poolType] - Pool type such as 'pplns', 'pps' or 'solo';
 *     leave undefined for the global statistics.
 */
function updatePoolStats(poolType) {
    const statsKey = typeof(poolType) !== 'undefined' ? poolType : 'global';
    // getCache yields `false` on a miss; property reads on it are undefined,
    // so the `|| 0` fallbacks below cover that case as well.
    const cache = global.database.getCache(statsKey + "_stats");
    // Builds a rejection handler that reports a failed query to async.series
    // instead of leaving the series hanging with an unhandled rejection.
    const failWith = function (callback) {
        return function (err) {
            callback(err || new Error("MySQL query failed"));
        };
    };
    async.series([
        function (callback) {
            debug(threadName + "Checking Influx for last 10min avg for pool stats");
            return callback(null, cache.hash || 0);
        },
        function (callback) {
            debug(threadName + "Checking Influx for last 10min avg for miner count for pool stats");
            return callback(null, cache.minerCount || 0);
        },
        function (callback) {
            debug(threadName + "Checking Influx for last 10min avg for miner count for pool stats");
            return callback(null, cache.totalHashes || 0);
        },
        function (callback) {
            debug(threadName + "Checking MySQL for last block find time for pool stats");
            const cacheData = global.database.getBlockList(poolType);
            if (cacheData.length === 0) {
                return callback(null, 0);
            }
            return callback(null, Math.floor(cacheData[0].ts / 1000));
        },
        function (callback) {
            debug(threadName + "Checking MySQL for last block find time for pool stats");
            const cacheData = global.database.getBlockList(poolType);
            if (cacheData.length === 0) {
                return callback(null, 0);
            }
            return callback(null, cacheData[0].height);
        },
        function (callback) {
            debug(threadName + "Checking MySQL for block count for pool stats");
            return callback(null, global.database.getBlockList(poolType).length);
        },
        function (callback) {
            debug(threadName + "Checking MySQL for total miners paid");
            const query = typeof(poolType) !== 'undefined' ?
                global.mysql.query("SELECT payment_address, payment_id FROM payments WHERE pool_type = ? group by payment_address, payment_id", [poolType]) :
                global.mysql.query("SELECT payment_address, payment_id FROM payments group by payment_address, payment_id");
            query.then(function (rows) {
                return callback(null, rows.length);
            }, failWith(callback));
        },
        function (callback) {
            debug(threadName + "Checking MySQL for total transactions count");
            if (typeof(poolType) !== 'undefined') {
                global.mysql.query("SELECT distinct(transaction_id) from payments WHERE pool_type = ?", [poolType]).then(function (rows) {
                    return callback(null, rows.length);
                }, failWith(callback));
            } else {
                global.mysql.query("SELECT count(id) as txn_count FROM transactions").then(function (rows) {
                    if (typeof(rows[0]) !== 'undefined') {
                        return callback(null, rows[0].txn_count);
                    }
                    return callback(null, 0);
                }, failWith(callback));
            }
        },
        function (callback) {
            debug(threadName + "Checking Influx for last 10min avg for miner count for pool stats");
            return callback(null, cache.roundHashes || 0);
        }
    ], function (err, result) {
        if (err) {
            // A partial result must not overwrite the last good snapshot.
            console.error(threadName + "Unable to update pool stats for " + statsKey + ": " + err);
            return;
        }
        global.database.setCache('pool_stats_' + statsKey, {
            hashRate: result[0],
            miners: result[1],
            totalHashes: result[2],
            lastBlockFoundTime: result[3] || 0,
            lastBlockFound: result[4] || 0,
            totalBlocksFound: result[5] || 0,
            totalMinersPaid: result[6] || 0,
            totalPayments: result[7] || 0,
            roundHashes: result[8] || 0
        });
    });
}
/**
 * Rebuilds the 'poolPorts' cache entry from the visible ports that were seen
 * during the last 10 minutes.
 *
 * Ports are grouped by their port_type. A port number that is offered with
 * the same difficulty by as many rows as there are pool servers is also
 * listed once under 'global', pointing at global.config.pool.geoDNS, with the
 * miner counts of those rows summed up. Nothing is written when the query
 * returns no rows or fails.
 *
 * @param {Object} poolServers - Pool server records keyed by pool id (the
 *     object built by updatePoolInformation).
 */
function updatePoolPorts(poolServers) {
    debug(threadName + "Updating pool ports");
    global.mysql.query("select * from ports where hidden = 0 and lastSeen >= NOW() - INTERVAL 10 MINUTE").then(function (rows) {
        if (rows.length === 0) {
            // Keep whatever is cached; an empty result publishes nothing.
            return;
        }
        const local_cache = {global: []};
        rows.forEach(function (row) {
            if (!local_cache.hasOwnProperty(row.port_type)) {
                local_cache[row.port_type] = [];
            }
            local_cache[row.port_type].push({
                host: poolServers[row.pool_id],
                port: row.network_port,
                difficulty: row.starting_diff,
                description: row.description,
                miners: row.miners
            });
        });
        const serverCount = Object.keys(poolServers).length;
        const local_counts = {};
        const port_diff = {};
        const port_miners = {};
        const localPortInfo = {};
        Object.keys(local_cache).forEach(function (pool_type) {
            local_cache[pool_type].forEach(function (portData) {
                if (!local_counts.hasOwnProperty(portData.port)) {
                    local_counts[portData.port] = 0;
                }
                if (!port_diff.hasOwnProperty(portData.port)) {
                    port_diff[portData.port] = portData.difficulty;
                }
                if (!port_miners.hasOwnProperty(portData.port)) {
                    port_miners[portData.port] = 0;
                }
                localPortInfo[portData.port] = portData.description;
                if (port_diff[portData.port] !== portData.difficulty) {
                    // A row with a different difficulty neither counts towards
                    // the port nor may it re-emit an already published entry.
                    return;
                }
                local_counts[portData.port] += 1;
                port_miners[portData.port] += portData.miners;
                if (local_counts[portData.port] === serverCount) {
                    // The first row of this pool type supplies the block info;
                    // its host is undefined when the pool id is unknown.
                    const referenceHost = local_cache[pool_type][0].host || {};
                    local_cache.global.push({
                        host: {
                            blockID: referenceHost.blockID,
                            blockIDTime: referenceHost.blockIDTime,
                            hostname: global.config.pool.geoDNS
                        },
                        port: portData.port,
                        pool_type: pool_type,
                        difficulty: portData.difficulty,
                        miners: port_miners[portData.port],
                        description: localPortInfo[portData.port]
                    });
                }
            });
        });
        debug(threadName + "Sending the following to the workers: " + JSON.stringify(local_cache));
        global.database.setCache('poolPorts', local_cache);
    }, function (err) {
        console.error(threadName + "Unable to update pool ports: " + err);
    });
}
/**
 * Loads the pool servers that checked in during the last 10 minutes, stores
 * them in the cache under 'poolServers' (keyed by pool id) and hands the same
 * object to updatePoolPorts. Nothing is written when the query returns no
 * rows or fails; a failed query is logged.
 */
function updatePoolInformation() {
    debug(threadName + "Updating pool information");
    global.mysql.query("select * from pools where last_checkin >= NOW() - INTERVAL 10 MINUTE").then(function (rows) {
        if (rows.length === 0) {
            return;
        }
        const local_cache = {};
        rows.forEach(function (row) {
            local_cache[row.id] = {
                ip: row.ip,
                blockID: row.blockID,
                blockIDTime: global.support.formatDateFromSQL(row.blockIDTime),
                hostname: row.hostname
            };
        });
        // Publish only when every row produced its own entry, i.e. no two
        // rows shared an id.
        if (Object.keys(local_cache).length === rows.length) {
            global.database.setCache('poolServers', local_cache);
            updatePoolPorts(local_cache);
        }
    }, function (err) {
        console.error(threadName + "Unable to update pool information: " + err);
    });
}
/**
 * Asks the daemon for the last block header and, when its hash differs from
 * the previously seen one, stores difficulty, hash, height, reward and
 * timestamp in the cache under 'networkBlockInfo'.
 *
 * Replies that carry an error, or that lack a result / block_header, are
 * logged and otherwise ignored.
 */
function updateBlockHeader() {
    // Todo: Implement within the coins/<coin>.js file.
    global.support.rpcDaemon('getlastblockheader', [], function (body) {
        if (!body || typeof body.error !== 'undefined') {
            console.error(`Issue getting last block header: ${JSON.stringify(body)}`);
            return;
        }
        const header = body.result && body.result.block_header;
        if (!header) {
            // Checked before any hash comparison so a reply without a result
            // is reported instead of raising a TypeError.
            console.error("GetLastBlockHeader Error during block header update");
            return;
        }
        if (header.hash === lastBlockHash) {
            console.log("Block headers identical to historical header. Ignoring");
            return;
        }
        lastBlockHash = header.hash;
        global.database.setCache('networkBlockInfo', {
            difficulty: header.difficulty,
            hash: header.hash,
            height: header.height,
            value: header.reward,
            ts: header.timestamp
        });
    });
}
/**
 * Reads balance and height from the wallet RPC, stores the combined state in
 * the cache under 'walletStateInfo' and records it in the 'walletHistory'
 * buffer, which is sorted with global.support.tsCompare and capped at
 * global.config.general.statsBufferLength entries.
 *
 * A wallet that answers 'Method not found' for getheight is recorded with
 * height 0. Any other failure is logged and nothing is written.
 */
function updateWalletStats() {
    // Step 1: balance and unlocked balance, stamped with the current time.
    const readBalance = function (next) {
        // Todo: Implement within the coins/<coin>.js file.
        global.support.rpcWallet('getbalance', [], function (reply) {
            if (!reply.result) {
                return next(true, "Unable to process balance");
            }
            return next(null, {
                balance: reply.result.balance,
                unlocked: reply.result.unlocked_balance,
                ts: Date.now()
            });
        });
    };

    // Step 2: add the wallet height to the state gathered so far.
    const readHeight = function (walletState, next) {
        // Todo: Implement within the coins/<coin>.js file.
        global.support.rpcWallet('getheight', [], function (reply) {
            if (reply.result) {
                walletState.height = reply.result.height;
                return next(null, walletState);
            }
            const methodMissing = typeof reply.error !== 'undefined' && reply.error.message === 'Method not found';
            if (!methodMissing) {
                return next(true, "Unable to get current wallet height");
            }
            walletState.height = 0;
            return next(null, walletState);
        });
    };

    // Final step: publish the state and fold it into the history buffer.
    const publish = function (err, walletState) {
        if (err) {
            return console.error("Unable to get wallet stats: " + walletState);
        }
        global.database.setCache('walletStateInfo', walletState);
        const cachedHistory = global.database.getCache('walletHistory');
        const entries = cachedHistory === false ? [] : cachedHistory;
        entries.unshift(walletState);
        const ordered = entries.sort(global.support.tsCompare);
        while (ordered.length > global.config.general.statsBufferLength) {
            ordered.pop();
        }
        global.database.setCache('walletHistory', ordered);
    };

    async.waterfall([readBalance, readHeight], publish);
}
// Result of the previous daemon check; used so that the failure and recovery
// e-mails are each sent once per outage instead of on every run.
let lastBlockCheckIsOk = true;

/**
 * Health check for the daemon and the pool servers.
 *
 * E-mails the admin when the daemon stops returning the last block header
 * and again when it recovers. While the daemon answers, every pool server
 * that checked in during the last 30 minutes and is more than 3 blocks
 * behind the daemon height is reported by e-mail as well. A failing pools
 * query is logged.
 */
function monitorNodes() {
    global.coinFuncs.getLastBlockHeader((err, block) => {
        if (err !== null) {
            if (lastBlockCheckIsOk) {
                lastBlockCheckIsOk = false;
                global.support.sendEmail(
                    global.config.general.adminEmail,
                    'Failed to query daemon for last block header',
                    `The worker failed to return last block header. Please verify if the daemon is running properly.`
                );
            }
            return;
        }
        if (!lastBlockCheckIsOk) {
            lastBlockCheckIsOk = true;
            global.support.sendEmail(
                global.config.general.adminEmail,
                'Quering daemon for last block header is back to normal',
                "An warning was sent to you indicating that the the worker failed to return the last block header.\nThe issue seems to be solved now."
            );
        }
        const sql = 'SELECT blockID, hostname, ip FROM pools WHERE last_checkin > DATE_SUB(NOW(), INTERVAL 30 MINUTE)';
        global.mysql.query(sql).then(pools => {
            pools.forEach(({ blockID, hostname, ip }) => {
                if (blockID < block.height - 3) {
                    global.support.sendEmail(
                        global.config.general.adminEmail,
                        'Pool server is behind in blocks',
                        `The pool server: ${hostname} with IP: ${ip} is ${(block.height - blockID)} blocks behind.`
                    );
                }
            });
        }, queryErr => {
            console.error(`Unable to query pool servers for monitoring: ${queryErr}`);
        });
    });
}
// Run every updater once at startup, in this order.
const startupTasks = [
    updateShareStats,
    updateBlockHeader,
    updatePoolStats,
    updatePoolInformation,
    updateWalletStats,
    monitorNodes
];
startupTasks.forEach(function (runOnce) {
    runOnce();
});

// Periodic refreshes: the function, its period in milliseconds, and the
// extra arguments passed to it on each tick. updateShareStats is not listed
// because it re-arms itself with setTimeout.
const recurringTasks = [
    {run: updateBlockHeader, everyMs: 10000, args: []},
    {run: updatePoolStats, everyMs: 5000, args: []},
    {run: updatePoolStats, everyMs: 5000, args: ['pplns']},
    {run: updatePoolStats, everyMs: 5000, args: ['pps']},
    {run: updatePoolStats, everyMs: 5000, args: ['solo']},
    {run: updatePoolInformation, everyMs: 5000, args: []},
    {run: updateWalletStats, everyMs: 60000, args: []},
    {run: monitorNodes, everyMs: 300000, args: []}
];
recurringTasks.forEach(function (task) {
    setInterval(task.run, task.everyMs, ...task.args);
});