Merge branch 'count-by-account' into master

pull/1/head
Jethro Grassie 4 years ago
commit 30edb1a3e4
No known key found for this signature in database
GPG Key ID: DE8ED755616565BB

@ -0,0 +1,218 @@
/*
Copyright (c) 2014-2020, The Monero Project
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Parts of the project are originally copyright (c) 2012-2013 The Cryptonote
developers.
*/
#include "growbag.h"
#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
struct gbag_t
{
size_t z;
size_t max;
size_t ref;
char * b;
char * n;
char * ni;
gbag_recycle rc;
gbag_moved mv;
};
void
gbag_new(gbag_t **out, size_t count, size_t size,
gbag_recycle recycle, gbag_moved moved)
{
assert(*out==NULL && count && size);
gbag_t *gb = (gbag_t*) calloc(1, sizeof(gbag_t));
gb->z = size;
gb->max = count;
gb->ref = 0;
gb->b = (char*) calloc(gb->max, gb->z);
gb->n = gb->b;
gb->ni = gb->b;
gb->rc = recycle;
gb->mv = moved;
*out = gb;
}
void
gbag_free(gbag_t *gb)
{
assert(gb && gb->max && gb->b);
char *end = gb->b + (gb->max * gb->z);
char *cur = gb->b;
if (gb->rc)
{
while (cur < end)
{
gb->rc(cur);
cur += gb->z;
}
}
free(gb->b);
gb->max = 0;
gb->ref = 0;
gb->b = NULL;
gb->n = NULL;
gb->ni = NULL;
gb->rc = NULL;
gb->mv = NULL;
}
void *
gbag_get(gbag_t *gb)
{
assert(gb && gb->max && gb->b);
char *end = gb->b + (gb->max * gb->z);
char *from = gb->n;
size_t nm, cz;
char *b = NULL;
if (gb->ref == gb->max)
goto grow;
scan:
do
{
if (!*gb->n)
{
gb->ref++;
return gb->n;
}
gb->n += gb->z;
}
while(gb->n < end);
if (from != gb->b)
{
end = from;
gb->n = gb->b;
from = gb->n;
goto scan;
}
else
{
grow:
cz = gb->max * gb->z;
nm = gb->max << 1;
b = (char*) realloc(gb->b, nm * gb->z);
if (b == NULL)
return NULL;
memset(b + cz, 0, cz);
gb->max = nm;
if (gb->mv && gb->b != b)
gb->mv(b, cz);
gb->b = b;
gb->n = b + cz;
gb->ref++;
return gb->n;
}
return NULL;
}
void
gbag_put(gbag_t *gb, void *item)
{
assert(gb && item && gb->ref>0);
if (gb->rc)
gb->rc(item);
memset(item, 0, gb->z);
gb->n = (char*)item;
gb->ref--;
}
size_t
gbag_max(gbag_t *gb)
{
return gb->max;
}
size_t
gbag_used(gbag_t *gb)
{
return gb->ref;
}
void *
gbag_find(gbag_t *gb, const void *key, gbag_cmp cmp)
{
assert(gb && gb->b && gb->max);
return gbag_find_after(gb, key, cmp, NULL);
}
void *
gbag_find_after(gbag_t *gb, const void *key, gbag_cmp cmp, void *from)
{
assert(gb && gb->b && gb->max);
char *s = gb->b;
char *e = gb->b + (gb->max * gb->z);
if (from)
s = ((char*)from) + gb->z;
int c = (e-s)/gb->z;
return bsearch(key, s, c, gb->z, cmp);
}
void *
gbag_first(gbag_t *gb)
{
assert(gb && gb->b && gb->max);
char *s = gb->b;
char *e = gb->b + (gb->max * gb->z);
gb->ni = s;
do
{
if (*s)
return s;
s += gb->z;
}
while (s<e);
return NULL;
}
void *
gbag_next(gbag_t *gb, void* from)
{
assert(gb && gb->b && gb->max);
if (from)
gb->ni = ((char*)from) + gb->z;
char *e = gb->b + (gb->max * gb->z);
char *s = gb->ni;
while (s<e)
{
gb->ni += gb->z;
if (*s)
return s;
s += gb->z;
}
return NULL;
}

