Merge pull request #51 from Snipa22/socketIO

Socket io
master
Snipa22 7 years ago committed by GitHub
commit 50ee620110

@ -0,0 +1,22 @@
<html>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/1.7.3/socket.io.min.js"></script>
<script>
// set-up a connection between the client and the server
var socket = io.connect('https://api.xmrpool.net/');
// let's assume that the client page, once rendered, knows what room it wants to join
socket.on('connect', function() {
// Connected, let's sign-up for to receive messages for this room
socket.emit('room', "pool_stats");
socket.emit('room', "pool_stats_pplns");
socket.emit('room', "pool_stats_solo");
socket.emit('room', "block_update");
socket.emit('room', "address_stats_46XWBqE1iwsVxSDP1qDrxhE1XvsZV6eALG5LwnoMdjbT4GPdy2bZTb99kagzxp2MMjUamTYZ4WgvZdFadvMimTjvR6Gv8hL");
});
socket.on('message', function(data) {
console.log('Incoming message:', data);
});
</script>
</html>

@ -5,8 +5,11 @@
proxy /api 127.0.0.1:8001 {
without /api
}
proxy /socket.io 127.0.0.1:8001 {
header_upstream Connection {>Connection}
header_upstream Upgrade {>Upgrade}
}
header / {
Access-Control-Allow-Origin *
Access-Control-Allow-Methods "GET, POST, OPTIONS"
Access-Control-Allow-Headers "Content-Type, x-access-token"
}

@ -17,7 +17,7 @@ sudo DEBIAN_FRONTEND=noninteractive apt-get -y upgrade
sudo debconf-set-selections <<< "mysql-server mysql-server/root_password password $ROOT_SQL_PASS"
sudo debconf-set-selections <<< "mysql-server mysql-server/root_password_again password $ROOT_SQL_PASS"
echo -e "[client]\nuser=root\npassword=$ROOT_SQL_PASS" | sudo tee /root/.my.cnf
sudo DEBIAN_FRONTEND=noninteractive apt-get -y install git python-virtualenv python3-virtualenv curl ntp build-essential screen cmake pkg-config libboost-all-dev libevent-dev libunbound-dev libminiupnpc-dev libunwind8-dev liblzma-dev libldns-dev libexpat1-dev libgtest-dev mysql-server lmdb-utils
sudo DEBIAN_FRONTEND=noninteractive apt-get -y install git python-virtualenv python3-virtualenv curl ntp build-essential screen cmake pkg-config libboost-all-dev libevent-dev libunbound-dev libminiupnpc-dev libunwind8-dev liblzma-dev libldns-dev libexpat1-dev libgtest-dev mysql-server lmdb-utils libzmq3-dev
cd ~
git clone https://github.com/Snipa22/nodejs-pool.git # Change this depending on how the deployment goes.
cd /usr/src/gtest

@ -13,7 +13,7 @@ sudo ln -s /usr/share/zoneinfo/Zulu /etc/localtime
sudo dpkg-reconfigure -f noninteractive tzdata
sudo apt-get update
sudo DEBIAN_FRONTEND=noninteractive apt-get -y upgrade
sudo DEBIAN_FRONTEND=noninteractive apt-get -y install git python-virtualenv python3-virtualenv curl ntp build-essential screen cmake pkg-config libboost-all-dev libevent-dev libunbound-dev libminiupnpc-dev libunwind8-dev liblzma-dev libldns-dev libexpat1-dev libgtest-dev
sudo DEBIAN_FRONTEND=noninteractive apt-get -y install git python-virtualenv python3-virtualenv curl ntp build-essential screen cmake pkg-config libboost-all-dev libevent-dev libunbound-dev libminiupnpc-dev libunwind8-dev liblzma-dev libldns-dev libexpat1-dev libgtest-dev libzmq3-dev
cd ~
git clone https://github.com/Snipa22/nodejs-pool.git # Change this depending on how the deployment goes.
cd /usr/src/gtest

