Removing ZMQ dependencies.

master
Alexander Blair 7 years ago committed by Alexander Blair
parent cfccee540b
commit 73bf5745d3

@ -11,9 +11,6 @@ const bodyParser = require('body-parser');
const jwt = require('jsonwebtoken'); // used to create, sign, and verify tokens
const crypto = require('crypto');
const cors = require('cors');
const zmq = require('zmq');
const sock = zmq.socket('sub');
const io = require('socket.io')(server);
let addressBase58Prefix = cnUtil.address_decode(new Buffer(global.config.pool.address));
let threadName = "";
@ -39,127 +36,6 @@ app.use(cors({origin: true}));
app.use(bodyParser.urlencoded({extended: false}));
app.use(bodyParser.json());
if (!cluster.isMaster) {
// Init the ZMQ listeners here.
sock.connect('tcp://127.0.0.1:3000');
sock.connect('tcp://127.0.0.1:3001');
// miner_hash_graph - Hash graphs updated
// miner_hash_stats - Miner hashes updated
// network_block_info - New block information
// pool_stats - Pool statistics update
sock.subscribe('');
}
require('sticky-cluster')(
// server initialization function
function (callback) {
// don't do server.listen(), just pass the server instance into the callback
callback(server);
},
// options
{
concurrency: require('os').cpus().length,
port: 8001,
debug: false,
env: function (index) { return { stickycluster_worker_index: index }; }
}
);
// SocketIO Routes
// ===============
io.on('connection', (socket) => {
socket.on('room', function(room) {
socket.join(room);
});
});
// As the majority of updates come in upstream from the main SocketIO server, we use ZMQ to manage this.
// The following is the ZMQ logic. Pray to whatever deity you like.
sock.on('message', function(topic, message) {
topic = topic.toString();
message = message.toString();
/*
Registered ZMQ Messages:
miner_hash_graph - Hash graphs updated
miner_hash_stats - Miner hashes updated
network_block_info - New block information
pool_stats - Pool statistics update
payments - Payments complete - Trip the frontend to hit the frontend for new payment information
Registered Rooms:
payments
hash_chart_<address> - equiv to https://api.xmrpool.net/miner/<address>/chart/hashrate/allWorkers
worker_stats_<address> - equiv to https://api.xmrpool.net/miner/<address>/stats/allWorkers
worker_ids_<address> - equiv to https://api.xmrpool.net/miner/<address>/identifiers
address_stats_<address> - equiv to https://api.xmrpool.net/miner/<address>/stats
network_block_info - equiv to https://api.xmrpool.net/network/stats
pool_stats_<type> - equiv to https://api.xmrpool.net/pool/stats/<type>
pool_stats - equiv to https://api.xmrpool.net/pool/stats/
*/
switch(topic){
case 'payments':
io.sockets.in('payments').emit('message', 'newPaymentsAvailable');
break;
case 'miner_hash_graph':
message = JSON.parse(message);
message.forEach(function(address){
getAllWorkerHashCharts(address, function(err, data){
return io.sockets.in('hash_chart_'+address).emit('message', JSON.stringify(data));
});
});
break;
case 'miner_hash_stats':
message = JSON.parse(message);
message.forEach(function(address){
getAddressStats(address, function(err,data){
io.sockets.in('address_stats_'+address).emit('message', JSON.stringify(data));
});
getAllWorkerStats(address, function(err, data){
io.sockets.in('worker_stats_'+address).emit('message', JSON.stringify(data));
});
io.sockets.in('worker_ids_'+address).emit('message', JSON.stringify(global.database.getCache(address + '_identifiers')));
});
break;
case 'network_block_info':
io.sockets.in('block_update').emit('message', message);
break;
case 'pool_stats':
if (message === 'global'){
let localCache = global.database.getCache('pool_stats_global');
delete(localCache.minerHistory);
delete(localCache.hashHistory);
let lastPayment = global.database.getCache('lastPaymentCycle');
io.sockets.in('pool_stats').emit('message', JSON.stringify({pool_list: pool_list, pool_statistics: localCache, last_payment: !lastPayment ? 0 : lastPayment}));
} else {
let pool_type = message;
let localCache;
switch (pool_type) {
case 'pplns':
localCache = global.database.getCache('pool_stats_pplns');
localCache.fee = global.config.payout.pplnsFee;
break;
case 'pps':
localCache = global.database.getCache('pool_stats_pps');
localCache.fee = global.config.payout.ppsFee;
break;
case 'solo':
localCache = global.database.getCache('pool_stats_solo');
localCache.fee = global.config.payout.soloFee;
break;
case 'default':
io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({'error': 'Invalid pool type'}));
}
localCache.poolType = pool_type;
delete(localCache.minerHistory);
delete(localCache.hashHistory);
io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({pool_statistics: localCache}));
}
break;
}
});
// Support Functions that are reused now
function getAllWorkerHashCharts(address, callback){
let identifiers = global.database.getCache(address + '_identifiers');
@ -944,4 +820,31 @@ adminRoutes.get('/userList', function (req, res) {
app.use('/authed', secureRoutes);
app.use('/admin', adminRoutes);
// Authenticated routes
// Authenticated routes
let workerList = [];
if (cluster.isMaster) {
let numWorkers = require('os').cpus().length;
console.log('Master cluster setting up ' + numWorkers + ' workers...');
for (let i = 0; i < numWorkers; i++) {
let worker = cluster.fork();
workerList.push(worker);
}
cluster.on('online', function (worker) {
console.log('Worker ' + worker.process.pid + ' is online');
});
cluster.on('exit', function (worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
worker = cluster.fork();
workerList.push(worker);
});
} else {
app.listen(8001, function () {
console.log('Process ' + process.pid + ' is listening to all incoming requests');
});
}

@ -34,7 +34,9 @@ function Coin(data){
this.intPrefix = 54;
}
this.niceHashDiff = 100000;
this.supportsAutoExchange = true;
this.niceHashDiff = 200000;
this.getBlockHeaderByID = function(blockId, callback){
global.support.rpcDaemon('getblockheaderbyheight', {"height": blockId}, function (body) {
@ -154,6 +156,8 @@ function Coin(data){
};
};
this.cryptoNight = multiHashing.cryptonight;
}

@ -4,15 +4,12 @@ const async = require("async");
const debug = require("debug")("payments");
const request = require('request-json');
const range = require('range');
const zmq = require('zmq');
const sock = zmq.socket('pub');
let hexChars = new RegExp("[0-9a-f]+");
let bestExchange = global.config.payout.bestExchange;
let xmrAPIClient = request.createClient('https://xmr.to/api/v1/xmr2btc/');
let extraPaymentRound = false;
let paymentTimer = null;
sock.bindSync('tcp://127.0.0.1:3001');
let shapeshiftQueue = async.queue(function (task, callback) {
// Amount needs to be shifted in as a non-completed value, as the wallet will only take non-complete values..

@ -297,7 +297,7 @@ function Miner(id, login, pass, ipAddress, startingDiff, messageSender, protoVer
if (global.coinFuncs.validateAddress(this.address)) {
this.bitcoin = 0;
} else if (btcValidator.validate(this.address) && global.config.general.allowBitcoin) {
} else if (btcValidator.validate(this.address) && global.config.general.allowBitcoin && global.coinFuncs.supportsAutoExchange) {
this.bitcoin = 1;
} else if (btcValidator.validate(this.address)) {
this.error = "This pool does not allow payouts to bitcoin.";

@ -2,12 +2,9 @@
const debug = require("debug")("worker");
const async = require("async");
const sprintf = require("sprintf-js").sprintf;
const zmq = require('zmq');
const sock = zmq.socket('pub');
let threadName = "Worker Server ";
let cycleCount = 0;
sock.bindSync('tcp://127.0.0.1:3000');
let lastBlockHash = null;
function updateShareStats() {
@ -251,9 +248,7 @@ function updateShareStats() {
cycleCount += 1;
if (cycleCount === 6){
cycleCount = 0;
sock.send(['miner_hash_graph', JSON.stringify(activeAddresses)]);
}
sock.send(['miner_hash_stats', JSON.stringify(activeAddresses)]);
});
setTimeout(updateShareStats, 10000);
}
@ -345,7 +340,6 @@ function updatePoolStats(poolType) {
totalPayments: result[7] || 0,
roundHashes: result[8] || 0
});
sock.send(['pool_stats', poolType]);
});
}
@ -444,13 +438,6 @@ function updateBlockHeader() {
value: body.result.block_header.reward,
ts: body.result.block_header.timestamp
});
sock.send(['network_block_info', JSON.stringify({
difficulty: body.result.block_header.difficulty,
hash: body.result.block_header.hash,
height: body.result.block_header.height,
value: body.result.block_header.reward,
ts: body.result.block_header.timestamp
})]);
} else if (body.result.block_header.hash === lastBlockHash) {
console.log("Block headers identical to historical header. Ignoring");
} else {

Loading…
Cancel
Save