@ -0,0 +1,60 @@
/*
Copyright (c) 2014-2020, The Monero Project
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Parts of the project are originally copyright (c) 2012-2013 The Cryptonote
developers.
*/
/* A bag of memory that exponentially grows */
#ifndef GBAG_H
#define GBAG_H
#include <stddef.h>
typedef struct gbag_t gbag_t;
typedef void (*gbag_recycle)(void*);
typedef void (*gbag_moved)(const void*,size_t);
typedef int (*gbag_cmp)(const void*,const void*);
void gbag_new(gbag_t **out, size_t count, size_t size,
gbag_recycle recycle, gbag_moved moved);
void gbag_free(gbag_t *gb);
void * gbag_get(gbag_t *gb);
void gbag_put(gbag_t *gb, void *item);
size_t gbag_max(gbag_t *gb);
size_t gbag_used(gbag_t *gb);
void * gbag_find(gbag_t *gb, const void *key, gbag_cmp cmp);
void * gbag_find_after(gbag_t *gb, const void *key, gbag_cmp cmp, void* from);
void * gbag_first(gbag_t *gb);
void * gbag_next(gbag_t *gb, void* from);
#endif

@ -69,9 +69,11 @@ developers.
#include "log.h"
#include "webui.h"
#include "forkoff.h"
#include "growbag.h"
#include "uthash.h"
#define MAX_LINE 8192
#define POOL_CLIENTS_GROW 0x4000
#define CLIENTS_INIT 0x4000
#define RPC_BODY_MAX 8192
#define JOB_BODY_MAX 8192
#define ERROR_BODY_MAX 512
@ -133,6 +135,17 @@ enum msgbin_type { BIN_PING, BIN_CONNECT, BIN_DISCONNECT, BIN_SHARE,
BIN_BLOCK, BIN_STATS, BIN_BALANCE };
const unsigned char msgbin[] = {0x4D,0x4E,0x52,0x4F,0x50,0x4F,0x4F,0x4C};
/* 2m, 10m, 30m, 1h, 1d, 1w */
const unsigned hr_intervals[] = {120,600,1800,3600,86400,604800};
typedef struct hr_stats_t
{
time_t last_calc;
uint64_t diff_since;
/* 2m, 10m, 30m, 1h, 1d, 1w */
double avg[6];
} hr_stats_t;
typedef struct config_t
{
char rpc_host[MAX_HOST];
@ -206,19 +219,25 @@ typedef struct client_t
char agent[256];
bstack_t *active_jobs;
uint64_t hashes;
hr_stats_t hr_stats;
time_t connected_since;
bool is_xnp;
uint32_t mode;
uint8_t bad_shares;
bool downstream;
uint32_t downstream_miners;
uint32_t downstream_accounts;
UT_hash_handle hh;
} client_t;
typedef struct pool_clients_t
typedef struct account_t
{
client_t *clients;
size_t count;
} pool_clients_t;
char address[ADDRESS_MAX];
size_t worker_count;
time_t connected_since;
uint64_t hashes;
hr_stats_t hr_stats;
UT_hash_handle hh;
} account_t;
typedef struct share_t
{
@ -248,14 +267,15 @@ typedef struct payment_t
typedef struct rpc_callback_t rpc_callback_t;
typedef void (*rpc_callback_fun)(const char*, rpc_callback_t*);
typedef void (*rpc_datafree_fun)(void*);
struct rpc_callback_t
{
rpc_callback_fun f;
rpc_callback_fun cf;
void *data;
rpc_datafree_fun df;
};
static config_t config;
static pool_clients_t pool_clients;
static bstack_t *bst;
static bstack_t *bsh;
static struct event_base *pool_base;
@ -281,6 +301,8 @@ static pthread_cond_t cond_clients = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mutex_clients = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mutex_log = PTHREAD_MUTEX_INITIALIZER;
static pthread_rwlock_t rwlock_tx = PTHREAD_RWLOCK_INITIALIZER;
static pthread_rwlock_t rwlock_acc = PTHREAD_RWLOCK_INITIALIZER;
static pthread_rwlock_t rwlock_cfd = PTHREAD_RWLOCK_INITIALIZER;
static FILE *fd_log;
static unsigned char sec_view[32];
static unsigned char pub_spend[32];
@ -293,6 +315,10 @@ static struct event *timer_10s;
static time_t upstream_last_time;
static uint64_t upstream_last_height;
static uint32_t miner_count;
static client_t *clients_by_fd = NULL;
static account_t *accounts = NULL;
static gbag_t *bag_accounts;
static gbag_t *bag_clients;
#ifdef HAVE_RX
extern void rx_stop_mining();
@ -323,12 +349,44 @@ void rx_slow_hash_free_state(){}
} \
}
static void
hr_update(hr_stats_t *stats)
{
/*
Update some time decayed EMA hashrates.
*/
time_t now = time(NULL);
double t = difftime(now, stats->last_calc);
if (t <= 0)
return;
double h = stats->diff_since;
double d, p, z;
unsigned i = sizeof(hr_intervals)/sizeof(hr_intervals[0]);
while (i--)
{
unsigned inter = hr_intervals[i];
double *f = &stats->avg[i];
d = t/inter;
if (d > 32)
d = 32;
p = 1 - 1.0 / exp(d);
z = 1 + p;
*f += (h / t * p);
*f /= z;
if (*f < 2e-16)
*f = 0;
}
stats->diff_since = 0;
stats->last_calc = now;
}
static inline rpc_callback_t *
rpc_callback_new(rpc_callback_fun f, void *data)
rpc_callback_new(rpc_callback_fun cf, void *data, rpc_datafree_fun df)
{
rpc_callback_t *c = calloc(1, sizeof(rpc_callback_t));
c->f = f;
c->cf = cf;
c->data = data;
c->df = df;
return c;
}
@ -338,10 +396,21 @@ rpc_callback_free(rpc_callback_t *callback)
if (!callback)
return;
if (callback->data)
free(callback->data);
{
if (callback->df)
callback->df(callback->data);
else
free(callback->data);
}
free(callback);
}
static inline void
rpc_bag_free(void* data)
{
gbag_free((gbag_t*)data);
}
static int
compare_uint64(const MDB_val *a, const MDB_val *b)
{
@ -603,35 +672,21 @@ store_block(uint64_t height, block_t *block)
return rc;
}
uint64_t
miner_hr(const char *address)
void
account_hr(double *avg, const char *address)
{
pthread_mutex_lock(&mutex_clients);
clients_reading++;
pthread_mutex_unlock(&mutex_clients);
client_t *c = pool_clients.clients;
uint64_t hr = 0;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->connected_since
&& !strncmp(c->address, address, ADDRESS_MAX))
{
double d = difftime(time(NULL), c->connected_since);
if (d == 0.0)
continue;
hr += c->hashes / d;
continue;
}
}
pthread_mutex_lock(&mutex_clients);
clients_reading--;
pthread_cond_signal(&cond_clients);
pthread_mutex_unlock(&mutex_clients);
return hr;
account_t *account = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(accounts, address, account);
if (!account)
goto bail;
memcpy(avg, account->hr_stats.avg, sizeof(account->hr_stats.avg));
bail:
pthread_rwlock_unlock(&rwlock_acc);
}
uint64_t
miner_balance(const char *address)
account_balance(const char *address)
{
int rc;
char *err;
@ -937,17 +992,9 @@ static void
update_pool_hr(void)
{
uint64_t hr = 0;
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->connected_since)
{
double d = difftime(time(NULL), c->connected_since);
if (d == 0.0)
continue;
hr += c->hashes / d;
}
}
client_t *c = (client_t*)gbag_first(bag_clients);
while ((c = gbag_next(bag_clients, 0)))
hr += (uint64_t) c->hr_stats.avg[0];
log_debug("Pool hashrate: %"PRIu64, hr);
if (upstream_event)
return;
@ -1344,10 +1391,34 @@ miner_send_job(client_t *client, bool response)
}
static void
pool_clients_send_job(void)
accounts_moved(const void *items, size_t count)
{
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
account_t *s, *e, *r;
s = (account_t*) items;
e = s + count;
pthread_rwlock_wrlock(&rwlock_acc);
while (s<e)
HASH_REPLACE_STR(accounts, address, s, r);
pthread_rwlock_unlock(&rwlock_acc);
}
static void
clients_moved(const void *items, size_t count)
{
client_t *s, *e, *r;
s = (client_t*) items;
e = s + count;
pthread_rwlock_wrlock(&rwlock_cfd);
while (s<e)
HASH_REPLACE_INT(clients_by_fd, fd, s, r);
pthread_rwlock_unlock(&rwlock_cfd);
}
static void
clients_send_job(void)
{
client_t *c = (client_t*) gbag_first(bag_clients);
while ((c = gbag_next(bag_clients, 0)))
{
if (c->fd == 0 || c->address[0] == 0 || c->downstream)
continue;
@ -1356,27 +1427,36 @@ pool_clients_send_job(void)
}
static void
pool_clients_init(void)
clients_init(void)
{
assert(pool_clients.count == 0);
pool_clients.count = POOL_CLIENTS_GROW;
pool_clients.clients = (client_t*) calloc(pool_clients.count,
sizeof(client_t));
gbag_new(&bag_accounts, CLIENTS_INIT, sizeof(account_t), 0,
accounts_moved);
gbag_new(&bag_clients, CLIENTS_INIT, sizeof(client_t), 0,
clients_moved);
}
static void
pool_clients_free(void)
clients_free(void)
{
if (!pool_clients.clients)
if (!(bag_accounts && bag_clients))
return;
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
client_t *c = (client_t*) gbag_first(bag_clients);
while ((c = gbag_next(bag_clients, 0)))
{
if (!c->active_jobs)
continue;
client_clear_jobs(c);
}
free(pool_clients.clients);
pthread_rwlock_wrlock(&rwlock_cfd);
HASH_CLEAR(hh, clients_by_fd);
gbag_free(bag_clients);
pthread_rwlock_unlock(&rwlock_cfd);
pthread_rwlock_wrlock(&rwlock_acc);
HASH_CLEAR(hh, accounts);
gbag_free(bag_accounts);
pthread_rwlock_unlock(&rwlock_acc);
}
static void
@ -1464,7 +1544,7 @@ rpc_on_response(struct evhttp_request *req, void *arg)
char body[len+1];
evbuffer_remove(input, body, len);
body[len] = '\0';
callback->f(body, callback);
callback->cf(body, callback);
rpc_callback_free(callback);
}
@ -1659,7 +1739,7 @@ rpc_on_block_template(const char* data, rpc_callback_t *callback)
pool_stats.last_template_fetched = time(NULL);
block_template_t *top = (block_template_t*) bstack_push(bst, NULL);
response_to_block_template(result, top);
pool_clients_send_job();
clients_send_job();
json_object_put(root);
}
@ -1770,7 +1850,7 @@ startup_payout(uint64_t height)
rpc_get_request_body(body, "get_block_header_by_height", "sd",
"height", block->height);
rpc_callback_t *cb = rpc_callback_new(
rpc_on_block_header_by_height, NULL);
rpc_on_block_header_by_height, 0, 0);
rpc_request(pool_base, body, cb);
}
@ -1862,7 +1942,7 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback)
uint64_t reserve = 17;
rpc_get_request_body(body, "get_block_template", "sssd",
"wallet_address", config.pool_wallet, "reserve_size", reserve);
rpc_callback_t *cb1 = rpc_callback_new(rpc_on_block_template, NULL);
rpc_callback_t *cb1 = rpc_callback_new(rpc_on_block_template, 0, 0);
rpc_request(pool_base, body, cb1);
uint64_t end = top->height - 60;
@ -1870,7 +1950,7 @@ rpc_on_last_block_header(const char* data, rpc_callback_t *callback)
rpc_get_request_body(body, "get_block_headers_range", "sdsd",
"start_height", start, "end_height", end);
rpc_callback_t *cb2 = rpc_callback_new(
rpc_on_block_headers_range, NULL);
rpc_on_block_headers_range, 0, 0);
rpc_request(pool_base, body, cb2);
}
@ -1955,11 +2035,12 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
mdb_txn_abort(txn);
goto cleanup;
}
payment_t *payment = (payment_t*) callback->data;
for (; payment->amount; payment++)
gbag_t *bag_pay = (gbag_t*) callback->data;
payment_t *p = (payment_t*) gbag_first(bag_pay);
while((p = gbag_next(bag_pay, 0)))
{
MDB_cursor_op op = MDB_SET;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val key = {ADDRESS_MAX, (void*)p->address};
MDB_val val;
rc = mdb_cursor_get(cursor, &key, &val, op);
if (rc == MDB_NOTFOUND)
@ -1975,21 +2056,21 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
}
uint64_t current_amount = *(uint64_t*)val.mv_data;
if (current_amount >= payment->amount)
if (current_amount >= p->amount)
{
current_amount -= payment->amount;
current_amount -= p->amount;
}
else
{
log_error("Payment was more than balance: %"PRIu64" > %"PRIu64,
payment->amount, current_amount);
p->amount, current_amount);
current_amount = 0;
}
if (error)
{
log_warn("Error seen on transfer for %s with amount %"PRIu64,
payment->address, payment->amount);
p->address, p->amount);
}
MDB_val new_val = {sizeof(current_amount), (void*)&current_amount};
rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT);
@ -2022,12 +2103,12 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback)
goto cleanup;
}
time_t now = time(NULL);
payment = (payment_t*) callback->data;
for (; payment->amount; payment++)
p = (payment_t*) gbag_first(bag_pay);
while((p = gbag_next(bag_pay, 0)))
{
payment->timestamp = now;
MDB_val key = {ADDRESS_MAX, (void*)payment->address};
MDB_val val = {sizeof(payment_t), payment};
p->timestamp = now;
MDB_val key = {ADDRESS_MAX, (void*)p->address};
MDB_val val = {sizeof(payment_t), p};
if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0)
{
err = mdb_strerror(rc);
@ -2071,12 +2152,8 @@ send_payments(void)
return rc;
}
size_t payments_count = 0;
size_t payments_max_count = 25;
size_t payments_size = payments_max_count * sizeof(payment_t);
payment_t *payments = (payment_t*) calloc(1, payments_size);
payment_t *payment = payments;
payment_t *end_payment = payment + payments_max_count;
gbag_t *bag_pay = NULL;
gbag_new(&bag_pay, 25, sizeof(payment_t), 0, 0);
MDB_cursor_op op = MDB_FIRST;
while (1)
@ -2096,52 +2173,43 @@ send_payments(void)
log_info("Sending payment: %"PRIu64", %.8s", amount, address);
strncpy(payment->address, address, ADDRESS_MAX-1);
payment->amount = amount;
payments_count++;
if (++payment == end_payment)
{
payments_size <<= 1;
payments = (payment_t*) realloc(payments, payments_size);
payment = payments + payments_max_count;
memset(payment, 0, sizeof(payment_t) * payments_max_count);
payments_max_count <<= 1;
end_payment = payments + payments_max_count;
}
payment_t *p = (payment_t*) gbag_get(bag_pay);
strncpy(p->address, address, ADDRESS_MAX-1);
p->amount = amount;
}
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
if (payments_count)
size_t proc = gbag_used(bag_pay);
if (proc)
{
size_t body_size = 160 * payments_count + 128;
size_t body_size = 160 * proc + 128;
char body[body_size];
char *start = body;
char *end = body + body_size;
start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":"
"\"transfer_split\",\"params\":{"
"\"ring_size\":11,\"destinations\":[", end);
for (size_t i=0; i<payments_count; i++)
payment_t *p = (payment_t*) gbag_first(bag_pay);
while ((p = gbag_next(bag_pay, 0)))
{
payment_t *p = &payments[i];
start = stecpy(start, "{\"address\":\"", end);
start = stecpy(start, p->address, end);
start = stecpy(start, "\",\"amount\":", end);
sprintf(start, "%"PRIu64"}", p->amount);
start = body + strlen(body);
if (i != payments_count -1)
if (--proc)
start = stecpy(start, ",", end);
else
start = stecpy(start, "]}}", end);
}
log_trace(body);
rpc_callback_t *cb = rpc_callback_new(
rpc_on_wallet_transferred, payments);
rpc_on_wallet_transferred, bag_pay, rpc_bag_free);
rpc_wallet_request(pool_base, body, cb);
}
else
free(payments);
gbag_free(bag_pay);
return 0;
}
@ -2159,7 +2227,7 @@ fetch_view_key(void)
return;
char body[RPC_BODY_MAX] = {0};
rpc_get_request_body(body, "query_key", "ss", "key_type", "view_key");
rpc_callback_t *cb = rpc_callback_new(rpc_on_view_key, NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_view_key, 0, 0);
rpc_wallet_request(pool_base, body, cb);
}
@ -2169,7 +2237,7 @@ fetch_last_block_header(void)
log_info("Fetching last block header");
char body[RPC_BODY_MAX] = {0};
rpc_get_request_body(body, "get_last_block_header", NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_last_block_header, NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_last_block_header, 0, 0);
rpc_request(pool_base, body, cb);
}
@ -2239,7 +2307,7 @@ trusted_send_balance(client_t *client, const char *address)
int t = BIN_BALANCE;
memcpy(data, msgbin, 8);
memcpy(data+8, &t, 1);
uint64_t balance = miner_balance(address);
uint64_t balance = account_balance(address);
memcpy(data+9, &balance, sizeof(uint64_t));
memcpy(data+9+sizeof(uint64_t), address, ADDRESS_MAX);
evbuffer_add(output, data, z);
@ -2258,7 +2326,7 @@ upstream_send_ping()
}
static void
upstream_send_client_connect(uint32_t count)
upstream_send_account_connect(uint32_t count)
{
struct evbuffer *output = bufferevent_get_output(upstream_event);
size_t z = 9 + sizeof(uint32_t);
@ -2268,11 +2336,11 @@ upstream_send_client_connect(uint32_t count)
memcpy(data+8, &t, 1);
memcpy(data+9, &count, z-9);
evbuffer_add(output, data, z);
log_trace("Sending message connect upstream");
log_trace("Sending message account connect upstream");
}
static void
upstream_send_client_disconnect()
upstream_send_account_disconnect()
{
struct evbuffer *output = bufferevent_get_output(upstream_event);
char data[9];
@ -2412,37 +2480,37 @@ upstream_send_backlog()
mdb_cursor_close(curshr);
mdb_cursor_close(curblk);
mdb_txn_abort(txn);
upstream_send_client_connect(pool_stats.connected_miners);
upstream_send_account_connect(pool_stats.connected_accounts);
}
static void
trusted_on_client_connect(client_t *client)
trusted_on_account_connect(client_t *client)
{
struct evbuffer *input = bufferevent_get_input(client->bev);
uint32_t count;
evbuffer_remove(input, &count, sizeof(uint32_t));
pool_stats.connected_miners += count;
client->downstream_miners += count;
log_trace("Downstream miner connected. "
"Miner count: %d, Pool hashrate: %"PRIu64,
pool_stats.connected_miners, pool_stats.pool_hashrate);
pool_stats.connected_accounts += count;
client->downstream_accounts += count;
log_trace("Downstream account connected. "
"Accounts: %d, Pool hashrate: %"PRIu64,
pool_stats.connected_accounts, pool_stats.pool_hashrate);
trusted_send_stats(client);
if (upstream_event)
upstream_send_client_connect(count);
upstream_send_account_connect(count);
}
static void
trusted_on_client_disconnect(client_t *client)
trusted_on_account_disconnect(client_t *client)
{
pool_stats.connected_miners--;
if (client->downstream_miners)
client->downstream_miners--;
log_trace("Downstream miner disconnected. "
pool_stats.connected_accounts--;
if (client->downstream_accounts)
client->downstream_accounts--;
log_trace("Downstream account disconnected. "
"Miner count: %d, Pool hashrate: %"PRIu64,
pool_stats.connected_miners, pool_stats.pool_hashrate);
pool_stats.connected_accounts, pool_stats.pool_hashrate);
trusted_send_stats(client);
if (upstream_event)
upstream_send_client_disconnect();
upstream_send_account_disconnect();
}
static void
@ -2459,6 +2527,8 @@ trusted_on_client_share(client_t *client)
s.difficulty);
client->hashes += s.difficulty;
pool_stats.round_hashes += s.difficulty;
client->hr_stats.diff_since += s.difficulty;
hr_update(&client->hr_stats);
rc = store_share(s.height, &s);
if (rc != 0)
log_warn("Failed to store share: %s", mdb_strerror(rc));
@ -2494,7 +2564,7 @@ upstream_on_stats(struct bufferevent *bev)
evbuffer_remove(input, &pool_stats, sizeof(pool_stats_t));
log_trace("Stats from upstream: "
"%d, %"PRIu64", %"PRIu64", %d, %"PRIu64,
pool_stats.connected_miners,
pool_stats.connected_accounts,
pool_stats.pool_hashrate,
pool_stats.round_hashes,
pool_stats.pool_blocks_found,
@ -2598,9 +2668,9 @@ upstream_on_event(struct bufferevent *bev, short error, void *ctx)
log_debug("Upstream timeout");
}
/* Update stats due to upstream disconnect */
if (pool_stats.connected_miners != miner_count)
if (pool_stats.connected_accounts != miner_count)
{
pool_stats.connected_miners = miner_count;
pool_stats.connected_accounts = miner_count;
update_pool_hr();
}
/* Wait and try to reconnect */
@ -2686,42 +2756,28 @@ timer_on_10m(int fd, short kind, void *ctx)
static void
client_add(int fd, struct bufferevent *bev, bool downstream)
{
client_t *c = pool_clients.clients;
bool resize = true;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->connected_since == 0)
{
resize = false;
break;
}
}
client_t *c;
bool resize = gbag_used(bag_clients) == gbag_max(bag_clients);
if (resize)
{
pthread_mutex_lock(&mutex_clients);
while (clients_reading)
pthread_cond_wait(&cond_clients, &mutex_clients);
pool_clients.count += POOL_CLIENTS_GROW;
c = realloc(pool_clients.clients, sizeof(client_t) *
pool_clients.count);
pool_clients.clients = c;
c += pool_clients.count - POOL_CLIENTS_GROW;
memset(c, 0, sizeof(client_t) * POOL_CLIENTS_GROW);
c = gbag_get(bag_clients);
pthread_mutex_unlock(&mutex_clients);
log_debug("Client pool can now hold %zu clients", pool_clients.count);
log_debug("Client pool can now hold %zu clients",
gbag_max(bag_clients));
}
else
memset(c, 0, sizeof(client_t));
c = gbag_get(bag_clients);
c->fd = fd;
c->bev = bev;
c->connected_since = time(NULL);
c->downstream = downstream;
bstack_new(&c->active_jobs, CLIENT_JOBS_MAX, sizeof(job_t), job_recycle);
miner_count++;
if (!downstream)
pool_stats.connected_miners++;
if (upstream_event)
upstream_send_client_connect(1);
pthread_rwlock_wrlock(&rwlock_cfd);
HASH_ADD_INT(clients_by_fd, fd, c);
pthread_rwlock_unlock(&rwlock_cfd);
}
static void
@ -2733,16 +2789,9 @@ client_find(struct bufferevent *bev, client_t **client)
*client = NULL;
return;
}
client_t *c = pool_clients.clients;
for (size_t i = 0; i < pool_clients.count; i++, c++)
{
if (c->fd == fd)
{
*client = c;
return;
}
}
*client = NULL;
pthread_rwlock_rdlock(&rwlock_cfd);
HASH_FIND_INT(clients_by_fd, &fd, *client);
pthread_rwlock_unlock(&rwlock_cfd);
}
static void
@ -2753,15 +2802,31 @@ client_clear(struct bufferevent *bev)
if (!client)
return;
client_clear_jobs(client);
if (client->downstream)
pool_stats.connected_miners -= client->downstream_miners;
else
pool_stats.connected_miners--;
account_t *account = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(accounts, client->address, account);
pthread_rwlock_unlock(&rwlock_acc);
if (account && account->worker_count == 1)
{
if (client->downstream)
pool_stats.connected_accounts -= client->downstream_accounts;
else
pool_stats.connected_accounts--;
if (upstream_event)
upstream_send_account_disconnect();
miner_count--;
pthread_rwlock_wrlock(&rwlock_acc);
HASH_DEL(accounts, account);
pthread_rwlock_unlock(&rwlock_acc);
gbag_put(bag_accounts, account);
}
else if (account && account->worker_count > 1)
account->worker_count--;
pthread_rwlock_wrlock(&rwlock_cfd);
HASH_DEL(clients_by_fd, client);
pthread_rwlock_unlock(&rwlock_cfd);
memset(client, 0, sizeof(client_t));
bufferevent_free(bev);
if (upstream_event)
upstream_send_client_disconnect();
miner_count--;
}
static void
@ -2831,6 +2896,28 @@ miner_on_login(json_object *message, client_t *client)
strncpy(client->address, address, sizeof(client->address)-1);
strncpy(client->worker_id, worker_id, sizeof(client->worker_id)-1);
account_t *account = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(accounts, client->address, account);
pthread_rwlock_unlock(&rwlock_acc);
if (!account)
{
miner_count++;
if (!client->downstream)
pool_stats.connected_accounts++;
if (upstream_event)
upstream_send_account_connect(1);
account = gbag_get(bag_accounts);
strncpy(account->address, address, sizeof(account->address)-1);
account->worker_count = 1;
account->connected_since = time(NULL);
account->hashes = 0;
pthread_rwlock_wrlock(&rwlock_acc);
HASH_ADD_STR(accounts, address, account);
pthread_rwlock_unlock(&rwlock_acc);
}
else
account->worker_count++;
uuid_t cid;
uuid_generate(cid);
bin_to_hex((const unsigned char*)cid, sizeof(uuid_t),
@ -3137,7 +3224,17 @@ post_hash:
BN_free(rh);
/* Process share */
account_t *account = NULL;
pthread_rwlock_rdlock(&rwlock_acc);
HASH_FIND_STR(accounts, client->address, account);
client->hashes += job->target;
client->hr_stats.diff_since += job->target;
account->hashes += job->target;
account->hr_stats.diff_since += job->target;
hr_update(&client->hr_stats);
/* TODO: account hr should be called less freq */
hr_update(&account->hr_stats);
pthread_rwlock_unlock(&rwlock_acc);
time_t now = time(NULL);
bool can_store = true;
log_trace("Checking hash against block difficulty: "
@ -3156,7 +3253,7 @@ post_hash:
"\"submit_block\", \"params\":[\"%s\"]}",
block_hex);
rpc_callback_t *cb = rpc_callback_new(rpc_on_block_submitted, NULL);
rpc_callback_t *cb = rpc_callback_new(rpc_on_block_submitted, 0, 0);
cb->data = calloc(1, sizeof(block_t));
block_t* b = (block_t*) cb->data;
b->height = bt->height;
@ -3385,11 +3482,11 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
if (len - 9 < sizeof(uint32_t))
goto unlock;
evbuffer_drain(input, 9);
trusted_on_client_connect(client);
trusted_on_account_connect(client);
break;
case BIN_DISCONNECT:
evbuffer_drain(input, 9);
trusted_on_client_disconnect(client);
trusted_on_account_disconnect(client);
break;
case BIN_SHARE:
if (len - 9 < sizeof(share_t))
@ -3492,7 +3589,7 @@ listener_on_accept(evutil_socket_t listener, short event, void *arg)
bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
client_add(fd, bev, base == trusted_base);
log_info("New %s connected. Miner count: %d, Pool hashrate: %"PRIu64,
type, pool_stats.connected_miners, pool_stats.pool_hashrate);
type, pool_stats.connected_accounts, pool_stats.pool_hashrate);
bufferevent_enable(bev, EV_READ|EV_WRITE);
}
@ -4077,7 +4174,7 @@ cleanup(void)
event_base_loopbreak(trusted_base);
if (pool_base)
event_base_free(pool_base);
pool_clients_free();
clients_free();
if (bsh)
bstack_free(bsh);
if (bst)
@ -4090,6 +4187,8 @@ cleanup(void)
pthread_mutex_destroy(&mutex_clients);
pthread_mutex_destroy(&mutex_log);
pthread_rwlock_destroy(&rwlock_tx);
pthread_rwlock_destroy(&rwlock_acc);
pthread_rwlock_destroy(&rwlock_cfd);
pthread_cond_destroy(&cond_clients);
log_info("Pool shutdown successfully");
if (fd_log)
@ -4252,7 +4351,7 @@ int main(int argc, char **argv)
uuid_generate(iid);
memcpy(&instance_id, iid, 4);
pool_clients_init();
clients_init();
wui_context_t uic;
uic.port = config.webui_port;

