PIVX Core  5.6.99
P2P Digital Currency
scheduler_tests.cpp
Go to the documentation of this file.
1 // Copyright (c) 2012-2013 The Bitcoin Core developers
2 // Copyright (c) 2017-2020 The PIVX Core developers
3 // Distributed under the MIT software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #include "random.h"
7 #include "scheduler.h"
8 #include "utiltime.h"
9 #if defined(HAVE_CONFIG_H)
10 #include "config/pivx-config.h"
11 #endif
12 
13 #include <boost/thread.hpp>
14 #include <boost/test/unit_test.hpp>
15 
16 BOOST_AUTO_TEST_SUITE(scheduler_tests)
17 
18 static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, std::chrono::system_clock::time_point rescheduleTime)
19 {
20  {
21  boost::unique_lock<boost::mutex> lock(mutex);
22  counter += delta;
23  }
24  std::chrono::system_clock::time_point noTime = std::chrono::system_clock::time_point::min();
25  if (rescheduleTime != noTime) {
26  CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
27  s.schedule(f, rescheduleTime);
28  }
29 }
30 
32 {
33  // Stress test: hundreds of microsecond-scheduled tasks,
34  // serviced by 10 threads.
35  //
36  // So... ten shared counters, which if all the tasks execute
37  // properly will sum to the number of tasks done.
38  // Each task adds or subtracts from one of the counters a
39  // random amount, and then schedules another task 0-1000
40  // microseconds in the future to subtract or add from
41  // the counter -random_amount+1, so in the end the shared
42  // counters should sum to the number of initial tasks performed.
43  CScheduler microTasks;
44 
45  boost::mutex counterMutex[10];
46  int counter[10] = { 0 };
47  FastRandomContext rng(42);
48  auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9]
49  auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + rc.randrange(1012); }; // [-11, 1000]
50  auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + rc.randrange(2001); }; // [-1000, 1000]
51 
52  std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
55  size_t nTasks = microTasks.getQueueInfo(first, last);
56  BOOST_CHECK(nTasks == 0);
57 
58  for (int i = 0; i < 100; ++i) {
59  std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
60  std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
61  int whichCounter = zeroToNine(rng);
62  CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
63  std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
64  randomDelta(rng), tReschedule);
65  microTasks.schedule(f, t);
66  }
67  nTasks = microTasks.getQueueInfo(first, last);
68  BOOST_CHECK(nTasks == 100);
69  BOOST_CHECK(first < last);
70  BOOST_CHECK(last > now);
71 
72  // As soon as these are created they will start running and servicing the queue
73  boost::thread_group microThreads;
74  for (int i = 0; i < 5; i++)
75  microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks));
76 
77  UninterruptibleSleep(std::chrono::microseconds{600});
78  now = std::chrono::system_clock::now();
79 
80  // More threads and more tasks:
81  for (int i = 0; i < 5; i++)
82  microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks));
83  for (int i = 0; i < 100; i++) {
84  std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
85  std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
86  int whichCounter = zeroToNine(rng);
87  CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
88  std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
89  randomDelta(rng), tReschedule);
90  microTasks.schedule(f, t);
91  }
92 
93  // Drain the task queue then exit threads
94  microTasks.stop(true);
95  microThreads.join_all(); // ... wait until all the threads are done
96 
97  int counterSum = 0;
98  for (int i = 0; i < 10; i++) {
99  BOOST_CHECK(counter[i] != 0);
100  counterSum += counter[i];
101  }
102  BOOST_CHECK_EQUAL(counterSum, 200);
103 }
104 
105 BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
106 {
108 
109  // each queue should be well ordered with respect to itself but not other queues
112 
113  // create more threads than queues
114  // if the queues only permit execution of one task at once then
115  // the extra threads should effectively be doing nothing
116  // if they don't we'll get out of order behaviour
117  boost::thread_group threads;
118  for (int i = 0; i < 5; ++i) {
119  threads.create_thread(boost::bind(&CScheduler::serviceQueue, &scheduler));
120  }
121 
122  // these are not atomic, if SinglethreadedSchedulerClient prevents
123  // parallel execution at the queue level no synchronization should be required here
124  int counter1 = 0;
125  int counter2 = 0;
126 
127  // just simply count up on each queue - if execution is properly ordered then
128  // the callbacks should run in exactly the order in which they were enqueued
129  for (int i = 0; i < 100; ++i) {
130  queue1.AddToProcessQueue([i, &counter1]() {
131  BOOST_CHECK_EQUAL(i, counter1++);
132  });
133 
134  queue2.AddToProcessQueue([i, &counter2]() {
135  BOOST_CHECK_EQUAL(i, counter2++);
136  });
137  }
138 
139  // finish up
140  scheduler.stop(true);
141  threads.join_all();
142 
143  BOOST_CHECK_EQUAL(counter1, 100);
144  BOOST_CHECK_EQUAL(counter2, 100);
145 }
146 
void serviceQueue()
Definition: scheduler.cpp:22
void stop(bool drain=false)
Definition: scheduler.cpp:73
void schedule(Function f, std::chrono::system_clock::time_point t)
Definition: scheduler.cpp:85
std::function< void(void)> Function
Definition: scheduler.h:45
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Definition: scheduler.cpp:110
Fast randomness source.
Definition: random.h:107
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:99
void AddToProcessQueue(std::function< void(void)> func)
Add a callback to be executed.
Definition: scheduler.cpp:168
BOOST_AUTO_TEST_SUITE(cuckoocache_tests)
Test Suite for CuckooCache.
BOOST_AUTO_TEST_SUITE_END()
clock::time_point time_point
Definition: bench.h:48
#define BOOST_CHECK_EQUAL(v1, v2)
Definition: object.cpp:18
#define BOOST_CHECK(expr)
Definition: object.cpp:17
BOOST_AUTO_TEST_CASE(manythreads)
void UninterruptibleSleep(const std::chrono::microseconds &n)
Definition: utiltime.cpp:22
CScheduler scheduler