diff --git a/src/common/threadpool.cpp b/src/common/threadpool.cpp index 51e071577..fb238dca7 100644 --- a/src/common/threadpool.cpp +++ b/src/common/threadpool.cpp @@ -39,11 +39,11 @@ static __thread int depth = 0; namespace tools { -threadpool::threadpool() : running(true), active(0) { +threadpool::threadpool(unsigned int max_threads) : running(true), active(0) { boost::thread::attributes attrs; attrs.set_stack_size(THREAD_STACK_SIZE); - max = tools::get_max_concurrency(); - size_t i = max; + max = max_threads ? max_threads : tools::get_max_concurrency(); + unsigned int i = max; while(i--) { threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this))); } @@ -78,7 +78,7 @@ void threadpool::submit(waiter *obj, std::function f) { } } -int threadpool::get_max_concurrency() { +unsigned int threadpool::get_max_concurrency() const { return max; } diff --git a/src/common/threadpool.h b/src/common/threadpool.h index 34152541c..bf80a87f6 100644 --- a/src/common/threadpool.h +++ b/src/common/threadpool.h @@ -46,6 +46,9 @@ public: static threadpool instance; return instance; } + static threadpool *getNewForUnitTests(unsigned max_threads = 0) { + return new threadpool(max_threads); + } // The waiter lets the caller know when all of its // tasks are completed. @@ -66,11 +69,12 @@ public: // task to finish. void submit(waiter *waiter, std::function f); - int get_max_concurrency(); + unsigned int get_max_concurrency() const; + + ~threadpool(); private: - threadpool(); - ~threadpool(); + threadpool(unsigned int max_threads = 0); typedef struct entry { waiter *wo; std::function f; @@ -79,8 +83,8 @@ public: boost::condition_variable has_work; boost::mutex mutex; std::vector threads; - int active; - int max; + unsigned int active; + unsigned int max; bool running; void run(); }; diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index 7d34415c4..cd1b015ea 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -170,7 +170,6 @@ namespace cryptonote m_last_dns_checkpoints_update(0), m_last_json_checkpoints_update(0), m_disable_dns_checkpoints(false), - m_threadpool(tools::threadpool::getInstance()), m_update_download(0), m_nettype(UNDEFINED) { @@ -681,10 +680,11 @@ namespace cryptonote std::vector results(tx_blobs.size()); tvc.resize(tx_blobs.size()); + tools::threadpool& tpool = tools::threadpool::getInstance(); tools::threadpool::waiter waiter; std::list::const_iterator it = tx_blobs.begin(); for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { - m_threadpool.submit(&waiter, [&, i, it] { + tpool.submit(&waiter, [&, i, it] { try { results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); @@ -711,7 +711,7 @@ namespace cryptonote } else { - m_threadpool.submit(&waiter, [&, i, it] { + tpool.submit(&waiter, [&, i, it] { try { results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index 567966d48..91bd50729 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -39,7 +39,6 @@ #include "cryptonote_protocol/cryptonote_protocol_handler_common.h" #include "storages/portable_storage_template_helper.h" #include "common/download.h" -#include "common/threadpool.h" #include "common/command_line.h" #include "tx_pool.h" #include "blockchain.h" @@ -991,8 +990,6 @@ namespace cryptonote std::unordered_set bad_semantics_txes[2]; boost::mutex bad_semantics_txes_lock; - tools::threadpool& m_threadpool; - enum { UPDATES_DISABLED, UPDATES_NOTIFY, diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index 8cc074bb2..6d79ba74b 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -62,6 +62,7 @@ set(unit_tests_sources test_tx_utils.cpp test_peerlist.cpp test_protocol_pack.cpp + threadpool.cpp hardfork.cpp unbound.cpp uri.cpp diff --git a/tests/unit_tests/threadpool.cpp b/tests/unit_tests/threadpool.cpp new file mode 100644 index 000000000..34be1417a --- /dev/null +++ b/tests/unit_tests/threadpool.cpp @@ -0,0 +1,101 @@ +// Copyright (c) 2018, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include +#include "gtest/gtest.h" +#include "misc_language.h" +#include "common/threadpool.h" + +TEST(threadpool, wait_nothing) +{ + std::shared_ptr tpool(tools::threadpool::getNewForUnitTests()); + tools::threadpool::waiter waiter; + waiter.wait(); +} + +TEST(threadpool, wait_waits) +{ + std::shared_ptr tpool(tools::threadpool::getNewForUnitTests()); + tools::threadpool::waiter waiter; + std::atomic b(false); + tpool->submit(&waiter, [&b](){ epee::misc_utils::sleep_no_w(1000); b = true; }); + ASSERT_FALSE(b); + waiter.wait(); + ASSERT_TRUE(b); +} + +TEST(threadpool, one_thread) +{ + std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(1)); + tools::threadpool::waiter waiter; + + std::atomic counter(0); + for (size_t n = 0; n < 4096; ++n) + { + tpool->submit(&waiter, [&counter](){++counter;}); + } + waiter.wait(); + ASSERT_EQ(counter, 4096); +} + +TEST(threadpool, many_threads) +{ + std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(256)); + tools::threadpool::waiter waiter; + + std::atomic counter(0); + for (size_t n = 0; n < 4096; ++n) + { + tpool->submit(&waiter, [&counter](){++counter;}); + } + waiter.wait(); + ASSERT_EQ(counter, 4096); +} + +static uint64_t fibonacci(std::shared_ptr tpool, uint64_t n) +{ + if (n <= 1) + return n; + uint64_t f1, f2; + tools::threadpool::waiter waiter; + tpool->submit(&waiter, [&tpool, &f1, n](){ f1 = fibonacci(tpool, n-1); }); + tpool->submit(&waiter, [&tpool, &f2, n](){ f2 = fibonacci(tpool, n-2); }); + waiter.wait(); + return f1 + f2; +} + +TEST(threadpool, reentrency) +{ + std::shared_ptr tpool(tools::threadpool::getNewForUnitTests(4)); + tools::threadpool::waiter waiter; + + uint64_t f = fibonacci(tpool, 13); + waiter.wait(); + ASSERT_EQ(f, 233); +} +