@ -35,7 +35,7 @@ developers.
#ifndef POOL_H
#define POOL_H
uint64_t miner_hr(const char *address);
uint64_t miner_balance(const char *address);
void account_hr(double *avg, const char *address);
uint64_t account_balance(const char *address);
#endif

File diff suppressed because it is too large Load Diff

@ -75,7 +75,7 @@ send_json_stats(struct evhttp_request *req, void *arg)
uint32_t pbf = context->pool_stats->pool_blocks_found;
uint64_t rh = context->pool_stats->round_hashes;
unsigned ss = context->allow_self_select;
uint64_t mh = 0;
double mh[6] = {0};
double mb = 0.0;
hdrs_in = evhttp_request_get_input_headers(req);
@ -86,8 +86,8 @@ send_json_stats(struct evhttp_request *req, void *arg)
if (wa)
{
wa += 3;
mh = miner_hr(wa);
uint64_t balance = miner_balance(wa);
account_hr(mh, wa);
uint64_t balance = account_balance(wa);
mb = (double) balance / 1000000000000.0;
}
}
@ -108,12 +108,17 @@ send_json_stats(struct evhttp_request *req, void *arg)
"\"allow_self_select\":%u,"
"\"connected_miners\":%d,"
"\"miner_hashrate\":%"PRIu64","
"\"miner_hashrate_stats\":["
"%"PRIu64",%"PRIu64",%"PRIu64","
"%"PRIu64",%"PRIu64",%"PRIu64"],"
"\"miner_balance\":%.8f"
"}", ph, rh, nh, nd, height, ltf, lbf, pbf,
context->payment_threshold, context->pool_fee,
context->pool_port, context->pool_ssl_port,
ss, context->pool_stats->connected_miners,
mh, mb);
ss, context->pool_stats->connected_accounts,
(uint64_t)mh[0],
(uint64_t)mh[0], (uint64_t)mh[1], (uint64_t)mh[2],
(uint64_t)mh[3], (uint64_t)mh[4], (uint64_t)mh[5], mb);
hdrs_out = evhttp_request_get_output_headers(req);
evhttp_add_header(hdrs_out, "Content-Type", "application/json");
evhttp_send_reply(req, HTTP_OK, "OK", buf);

@ -40,7 +40,7 @@ typedef struct pool_stats_t
uint64_t network_difficulty;
uint64_t network_hashrate;
uint64_t network_height;
uint32_t connected_miners;
uint32_t connected_accounts;
uint64_t pool_hashrate;
uint64_t round_hashes;
uint32_t pool_blocks_found;

Loading…
Cancel
Save