PIVX Core  5.6.99
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2014 The Bitcoin developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef PIVX_CHECKQUEUE_H
6 #define PIVX_CHECKQUEUE_H
7 
8 #include <algorithm>
9 #include <vector>
10 
11 #include <boost/thread/condition_variable.hpp>
12 #include <boost/thread/mutex.hpp>
13 
14 template <typename T>
15 class CCheckQueueControl;
16 
27 template <typename T>
29 {
30 private:
32  boost::mutex mutex;
33 
35  boost::condition_variable condWorker;
36 
38  boost::condition_variable condMaster;
39 
42  std::vector<T> queue;
43 
45  int nIdle;
46 
48  int nTotal;
49 
51  bool fAllOk;
52 
58  unsigned int nTodo;
59 
61  unsigned int nBatchSize;
62 
64  bool Loop(bool fMaster = false)
65  {
66  boost::condition_variable& cond = fMaster ? condMaster : condWorker;
67  std::vector<T> vChecks;
68  vChecks.reserve(nBatchSize);
69  unsigned int nNow = 0;
70  bool fOk = true;
71  do {
72  {
73  boost::unique_lock<boost::mutex> lock(mutex);
74  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
75  if (nNow) {
76  fAllOk &= fOk;
77  nTodo -= nNow;
78  if (nTodo == 0 && !fMaster)
79  // We processed the last element; inform the master he can exit and return the result
80  condMaster.notify_one();
81  } else {
82  // first iteration
83  nTotal++;
84  }
85  // logically, the do loop starts here
86  while (queue.empty()) {
87  if (fMaster && nTodo == 0) {
88  nTotal--;
89  bool fRet = fAllOk;
90  // reset the status for new work later
91  if (fMaster)
92  fAllOk = true;
93  // return the current status
94  return fRet;
95  }
96  nIdle++;
97  cond.wait(lock); // wait
98  nIdle--;
99  }
100  // Decide how many work units to process now.
101  // * Do not try to do everything at once, but aim for increasingly smaller batches so
102  // all workers finish approximately simultaneously.
103  // * Try to account for idle jobs which will instantly start helping.
104  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
105  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
106  vChecks.resize(nNow);
107  for (unsigned int i = 0; i < nNow; i++) {
108  // We want the lock on the mutex to be as short as possible, so swap jobs from the global
109  // queue to the local batch vector instead of copying.
110  vChecks[i].swap(queue.back());
111  queue.pop_back();
112  }
113  // Check whether we need to do work at all
114  fOk = fAllOk;
115  }
116  // execute work
117  for (T& check : vChecks)
118  if (fOk)
119  fOk = check();
120  vChecks.clear();
121  } while (true);
122  }
123 
124 public:
126  explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
127 
129  void Thread()
130  {
131  Loop();
132  }
133 
135  bool Wait()
136  {
137  return Loop(true);
138  }
139 
141  void Add(std::vector<T>& vChecks)
142  {
143  boost::unique_lock<boost::mutex> lock(mutex);
144  for (T& check : vChecks) {
145  queue.push_back(T());
146  check.swap(queue.back());
147  }
148  nTodo += vChecks.size();
149  if (vChecks.size() == 1)
150  condWorker.notify_one();
151  else if (vChecks.size() > 1)
152  condWorker.notify_all();
153  }
154 
156  {
157  }
158 
159  bool IsIdle()
160  {
161  boost::unique_lock<boost::mutex> lock(mutex);
162  return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
163  }
164 };
165 
170 template <typename T>
172 {
173 private:
175  bool fDone;
176 
177 public:
178  explicit CCheckQueueControl(CCheckQueue<T>* pqueueIn) : pqueue(pqueueIn), fDone(false)
179  {
180  // passed queue is supposed to be unused, or nullptr
181  if (pqueue != nullptr) {
182  bool isIdle = pqueue->IsIdle();
183  assert(isIdle);
184  }
185  }
186 
187  bool Wait()
188  {
189  if (pqueue == nullptr)
190  return true;
191  bool fRet = pqueue->Wait();
192  fDone = true;
193  return fRet;
194  }
195 
196  void Add(std::vector<T>& vChecks)
197  {
198  if (pqueue != nullptr)
199  pqueue->Add(vChecks);
200  }
201 
203  {
204  if (!fDone)
205  Wait();
206  }
207 };
208 
209 #endif // PIVX_CHECKQUEUE_H
true
Definition: bls_dkg.cpp:153
false
Definition: bls_dkg.cpp:151
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:172
CCheckQueue< T > * pqueue
Definition: checkqueue.h:174
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:196
CCheckQueueControl(CCheckQueue< T > *pqueueIn)
Definition: checkqueue.h:178
Queue for verifications that have to be performed.
Definition: checkqueue.h:29
unsigned int nTodo
Number of verifications that haven't completed yet.
Definition: checkqueue.h:58
bool fAllOk
The temporary evaluation result.
Definition: checkqueue.h:51
int nIdle
The number of workers (including the master) that are idle.
Definition: checkqueue.h:45
bool Loop(bool fMaster=false)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:64
std::vector< T > queue
The queue of elements to be processed.
Definition: checkqueue.h:42
bool IsIdle()
Definition: checkqueue.h:159
bool Wait()
Wait until execution finishes, and return whether all evaluations where successful.
Definition: checkqueue.h:135
boost::condition_variable condMaster
Master thread blocks on this when out of work.
Definition: checkqueue.h:38
int nTotal
The total number of workers (including the master).
Definition: checkqueue.h:48
boost::condition_variable condWorker
Worker threads block on this when out of work.
Definition: checkqueue.h:35
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:126
void Thread()
Worker thread.
Definition: checkqueue.h:129
unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:61
boost::mutex mutex
Mutex to protect the inner state.
Definition: checkqueue.h:32
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:141
#define T(expected, seed, data)