From daf01452ffaa0fac56d5cbda22c6d6b24a794fd3 Mon Sep 17 00:00:00 2001 From: Jethro Grassie Date: Sun, 5 May 2019 16:58:08 -0400 Subject: [PATCH] batch transfers Use `transfer_split` instead of `transfer`. The downside is we can no longer store the tx hash in payments table. This is because we have no way of knowing which returned tx hash contains the destination for a payment. Wallet stores tx hash anyway, so not an issue. --- src/pool.c | 138 ++++++++++++++++++++++++++++++++++++----------------- src/util.c | 9 ++++ src/util.h | 1 + 3 files changed, 104 insertions(+), 44 deletions(-) diff --git a/src/pool.c b/src/pool.c index bdd130c..d762c7a 100644 --- a/src/pool.c +++ b/src/pool.c @@ -196,9 +196,9 @@ typedef struct block_t typedef struct payment_t { - char tx_hash[64]; uint64_t amount; time_t timestamp; + char address[ADDRESS_MAX]; } payment_t; typedef struct rpc_callback_t @@ -783,6 +783,14 @@ send_payments() return rc; } + size_t payments_count = 0; + size_t payments_max_count = 25; + size_t payments_size = payments_max_count * sizeof(payment_t); + payment_t *payments = (payment_t*) calloc(1, payments_size); + memset(payments, 0, payments_size); + payment_t *payment = payments; + payment_t *end_payment = payment + payments_max_count; + MDB_cursor_op op = MDB_FIRST; while (1) { @@ -801,19 +809,51 @@ send_payments() log_info("Sending payment of %"PRIu64" to %s\n", amount, address); - char body[RPC_BODY_MAX]; - snprintf(body, RPC_BODY_MAX, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":\"transfer\",\"params\":{" - "\"destinations\":[{\"amount\":%"PRIu64",\"address\":\"%s\"}],\"mixin\":10}}", - amount, address); + memcpy(payment->address, address, ADDRESS_MAX); + payment->amount = amount; + payments_count++; + + if (++payment == end_payment) + { + payments_size <<= 1; + payments = (payment_t*) realloc(payments, payments_size); + payment = payments + payments_max_count; + memset(payment, 0, sizeof(payment_t) * payments_max_count); + payments_max_count <<= 1; + end_payment = payments + payments_max_count; + } + } + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + + if (payments_count) + { + size_t body_size = 160 * payments_count + 128; + char *body = (char*) alloca(body_size); + char *start = body; + char *end = body + body_size; + start = stecpy(start, "{\"id\":\"0\",\"jsonrpc\":\"2.0\",\"method\":\"transfer_split\",\"params\":{" + "\"ring_size\":11,\"destinations\":[", end); + for (size_t i=0; iaddress, end); + start = stecpy(start, "\",\"amount\":", end); + sprintf(start, "%"PRIu64"}", p->amount); + start = body + strlen(body); + if (i != payments_count -1) + start = stecpy(start, ",", end); + else + start = stecpy(start, "]}}", end); + } log_trace(body); rpc_callback_t *callback = calloc(1, sizeof(rpc_callback_t)); - callback->data = calloc(ADDRESS_MAX, sizeof(char)); - memcpy(callback->data, address, ADDRESS_MAX); + callback->data = payments; callback->cb = rpc_on_wallet_transferred; rpc_wallet_request(base, body, callback); } - mdb_cursor_close(cursor); - mdb_txn_abort(txn); + return 0; } @@ -1466,11 +1506,11 @@ rpc_on_block_submitted(const char* data, rpc_callback_t *callback) JSON_GET_OR_WARN(message, error, json_type_string); int ec = json_object_get_int(code); const char *em = json_object_get_string(message); - log_debug("Error (%d) with block submission: %s", ec, em); + log_warn("Error (%d) with block submission: %s", ec, em); } if (status == NULL || strcmp(ss, "OK") != 0) { - log_debug("Error submitting block: %s", ss); + log_warn("Error submitting block: %s", ss); } pool_stats.pool_blocks_found++; block_t *b = (block_t*)callback->data; @@ -1486,7 +1526,6 @@ static void rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) { log_trace("Transfer response: \n%s", data); - const char* address = callback->data; json_object *root = json_tokener_parse(data); JSON_GET_OR_WARN(result, root, json_type_object); json_object *error = NULL; @@ -1506,6 +1545,8 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) char *err; MDB_txn *txn; MDB_cursor *cursor; + + /* First, updated balance(s) */ if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) { err = mdb_strerror(rc); @@ -1519,38 +1560,43 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) mdb_txn_abort(txn); goto cleanup; } - - MDB_cursor_op op = MDB_SET; - MDB_val key = {ADDRESS_MAX, (void*)address}; - MDB_val val; - rc = mdb_cursor_get(cursor, &key, &val, op); - if (rc == MDB_NOTFOUND) + payment_t *payment = (payment_t*) callback->data; + for (; payment->amount; payment++) { - log_error("Payment made to non-existent address"); - mdb_txn_abort(txn); - goto cleanup; + MDB_cursor_op op = MDB_SET; + MDB_val key = {ADDRESS_MAX, (void*)payment->address}; + MDB_val val; + rc = mdb_cursor_get(cursor, &key, &val, op); + if (rc == MDB_NOTFOUND) + { + log_error("Payment made to non-existent address"); + continue; + } + else if (rc != 0 && rc != MDB_NOTFOUND) + { + err = mdb_strerror(rc); + log_error("%s", err); + continue; + } + uint64_t current_amount = *(uint64_t*)val.mv_data; + current_amount -= payment->amount; + MDB_val new_val = {sizeof(current_amount), (void*)¤t_amount}; + rc = mdb_cursor_put(cursor, &key, &new_val, MDB_CURRENT); + if (rc != 0) + { + err = mdb_strerror(rc); + log_error("%s", err); + } } - else if (rc != 0 && rc != MDB_NOTFOUND) + if ((rc = mdb_txn_commit(txn)) != 0) { err = mdb_strerror(rc); - log_error("%s", err); + log_error("Error committing updated balance(s): %s", err); mdb_txn_abort(txn); goto cleanup; } - mdb_cursor_del(cursor, 0); - mdb_txn_commit(txn); /* Now store payment info */ - JSON_GET_OR_WARN(tx_hash, result, json_type_string); - JSON_GET_OR_WARN(amount, result, json_type_int); - const char *ths = json_object_get_string(tx_hash); - uint64_t ai = json_object_get_int64(amount); - time_t now = time(NULL); - payment_t payment; - memcpy(payment.tx_hash, ths, sizeof(payment.tx_hash)); - payment.amount = ai; - payment.timestamp = now; - if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) { err = mdb_strerror(rc); @@ -1564,16 +1610,19 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) mdb_txn_abort(txn); goto cleanup; } - key.mv_data = (void*)address; - key.mv_size = ADDRESS_MAX; - val.mv_data = &payment; - val.mv_size = sizeof(payment); - if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0) + time_t now = time(NULL); + payment = (payment_t*) callback->data; + for (; payment->amount; payment++) { - err = mdb_strerror(rc); - log_error("Error putting payment: %s", err); - mdb_txn_abort(txn); - goto cleanup; + payment->timestamp = now; + MDB_val key = {ADDRESS_MAX, (void*)payment->address}; + MDB_val val = {sizeof(payment_t), payment}; + if ((rc = mdb_cursor_put(cursor, &key, &val, MDB_APPENDDUP)) != 0) + { + err = mdb_strerror(rc); + log_error("Error putting payment: %s", err); + continue; + } } if ((rc = mdb_txn_commit(txn)) != 0) { @@ -1582,6 +1631,7 @@ rpc_on_wallet_transferred(const char* data, rpc_callback_t *callback) mdb_txn_abort(txn); goto cleanup; } + cleanup: json_object_put(root); } diff --git a/src/util.c b/src/util.c index 8dd6b32..3816f92 100644 --- a/src/util.c +++ b/src/util.c @@ -100,3 +100,12 @@ reverse_bin(char *bin, size_t len) } } +char *stecpy(char *dst, const char *src, const char *end) +{ + while (*src && dst < end) + *dst++ = *src++; + if (dst < end) + *dst = '\0'; + return dst; +} + diff --git a/src/util.h b/src/util.h index 858e52a..efc7030 100644 --- a/src/util.h +++ b/src/util.h @@ -37,5 +37,6 @@ int is_hex_string(const char *str); void hex_to_bin(const char *hex, char *bin, size_t bin_size); void bin_to_hex(const char *bin, size_t bin_size, char *hex); void reverse_bin(char *bin, size_t len); +char *stecpy(char *dst, const char *src, const char *end); #endif