@ -1,20 +1,22 @@
"use strict";
const express = require('express'); // call express
const app = express(); // define our app using express
const server = require('http').createServer(app);
const cluster = require('cluster');
const async = require("async");
const debug = require("debug")("api");
const btcValidator = require('wallet-address-validator');
const cnUtil = require('cryptonote-util');
let bodyParser = require('body-parser');
let jwt = require('jsonwebtoken'); // used to create, sign, and verify tokens
const bodyParser = require('body-parser');
const jwt = require('jsonwebtoken'); // used to create, sign, and verify tokens
const crypto = require('crypto');
let cors = require('cors');
const cors = require('cors');
const zmq = require('zmq');
const sock = zmq.socket('sub');
const io = require('socket.io')(server);
let addressBase58Prefix = cnUtil.address_decode(new Buffer(global.config.pool.address));
let threadName = "";
let workerList = [];
if (cluster.isMaster) {
threadName = "(Master) ";
@ -22,10 +24,6 @@ if (cluster.isMaster) {
threadName = "(Worker " + cluster.worker.id + " - " + process.pid + ") ";
}
app.use(cors());
app.use(bodyParser.urlencoded({extended: false}));
app.use(bodyParser.json());
let pool_list = [];
if(global.config.pplns.enable === true){
pool_list.push('pplns');
@ -37,6 +35,251 @@ if(global.config.solo.enable === true){
pool_list.push('solo');
}
app.use(cors({origin: true}));
app.use(bodyParser.urlencoded({extended: false}));
app.use(bodyParser.json());
if (!cluster.isMaster) {
// Init the ZMQ listeners here.
sock.connect('tcp://127.0.0.1:3000');
sock.connect('tcp://127.0.0.1:3001');
// miner_hash_graph - Hash graphs updated
// miner_hash_stats - Miner hashes updated
// network_block_info - New block information
// pool_stats - Pool statistics update
sock.subscribe('');
}
require('sticky-cluster')(
// server initialization function
function (callback) {
// don't do server.listen(), just pass the server instance into the callback
callback(server);
},
// options
{
concurrency: require('os').cpus().length,
port: 8001,
debug: false,
env: function (index) { return { stickycluster_worker_index: index }; }
}
);
// SocketIO Routes
// ===============
io.on('connection', (socket) => {
socket.on('room', function(room) {
socket.join(room);
});
});
// As the majority of updates come in upstream from the main SocketIO server, we use ZMQ to manage this.
// The following is the ZMQ logic. Pray to whatever deity you like.
sock.on('message', function(topic, message) {
topic = topic.toString();
message = message.toString();
/*
Registered ZMQ Messages:
miner_hash_graph - Hash graphs updated
miner_hash_stats - Miner hashes updated
network_block_info - New block information
pool_stats - Pool statistics update
payments - Payments complete - Trip the frontend to hit the frontend for new payment information
Registered Rooms:
payments
hash_chart_<address> - equiv to https://api.xmrpool.net/miner/<address>/chart/hashrate/allWorkers
worker_stats_<address> - equiv to https://api.xmrpool.net/miner/<address>/stats/allWorkers
worker_ids_<address> - equiv to https://api.xmrpool.net/miner/<address>/identifiers
address_stats_<address> - equiv to https://api.xmrpool.net/miner/<address>/stats
network_block_info - equiv to https://api.xmrpool.net/network/stats
pool_stats_<type> - equiv to https://api.xmrpool.net/pool/stats/<type>
pool_stats - equiv to https://api.xmrpool.net/pool/stats/
*/
switch(topic){
case 'payments':
io.sockets.in('payments').emit('message', 'newPaymentsAvailable');
break;
case 'miner_hash_graph':
message = JSON.parse(message);
message.forEach(function(address){
getAllWorkerHashCharts(address, function(err, data){
return io.sockets.in('hash_chart_'+address).emit('message', JSON.stringify(data));
});
});
break;
case 'miner_hash_stats':
message = JSON.parse(message);
message.forEach(function(address){
getAddressStats(address, function(err,data){
io.sockets.in('address_stats_'+address).emit('message', JSON.stringify(data));
});
getAllWorkerStats(address, function(err, data){
io.sockets.in('worker_stats_'+address).emit('message', JSON.stringify(data));
});
io.sockets.in('worker_ids_'+address).emit('message', JSON.stringify(global.database.getCache(address + '_identifiers')));
});
break;
case 'network_block_info':
io.sockets.in('block_update').emit('message', message);
break;
case 'pool_stats':
if (message === 'global'){
let localCache = global.database.getCache('pool_stats_global');
delete(localCache.minerHistory);
delete(localCache.hashHistory);
let lastPayment = global.database.getCache('lastPaymentCycle');
io.sockets.in('pool_stats').emit('message', JSON.stringify({pool_list: pool_list, pool_statistics: localCache, last_payment: !lastPayment ? 0 : lastPayment}));
} else {
let pool_type = message;
let localCache;
switch (pool_type) {
case 'pplns':
localCache = global.database.getCache('pool_stats_pplns');
localCache.fee = global.config.payout.pplnsFee;
break;
case 'pps':
localCache = global.database.getCache('pool_stats_pps');
localCache.fee = global.config.payout.ppsFee;
break;
case 'solo':
localCache = global.database.getCache('pool_stats_solo');
localCache.fee = global.config.payout.soloFee;
break;
case 'default':
io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({'error': 'Invalid pool type'}));
}
localCache.poolType = pool_type;
delete(localCache.minerHistory);
delete(localCache.hashHistory);
io.sockets.in('pool_stats_' + message).emit('message', JSON.stringify({pool_statistics: localCache}));
}
break;
}
});
// Support Functions that are reused now
function getAllWorkerHashCharts(address, callback){
let identifiers = global.database.getCache(address + '_identifiers');
let returnData = {global: global.database.getCache(address)['hashHistory']};
if (identifiers !== false){
identifiers.sort();
} else {
return returnData;
}
let intCounter = 0;
identifiers.forEach(function(identifier){
returnData[identifier] = global.database.getCache(address+"_"+identifier)['hashHistory'];
intCounter += 1;
if (intCounter === identifiers.length){
return callback(null, returnData);
}
});
}
function getAllWorkerStats(address, callback){
let identifiers = global.database.getCache(address + '_identifiers');
let globalCache = global.database.getCache(address);
let returnData = {global: {
lts: Math.floor(globalCache.lastHash / 1000),
identifer: 'global',
hash: globalCache.hash,
totalHash: globalCache.totalHashes
}};
let intCounter = 0;
if (identifiers === false){
return callback(null, returnData);
}
identifiers.sort().forEach(function(identifier){
let cachedData = global.database.getCache(address+"_"+identifier);
returnData[identifier] = {
lts: Math.floor(cachedData.lastHash / 1000),
identifer: identifier,
hash: cachedData.hash,
totalHash: cachedData.totalHashes
};
intCounter += 1;
if (intCounter === identifiers.length){
return callback(null, returnData);
}
});
}
function getAddressStats(address, extCallback){
let address_parts = address.split('.');
let address_pt = address_parts[0];
let payment_id = address_parts[1];
let cachedData = global.database.getCache(address);
let paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id = ?";
if (typeof(payment_id) === 'undefined') {
paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id IS ?";
}
async.waterfall([
function (callback) {
debug(threadName + "Checking Influx for last 10min avg for /miner/address/stats");
return callback(null, {hash: cachedData.hash, identifier: 'global', lastHash: Math.floor(cachedData.lastHash / 1000),
totalHashes: cachedData.totalHashes, validShares: Number(cachedData.goodShares), invalidShares: Number(cachedData.badShares)});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount paid for /miner/address/stats");
global.mysql.query(paidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtPaid = 0;
} else {
returnData.amtPaid = rows[0].amt;
if (returnData.amtPaid === null) {
returnData.amtPaid = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(unpaidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtDue = 0;
} else {
returnData.amtDue = rows[0].amt;
if (returnData.amtDue === null) {
returnData.amtDue = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(txnQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.txnCount = 0;
} else {
returnData.txnCount = rows[0].amt;
if (returnData.txnCount === null) {
returnData.txnCount = 0;
}
}
return callback(true, returnData);
});
}
], function (err, result) {
debug(threadName + "Result information for " + address + ": " + JSON.stringify(result));
if (err === true) {
return extCallback(null, result);
}
if (err) {
console.error(threadName + "Error within the miner stats identifier func");
return extCallback(err.toString());
}
});
}
// ROUTES FOR OUR API
// =============================================================================
@ -300,31 +543,8 @@ app.get('/miner/:address/payments', function (req, res) {
});
app.get('/miner/:address/stats/allWorkers', function (req, res) {
let address = req.params.address;
let identifiers = global.database.getCache(address + '_identifiers');
let globalCache = global.database.getCache(address);
let returnData = {global: {
lts: Math.floor(globalCache.lastHash / 1000),
identifer: 'global',
hash: globalCache.hash,
totalHash: globalCache.totalHashes
}};
let intCounter = 0;
if (identifiers === false){
return res.json(returnData);
}
identifiers.sort().forEach(function(identifier){
let cachedData = global.database.getCache(req.params.address+"_"+identifier);
returnData[identifier] = {
lts: Math.floor(cachedData.lastHash / 1000),
identifer: identifier,
hash: cachedData.hash,
totalHash: cachedData.totalHashes
};
intCounter += 1;
if (intCounter === identifiers.length){
return res.json(returnData);
}
getAllWorkerStats(req.params.address, function(err, data){
return res.json(data);
});
});
@ -353,21 +573,8 @@ app.get('/miner/:address/chart/hashrate', function (req, res) {
});
app.get('/miner/:address/chart/hashrate/allWorkers', function (req, res) {
let address = req.params.address;
let identifiers = global.database.getCache(address + '_identifiers');
let returnData = {global: global.database.getCache(req.params.address)['hashHistory']};
if (identifiers !== false){
identifiers.sort();
} else {
return res.json(returnData);
}
let intCounter = 0;
identifiers.forEach(function(identifier){
returnData[identifier] = global.database.getCache(req.params.address+"_"+identifier)['hashHistory'];
intCounter += 1;
if (intCounter === identifiers.length){
return res.json(returnData);
}
getAllWorkerHashCharts(req.params.address, function(err, data){
return res.json(data);
});
});
@ -376,76 +583,8 @@ app.get('/miner/:address/chart/hashrate/:identifier', function (req, res) {
});
app.get('/miner/:address/stats', function (req, res) {
let address = req.params.address;
let address_parts = req.params.address.split('.');
let address_pt = address_parts[0];
let payment_id = address_parts[1];
let cachedData = global.database.getCache(address);
let paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id = ?";
let unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id = ?";
if (typeof(payment_id) === 'undefined') {
paidQuery = "SELECT SUM(amount) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
txnQuery = "SELECT count(id) as amt FROM payments WHERE payment_address = ? AND payment_id IS ?";
unpaidQuery = "SELECT SUM(amount) as amt FROM balance WHERE payment_address = ? AND payment_id IS ?";
}
async.waterfall([
function (callback) {
debug(threadName + "Checking Influx for last 10min avg for /miner/address/stats");
return callback(null, {hash: cachedData.hash, identifier: 'global', lastHash: Math.floor(cachedData.lastHash / 1000),
totalHashes: cachedData.totalHashes, validShares: Number(cachedData.goodShares), invalidShares: Number(cachedData.badShares)});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount paid for /miner/address/stats");
global.mysql.query(paidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtPaid = 0;
} else {
returnData.amtPaid = rows[0].amt;
if (returnData.amtPaid === null) {
returnData.amtPaid = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(unpaidQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.amtDue = 0;
} else {
returnData.amtDue = rows[0].amt;
if (returnData.amtDue === null) {
returnData.amtDue = 0;
}
}
return callback(null, returnData);
});
},
function (returnData, callback) {
debug(threadName + "Checking MySQL total amount unpaid for /miner/address/stats");
global.mysql.query(txnQuery, [address_pt, payment_id]).then(function (rows) {
if (typeof(rows[0]) === 'undefined') {
returnData.txnCount = 0;
} else {
returnData.txnCount = rows[0].amt;
if (returnData.txnCount === null) {
returnData.txnCount = 0;
}
}
return callback(true, returnData);
});
}
], function (err, result) {
debug(threadName + "Result information for " + address + ": " + JSON.stringify(result));
if (err === true) {
return res.json(result);
}
if (err) {
console.error(threadName + "Error within the miner stats identifier func");
return res.json({'error': err.toString()});
}
getAddressStats(req.params.address, function(err, data){
return res.json(data);
});
});
@ -805,29 +944,4 @@ adminRoutes.get('/userList', function (req, res) {
app.use('/authed', secureRoutes);
app.use('/admin', adminRoutes);
// Authenticated routes
if (cluster.isMaster) {
let numWorkers = require('os').cpus().length;
console.log('Master cluster setting up ' + numWorkers + ' workers...');
for (let i = 0; i < numWorkers; i++) {
let worker = cluster.fork();
workerList.push(worker);
}
cluster.on('online', function (worker) {
console.log('Worker ' + worker.process.pid + ' is online');
});
cluster.on('exit', function (worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
worker = cluster.fork();
workerList.push(worker);
});
} else {
app.listen(8001, function () {
console.log('Process ' + process.pid + ' is listening to all incoming requests');
});
}
// Authenticated routes

@ -4,12 +4,15 @@ const async = require("async");
const debug = require("debug")("payments");
const request = require('request-json');
const range = require('range');
const zmq = require('zmq');
const sock = zmq.socket('pub');
let hexChars = new RegExp("[0-9a-f]+");
let bestExchange = global.config.payout.bestExchange;
let xmrAPIClient = request.createClient('https://xmr.to/api/v1/xmr2btc/');
let extraPaymentRound = false;
let paymentTimer = null;
sock.bindSync('tcp://127.0.0.1:3001');
let shapeshiftQueue = async.queue(function (task, callback) {
// Amount needs to be shifted in as a non-completed value, as the wallet will only take non-complete values..
@ -344,6 +347,7 @@ paymentQueue.drain = function(){
extraPaymentRound = false;
paymentTimer = setInterval(makePayments, global.config.payout.timer * 60 * 1000);
global.database.setCache('lastPaymentCycle', Math.floor(Date.now()/1000));
sock.send(['payments', 'complete']);
};
function updateShapeshiftCompletion() {

@ -2,9 +2,13 @@
const debug = require("debug")("worker");
const async = require("async");
const sprintf = require("sprintf-js").sprintf;
const zmq = require('zmq');
const sock = zmq.socket('pub');
let threadName = "Worker Server ";
let cycleCount = 0;
sock.bindSync('tcp://127.0.0.1:3000');
let lastBlockHash = null;
function updateShareStats() {
// This is an omni-worker to deal with all things share-stats related
@ -12,6 +16,7 @@ function updateShareStats() {
// Buffer lengths? You guessed it, configured in SQL.
// Stats timeouts are 30 seconds, so everything for buffers should be there.
let currentTime = Date.now();
let activeAddresses = [];
async.waterfall([
function (callback) {
global.coinFuncs.getLastBlockHeader(function (body) {
@ -173,6 +178,9 @@ function updateShareStats() {
if (globalMinerList.indexOf(miner) === -1) {
globalMinerList.push(miner);
}
if (miner.indexOf('_') === -1){
activeAddresses.push(miner);
}
let cachedData = global.database.getCache(miner);
if (cachedData !== false) {
cachedData.hash = Math.floor(localStats.miners[miner] / 600);
@ -241,7 +249,9 @@ function updateShareStats() {
cycleCount += 1;
if (cycleCount === 6){
cycleCount = 0;
sock.send(['miner_hash_graph', JSON.stringify(activeAddresses)]);
}
sock.send(['miner_hash_stats', JSON.stringify(activeAddresses)]);
});
setTimeout(updateShareStats, 10000);
}
@ -333,6 +343,7 @@ function updatePoolStats(poolType) {
totalPayments: result[7] || 0,
roundHashes: result[8] || 0
});
sock.send(['pool_stats', poolType]);
});
}
@ -422,7 +433,8 @@ function updatePoolInformation() {
function updateBlockHeader() {
global.support.rpcDaemon('getlastblockheader', [], function (body) {
if (body.result) {
if (body.result && body.result.block_header.hash !== lastBlockHash) {
lastBlockHash = body.result.block_header.hash;
global.database.setCache('networkBlockInfo', {
difficulty: body.result.block_header.difficulty,
hash: body.result.block_header.hash,
@ -430,6 +442,13 @@ function updateBlockHeader() {
value: body.result.block_header.reward,
ts: body.result.block_header.timestamp
});
sock.send(['network_block_info', JSON.stringify({
difficulty: body.result.block_header.difficulty,
hash: body.result.block_header.hash,
height: body.result.block_header.height,
value: body.result.block_header.reward,
ts: body.result.block_header.timestamp
})]);
} else {
console.error("GetLastBlockHeader Error during block header update");
}

@ -36,8 +36,11 @@
"request": "^2.79.0",
"request-json": "0.6.1",
"shapeshift.io": "1.3.0",
"socketio": "^1.0.0",
"sprintf-js": "^1.0.3",
"sticky-cluster": "^0.3.1",
"uuid": "3.0.1",
"wallet-address-validator": "0.1.0"
"wallet-address-validator": "0.1.0",
"zmq": "^2.15.3"
}
}

Loading…
Cancel
Save