PIVX Core  5.6.99
P2P Digital Currency
bls_worker.cpp
Go to the documentation of this file.
1 // Copyright (c) 2018 The Dash Core developers
2 // Copyright (c) 2021 The PIVX Core developers
3 // Distributed under the MIT/X11 software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #include "bls/bls_worker.h"
7 #include "hash.h"
8 #include "serialize.h"
9 #include "util/system.h"
10 #include "util/threadnames.h"
11 
12 
13 template <typename T>
14 bool VerifyVectorHelper(const std::vector<T>& vec, size_t start, size_t count)
15 {
16  if (start == 0 && count == 0) {
17  count = vec.size();
18  }
19  std::set<uint256> set;
20  for (size_t i = start; i < start + count; i++) {
21  if (!vec[i].IsValid())
22  return false;
23  // check duplicates
24  if (!set.emplace(vec[i].GetHash()).second) {
25  return false;
26  }
27  }
28  return true;
29 }
30 
// Builds a (doneCallback, future) pair that share one promise.
// Calling the doneCallback with a value stores that value in the promise, which makes the future ready.
template <typename T>
std::pair<std::function<void(const T&)>, std::future<T> > BuildFutureDoneCallback()
{
    auto promise = std::make_shared<std::promise<T> >();
    std::future<T> future = promise->get_future();

    // the callback owns a reference to the promise, so the promise lives as long as the callback does
    std::function<void(const T&)> onDone = [promise](const T& value) {
        promise->set_value(value);
    };
    return std::make_pair(std::move(onDone), std::move(future));
}
// Variant of BuildFutureDoneCallback whose doneCallback takes its argument by value.
// Calling the doneCallback with a value stores that value in the shared promise, which makes the future ready.
template <typename T>
std::pair<std::function<void(T)>, std::future<T> > BuildFutureDoneCallback2()
{
    auto p = std::make_shared<std::promise<T> >();
    // Declared with exactly the type that is returned. Declaring it as std::function<void(const T&)>
    // would make the return statement wrap one std::function inside another.
    std::function<void(T)> f = [p](T v) {
        p->set_value(std::move(v));
    };
    return std::make_pair(std::move(f), p->get_future());
}
50 
51 
53 
55 {
56 }
57 
59 {
60  Stop();
61 }
62 
64 {
65  int workerCount = GetNumCores() / 2;
66  workerCount = std::max(std::min(1, workerCount), 4);
67  workerPool.resize(workerCount);
68  RenameThreadPool(workerPool, "pivx-bls-work");
69 }
70 
72 {
74  workerPool.stop(true);
75 }
76 
77 bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& ids, BLSVerificationVectorPtr& vvecRet, BLSSecretKeyVector& skShares)
78 {
79  BLSSecretKeyVectorPtr svec = std::make_shared<BLSSecretKeyVector>((size_t)quorumThreshold);
80  vvecRet = std::make_shared<BLSVerificationVector>((size_t)quorumThreshold);
81  skShares.resize(ids.size());
82 
83  for (int i = 0; i < quorumThreshold; i++) {
84  (*svec)[i].MakeNewKey();
85  }
86  std::list<std::future<bool> > futures;
87  size_t batchSize = 8;
88 
89  for (size_t i = 0; i < (size_t)quorumThreshold; i += batchSize) {
90  size_t start = i;
91  size_t count = std::min(batchSize, quorumThreshold - start);
92  auto f = [&, start, count](int threadId) {
93  for (size_t j = start; j < start + count; j++) {
94  (*vvecRet)[j] = (*svec)[j].GetPublicKey();
95  }
96  return true;
97  };
98  futures.emplace_back(workerPool.push(f));
99  }
100 
101  for (size_t i = 0; i < ids.size(); i += batchSize) {
102  size_t start = i;
103  size_t count = std::min(batchSize, ids.size() - start);
104  auto f = [&, start, count](int threadId) {
105  for (size_t j = start; j < start + count; j++) {
106  if (!skShares[j].SecretKeyShare(*svec, ids[j])) {
107  return false;
108  }
109  }
110  return true;
111  };
112  futures.emplace_back(workerPool.push(f));
113  }
114  bool success = true;
115  for (auto& f : futures) {
116  if (!f.get()) {
117  success = false;
118  }
119  }
120  return success;
121 }
122 
123 // Aggregates a single vector of BLS objects in parallel.
124 // The input vector is split into batches and each batch is aggregated in parallel.
125 // When enough batches are finished to form a new batch, the new batch is queued for further parallel aggregation.
126 // When no more batches can be created from finished batch results, the final aggregate is created and the
127 // doneCallback is called.
128 // The Aggregator object must be owned by a std::shared_ptr: queued work items hold a reference obtained via shared_from_this(), which keeps it alive until they have run
129 // The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the
130 // input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator
131 template <typename T>
132 struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
133  typedef T ElementType;
134 
135  size_t batchSize{16};
137  bool parallel;
138 
139  std::shared_ptr<std::vector<const T*> > inputVec;
140 
141  std::mutex m;
142  // items in the queue are all intermediate aggregation results of finished batches.
143  // The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue)
145  std::atomic<size_t> aggQueueSize{0};
146 
147  typedef std::function<void(const T& agg)> DoneCallback;
149 
150  // keeps track of currently queued/in-progress batches. If it reaches 0, we are done
151  std::atomic<size_t> waitCount{0};
152 
153  // TP can either be a pointer or a reference
154  template <typename TP>
155  Aggregator(const std::vector<TP>& _inputVec,
156  size_t start, size_t count,
157  bool _parallel,
158  ctpl::thread_pool& _workerPool,
159  DoneCallback _doneCallback) :
160  workerPool(_workerPool),
161  parallel(_parallel),
162  doneCallback(std::move(_doneCallback))
163  {
164  inputVec = std::make_shared<std::vector<const T*> >(count);
165  for (size_t i = 0; i < count; i++) {
166  (*inputVec)[i] = pointer(_inputVec[start + i]);
167  }
168  }
169 
170  const T* pointer(const T& v) { return &v; }
171  const T* pointer(const T* v) { return v; }
172 
173  // Starts aggregation.
174  // If parallel=true, then this will return fast, otherwise this will block until aggregation is done
175  void Start()
176  {
177  size_t batchCount = (inputVec->size() + batchSize - 1) / batchSize;
178 
179  if (!parallel) {
180  if (inputVec->size() == 1) {
181  doneCallback(*(*inputVec)[0]);
182  } else {
184  }
185  return;
186  }
187 
188  if (batchCount == 1) {
189  // just a single batch of work, take a shortcut.
190  auto self(this->shared_from_this());
191  PushWork([self](int threadId) {
192  size_t vecSize = self->inputVec->size();
193  if (vecSize == 1) {
194  self->doneCallback(*(*self->inputVec)[0]);
195  } else {
196  self->doneCallback(self->SyncAggregate(*self->inputVec, 0, vecSize));
197  }
198  });
199  return;
200  }
201 
202  // increment wait counter as otherwise the first finished async aggregation might signal that we're done
203  IncWait();
204  for (size_t i = 0; i < batchCount; i++) {
205  size_t start = i * batchSize;
206  size_t count = std::min(batchSize, inputVec->size() - start);
207  AsyncAggregateAndPushAggQueue(inputVec, start, count, false);
208  }
209  // this will decrement the wait counter and in most cases NOT finish, as async work is still in progress
210  CheckDone();
211  }
212 
213  void IncWait()
214  {
215  ++waitCount;
216  }
217 
218  void CheckDone()
219  {
220  if (--waitCount == 0) {
221  Finish();
222  }
223  }
224 
225  void Finish()
226  {
227  // All async work is done, but we might have items in the aggQueue which are the results of the async
228  // work. This is the case when these did not add up to a new batch. In this case, we have to aggregate
229  // the items into the final result
230 
231  std::vector<T*> rem(aggQueueSize);
232  for (size_t i = 0; i < rem.size(); i++) {
233  T* p = nullptr;
234  bool s = aggQueue.pop(p);
235  assert(s);
236  rem[i] = p;
237  }
238 
239  T r;
240  if (rem.size() == 1) {
241  // just one intermediate result, which is actually the final result
242  r = *rem[0];
243  } else {
244  // multiple intermediate results left which did not add up to a new batch. aggregate them now
245  r = SyncAggregate(rem, 0, rem.size());
246  }
247 
248  // all items which are left in the queue are intermediate results, so we must delete them
249  for (size_t i = 0; i < rem.size(); i++) {
250  delete rem[i];
251  }
252  doneCallback(r);
253  }
254 
255  void AsyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*>>& vec, size_t start, size_t count, bool del)
256  {
257  IncWait();
258  auto self(this->shared_from_this());
259  PushWork([self, vec, start, count, del](int threadId){
260  self->SyncAggregateAndPushAggQueue(vec, start, count, del);
261  });
262  }
263 
264  void SyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
265  {
266  // aggregate vec and push the intermediate result onto the work queue
267  PushAggQueue(SyncAggregate(*vec, start, count));
268  if (del) {
269  for (size_t i = 0; i < count; i++) {
270  delete (*vec)[start + i];
271  }
272  }
273  CheckDone();
274  }
275 
276  void PushAggQueue(const T& v)
277  {
278  auto copyT = new T(v);
279  try {
280  aggQueue.push(copyT);
281  } catch (...) {
282  delete copyT;
283  throw;
284  }
285 
286  if (++aggQueueSize >= batchSize) {
287  // we've collected enough intermediate results to form a new batch.
288  std::shared_ptr<std::vector<const T*> > newBatch;
289  {
290  std::unique_lock<std::mutex> l(m);
291  if (aggQueueSize < batchSize) {
292  // some other worker thread grabbed this batch
293  return;
294  }
295  newBatch = std::make_shared<std::vector<const T*> >(batchSize);
296  // collect items for new batch
297  for (size_t i = 0; i < batchSize; i++) {
298  T* p = nullptr;
299  bool s = aggQueue.pop(p);
300  assert(s);
301  (*newBatch)[i] = p;
302  }
304  }
305 
306  // push new batch to work queue. del=true this time as these items are intermediate results and need to be deleted
307  // after aggregation is done
308  AsyncAggregateAndPushAggQueue(newBatch, 0, newBatch->size(), true);
309  }
310  }
311 
312  template <typename TP>
313  T SyncAggregate(const std::vector<TP>& vec, size_t start, size_t count)
314  {
315  T result = *vec[start];
316  for (size_t j = 1; j < count; j++) {
317  result.AggregateInsecure(*vec[start + j]);
318  }
319  return result;
320  }
321 
322  template <typename Callable>
323  void PushWork(Callable&& f)
324  {
325  workerPool.push(f);
326  }
327 };
328 
329 // Aggregates multiple input vectors into a single output vector
330 // Inputs are in the following form:
331 // [
332 // [a1, b1, c1, d1],
333 // [a2, b2, c2, d2],
334 // [a3, b3, c3, d3],
335 // [a4, b4, c4, d4],
336 // ]
337 // The result is in the following form:
338 // [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4]
339 // Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive)
340 template <typename T>
341 struct VectorAggregator : public std::enable_shared_from_this<VectorAggregator<T>> {
343  typedef std::vector<T> VectorType;
344  typedef std::shared_ptr<VectorType> VectorPtrType;
345  typedef std::vector<VectorPtrType> VectorVectorType;
346  typedef std::function<void(const VectorPtrType& agg)> DoneCallback;
347 
349  bool parallel;
350  size_t start;
351  size_t count;
352 
354 
356  std::atomic<size_t> doneCount;
357 
359  size_t vecSize;
360 
362  size_t _start, size_t _count,
363  bool _parallel, ctpl::thread_pool& _workerPool,
364  DoneCallback _doneCallback) :
365  vecs(_vecs),
366  parallel(_parallel),
367  start(_start),
368  count(_count),
369  workerPool(_workerPool),
370  doneCallback(std::move(_doneCallback))
371  {
372  assert(!vecs.empty());
373  vecSize = vecs[0]->size();
374  result = std::make_shared<VectorType>(vecSize);
375  doneCount = 0;
376  }
377 
378  void Start()
379  {
380  for (size_t i = 0; i < vecSize; i++) {
381  std::vector<const T*> tmp(count);
382  for (size_t j = 0; j < count; j++) {
383  tmp[j] = &(*vecs[start + j])[i];
384  }
385 
386  auto self(this->shared_from_this());
387  auto aggregator = std::make_shared<AggregatorType>(std::move(tmp), 0, count, parallel, workerPool, [self, i](const T& agg) {self->CheckDone(agg, i);});
388  aggregator->Start();
389  }
390  }
391 
392  void CheckDone(const T& agg, size_t idx)
393  {
394  (*result)[idx] = agg;
395  if (++doneCount == vecSize) {
397  }
398  }
399 };
400 
401 // See comment of AsyncVerifyContributionShares for a description on what this does
402 // Same rules as in Aggregator apply for the inputs
403 struct ContributionVerifier : public std::enable_shared_from_this<ContributionVerifier> {
404  struct BatchState {
405  size_t start;
406  size_t count;
407 
410 
411  // starts with 0 and is incremented if either vvec or skShare aggregation finishes. If it reaches 2, we know
412  // that aggregation for this batch is fully done. We can then start verification.
413  std::unique_ptr<std::atomic<int> > aggDone;
414 
415  // we can't directly update a vector<bool> in parallel
416  // as vector<bool> is not thread safe (uses bitsets internally)
417  // so we must use vector<char> temporarily and concatenate/convert
418  // each batch result into a final vector<bool>
419  std::vector<char> verifyResults;
420  };
421 
423  const std::vector<BLSVerificationVectorPtr>& vvecs;
425  size_t batchSize;
426  bool parallel;
428 
430 
431  size_t batchCount;
432  size_t verifyCount;
433 
434  std::vector<BatchState> batchStates;
435  std::atomic<size_t> verifyDoneCount{0};
436  std::function<void(const std::vector<bool>&)> doneCallback;
437 
438  ContributionVerifier(const CBLSId& _forId, const std::vector<BLSVerificationVectorPtr>& _vvecs,
439  const BLSSecretKeyVector& _skShares, size_t _batchSize,
440  bool _parallel, bool _aggregated, ctpl::thread_pool& _workerPool,
441  std::function<void(const std::vector<bool>&)> _doneCallback) :
442  forId(_forId),
443  vvecs(_vvecs),
444  skShares(_skShares),
445  batchSize(_batchSize),
446  parallel(_parallel),
447  aggregated(_aggregated),
448  workerPool(_workerPool),
449  doneCallback(std::move(_doneCallback))
450  {
451  }
452 
453  void Start()
454  {
455  if (!aggregated) {
456  // treat all inputs as one large batch
457  batchSize = vvecs.size();
458  batchCount = 1;
459  } else {
460  batchCount = (vvecs.size() + batchSize - 1) / batchSize;
461  }
462  verifyCount = vvecs.size();
463 
464  batchStates.resize(batchCount);
465  for (size_t i = 0; i < batchCount; i++) {
466  auto& batchState = batchStates[i];
467 
468  batchState.aggDone.reset(new std::atomic<int>(0));
469  batchState.start = i * batchSize;
470  batchState.count = std::min(batchSize, vvecs.size() - batchState.start);
471  batchState.verifyResults.assign(batchState.count, 0);
472  }
473 
474  if (aggregated) {
475  size_t batchCount2 = batchCount; // 'this' might get deleted while we're still looping
476  for (size_t i = 0; i < batchCount2; i++) {
477  AsyncAggregate(i);
478  }
479  } else {
480  // treat all inputs as a single batch and verify one-by-one
482  }
483  }
484 
485  void Finish()
486  {
487  size_t batchIdx = 0;
488  std::vector<bool> result(vvecs.size());
489  for (size_t i = 0; i < vvecs.size(); i += batchSize) {
490  auto& batchState = batchStates[batchIdx++];
491  for (size_t j = 0; j < batchState.count; j++) {
492  result[batchState.start + j] = batchState.verifyResults[j] != 0;
493  }
494  }
495  doneCallback(result);
496  }
497 
498  void AsyncAggregate(size_t batchIdx)
499  {
500  auto& batchState = batchStates[batchIdx];
501 
502  // aggregate vvecs and skShares of batch in parallel
503  auto self(this->shared_from_this());
504  auto vvecAgg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, batchState.start, batchState.count, parallel, workerPool, [self, batchIdx] (const BLSVerificationVectorPtr& vvec) {self->HandleAggVvecDone(batchIdx, vvec);});
505  auto skShareAgg = std::make_shared<Aggregator<CBLSSecretKey>>(skShares, batchState.start, batchState.count, parallel, workerPool, [self, batchIdx] (const CBLSSecretKey& skShare) {self->HandleAggSkShareDone(batchIdx, skShare);});
506 
507  vvecAgg->Start();
508  skShareAgg->Start();
509  }
510 
511  void HandleAggVvecDone(size_t batchIdx, const BLSVerificationVectorPtr& vvec)
512  {
513  auto& batchState = batchStates[batchIdx];
514  batchState.vvec = vvec;
515  if (++(*batchState.aggDone) == 2) {
516  HandleAggDone(batchIdx);
517  }
518  }
519  void HandleAggSkShareDone(size_t batchIdx, const CBLSSecretKey& skShare)
520  {
521  auto& batchState = batchStates[batchIdx];
522  batchState.skShare = skShare;
523  if (++(*batchState.aggDone) == 2) {
524  HandleAggDone(batchIdx);
525  }
526  }
527 
528  void HandleVerifyDone(size_t batchIdx, size_t count)
529  {
530  size_t c = verifyDoneCount += count;
531  if (c == verifyCount) {
532  Finish();
533  }
534  }
535 
536  void HandleAggDone(size_t batchIdx)
537  {
538  auto& batchState = batchStates[batchIdx];
539 
540  if (batchState.vvec == nullptr || batchState.vvec->empty() || !batchState.skShare.IsValid()) {
541  // something went wrong while aggregating and there is nothing we can do now except mark the whole batch as failed
542  // this can only happen if inputs were invalid in some way
543  batchState.verifyResults.assign(batchState.count, 0);
544  HandleVerifyDone(batchIdx, batchState.count);
545  return;
546  }
547 
548  AsyncAggregatedVerifyBatch(batchIdx);
549  }
550 
551  void AsyncAggregatedVerifyBatch(size_t batchIdx)
552  {
553  auto self(shared_from_this());
554  auto f = [self, batchIdx](int threadId) {
555  auto& batchState = self->batchStates[batchIdx];
556  bool result = self->Verify(batchState.vvec, batchState.skShare);
557  if (result) {
558  // whole batch is valid
559  batchState.verifyResults.assign(batchState.count, 1);
560  self->HandleVerifyDone(batchIdx, batchState.count);
561  } else {
562  // at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
563  self->AsyncVerifyBatchOneByOne(batchIdx);
564  }
565  };
566  PushOrDoWork(std::move(f));
567  }
568 
569  void AsyncVerifyBatchOneByOne(size_t batchIdx)
570  {
571  size_t count = batchStates[batchIdx].count;
572  batchStates[batchIdx].verifyResults.assign(count, 0);
573  for (size_t i = 0; i < count; i++) {
574  auto self(this->shared_from_this());
575  PushOrDoWork([self, i, batchIdx](int threadId) {
576  auto& batchState = self->batchStates[batchIdx];
577  batchState.verifyResults[i] = self->Verify(self->vvecs[batchState.start + i], self->skShares[batchState.start + i]);
578  self->HandleVerifyDone(batchIdx, 1);
579  });
580  }
581  }
582 
583  bool Verify(const BLSVerificationVectorPtr& vvec, const CBLSSecretKey& skShare)
584  {
585  CBLSPublicKey pk1;
586  if (!pk1.PublicKeyShare(*vvec, forId)) {
587  return false;
588  }
589 
590  CBLSPublicKey pk2 = skShare.GetPublicKey();
591  return pk1 == pk2;
592  }
593 
594  template <typename Callable>
595  void PushOrDoWork(Callable&& f)
596  {
597  if (parallel) {
598  workerPool.push(std::move(f));
599  } else {
600  f(0);
601  }
602  }
603 };
604 
605 void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerificationVectorPtr>& vvecs,
606  size_t start, size_t count, bool parallel,
607  std::function<void(const BLSVerificationVectorPtr&)> doneCallback)
608 {
609  if (start == 0 && count == 0) {
610  count = vvecs.size();
611  }
612  if (vvecs.empty() || count == 0 || start > vvecs.size() || start + count > vvecs.size()) {
613  doneCallback(nullptr);
614  return;
615  }
616  if (!VerifyVerificationVectors(vvecs, start, count)) {
617  doneCallback(nullptr);
618  return;
619  }
620 
621  auto agg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
622  agg->Start();
623 }
624 
625 std::future<BLSVerificationVectorPtr> CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerificationVectorPtr>& vvecs,
626  size_t start, size_t count, bool parallel)
627 {
628  auto p = BuildFutureDoneCallback<BLSVerificationVectorPtr>();
629  AsyncBuildQuorumVerificationVector(vvecs, start, count, parallel, std::move(p.first));
630  return std::move(p.second);
631 }
632 
633 BLSVerificationVectorPtr CBLSWorker::BuildQuorumVerificationVector(const std::vector<BLSVerificationVectorPtr>& vvecs,
634  size_t start, size_t count, bool parallel)
635 {
636  return AsyncBuildQuorumVerificationVector(vvecs, start, count, parallel).get();
637 }
638 
639 template <typename T>
641  const std::vector<T>& vec, size_t start, size_t count, bool parallel,
642  std::function<void(const T&)> doneCallback)
643 {
644  if (start == 0 && count == 0) {
645  count = vec.size();
646  }
647  if (vec.empty() || count == 0 || start > vec.size() || start + count > vec.size()) {
648  doneCallback(T());
649  return;
650  }
651  if (!VerifyVectorHelper(vec, start, count)) {
652  doneCallback(T());
653  return;
654  }
655 
656  auto agg = std::make_shared<Aggregator<T>>(vec, start, count, parallel, workerPool, std::move(doneCallback));
657  agg->Start();
658 }
659 
661  size_t start, size_t count, bool parallel,
662  std::function<void(const CBLSSecretKey&)> doneCallback)
663 {
664  AsyncAggregateHelper(workerPool, secKeys, start, count, parallel, doneCallback);
665 }
666 
667 std::future<CBLSSecretKey> CBLSWorker::AsyncAggregateSecretKeys(const BLSSecretKeyVector& secKeys,
668  size_t start, size_t count, bool parallel)
669 {
670  auto p = BuildFutureDoneCallback<CBLSSecretKey>();
671  AsyncAggregateSecretKeys(secKeys, start, count, parallel, std::move(p.first));
672  return std::move(p.second);
673 }
674 
676  size_t start, size_t count, bool parallel)
677 {
678  return AsyncAggregateSecretKeys(secKeys, start, count, parallel).get();
679 }
680 
682  size_t start, size_t count, bool parallel,
683  std::function<void(const CBLSPublicKey&)> doneCallback)
684 {
685  AsyncAggregateHelper(workerPool, pubKeys, start, count, parallel, doneCallback);
686 }
687 
688 std::future<CBLSPublicKey> CBLSWorker::AsyncAggregatePublicKeys(const BLSPublicKeyVector& pubKeys,
689  size_t start, size_t count, bool parallel)
690 {
691  auto p = BuildFutureDoneCallback<CBLSPublicKey>();
692  AsyncAggregatePublicKeys(pubKeys, start, count, parallel, std::move(p.first));
693  return std::move(p.second);
694 }
695 
697  size_t start, size_t count, bool parallel)
698 {
699  return AsyncAggregatePublicKeys(pubKeys, start, count, parallel).get();
700 }
701 
703  size_t start, size_t count, bool parallel,
704  std::function<void(const CBLSSignature&)> doneCallback)
705 {
706  AsyncAggregateHelper(workerPool, sigs, start, count, parallel, doneCallback);
707 }
708 
709 std::future<CBLSSignature> CBLSWorker::AsyncAggregateSigs(const BLSSignatureVector& sigs,
710  size_t start, size_t count, bool parallel)
711 {
712  auto p = BuildFutureDoneCallback<CBLSSignature>();
713  AsyncAggregateSigs(sigs, start, count, parallel, std::move(p.first));
714  return std::move(p.second);
715 }
716 
718  size_t start, size_t count, bool parallel)
719 {
720  return AsyncAggregateSigs(sigs, start, count, parallel).get();
721 }
722 
723 
725 {
726  CBLSPublicKey pkShare;
727  pkShare.PublicKeyShare(*vvec, id);
728  return pkShare;
729 }
730 
731 void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::vector<BLSVerificationVectorPtr>& vvecs, const BLSSecretKeyVector& skShares,
732  bool parallel, bool aggregated, std::function<void(const std::vector<bool>&)> doneCallback)
733 {
734  if (!forId.IsValid() || !VerifyVerificationVectors(vvecs)) {
735  std::vector<bool> result;
736  result.assign(vvecs.size(), false);
737  doneCallback(result);
738  return;
739  }
740 
741  auto verifier = std::make_shared<ContributionVerifier>(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
742  verifier->Start();
743 }
744 
745 std::future<std::vector<bool> > CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::vector<BLSVerificationVectorPtr>& vvecs, const BLSSecretKeyVector& skShares,
746  bool parallel, bool aggregated)
747 {
748  auto p = BuildFutureDoneCallback<std::vector<bool> >();
749  AsyncVerifyContributionShares(forId, vvecs, skShares, parallel, aggregated, std::move(p.first));
750  return std::move(p.second);
751 }
752 
753 std::vector<bool> CBLSWorker::VerifyContributionShares(const CBLSId& forId, const std::vector<BLSVerificationVectorPtr>& vvecs, const BLSSecretKeyVector& skShares,
754  bool parallel, bool aggregated)
755 {
756  return AsyncVerifyContributionShares(forId, vvecs, skShares, parallel, aggregated).get();
757 }
758 
759 std::future<bool> CBLSWorker::AsyncVerifyContributionShare(const CBLSId& forId,
760  const BLSVerificationVectorPtr& vvec,
761  const CBLSSecretKey& skContribution)
762 {
763  if (!forId.IsValid() || !VerifyVerificationVector(*vvec)) {
764  auto p = BuildFutureDoneCallback<bool>();
765  p.first(false);
766  return std::move(p.second);
767  }
768 
769  auto f = [this, &forId, &vvec, &skContribution](int threadId) {
770  return VerifyContributionShare(forId, vvec, skContribution);
771  };
772  return workerPool.push(f);
773 }
774 
776  const CBLSSecretKey& skContribution)
777 {
778  CBLSPublicKey pk1;
779  if (!pk1.PublicKeyShare(*vvec, forId)) {
780  return false;
781  }
782 
783  CBLSPublicKey pk2 = skContribution.GetPublicKey();
784  return pk1 == pk2;
785 }
786 
787 bool CBLSWorker::VerifyVerificationVector(const BLSVerificationVector& vvec, size_t start, size_t count)
788 {
789  return VerifyVectorHelper(vvec, start, count);
790 }
791 
792 bool CBLSWorker::VerifyVerificationVectors(const std::vector<BLSVerificationVectorPtr>& vvecs,
793  size_t start, size_t count)
794 {
795  if (start == 0 && count == 0) {
796  count = vvecs.size();
797  }
798 
799  std::set<uint256> set;
800  for (size_t i = 0; i < count; i++) {
801  auto& vvec = vvecs[start + i];
802  if (vvec == nullptr) {
803  return false;
804  }
805  if (vvec->size() != vvecs[start]->size()) {
806  return false;
807  }
808  for (size_t j = 0; j < vvec->size(); j++) {
809  if (!(*vvec)[j].IsValid()) {
810  return false;
811  }
812  // check duplicates
813  if (!set.emplace((*vvec)[j].GetHash()).second) {
814  return false;
815  }
816  }
817  }
818 
819  return true;
820 }
821 
822 bool CBLSWorker::VerifySecretKeyVector(const BLSSecretKeyVector& secKeys, size_t start, size_t count)
823 {
824  return VerifyVectorHelper(secKeys, start, count);
825 }
826 
827 bool CBLSWorker::VerifySignatureVector(const BLSSignatureVector& sigs, size_t start, size_t count)
828 {
829  return VerifyVectorHelper(sigs, start, count);
830 }
831 
832 void CBLSWorker::AsyncSign(const CBLSSecretKey& secKey, const uint256& msgHash, CBLSWorker::SignDoneCallback doneCallback)
833 {
834  workerPool.push([secKey, msgHash, doneCallback](int threadId) {
835  doneCallback(secKey.Sign(msgHash));
836  });
837 }
838 
839 std::future<CBLSSignature> CBLSWorker::AsyncSign(const CBLSSecretKey& secKey, const uint256& msgHash)
840 {
841  auto p = BuildFutureDoneCallback<CBLSSignature>();
842  AsyncSign(secKey, msgHash, std::move(p.first));
843  return std::move(p.second);
844 }
845 
846 void CBLSWorker::AsyncVerifySig(const CBLSSignature& sig, const CBLSPublicKey& pubKey, const uint256& msgHash,
847  CBLSWorker::SigVerifyDoneCallback doneCallback, CancelCond cancelCond)
848 {
849  if (!sig.IsValid() || !pubKey.IsValid()) {
850  doneCallback(false);
851  return;
852  }
853 
854  std::unique_lock<std::mutex> l(sigVerifyMutex);
855 
856  bool foundDuplicate = false;
857  for (auto& s : sigVerifyQueue) {
858  if (s.msgHash == msgHash) {
859  foundDuplicate = true;
860  break;
861  }
862  }
863 
864  if (foundDuplicate) {
865  // batched/aggregated verification does not allow duplicate hashes, so we push what we currently have and start
866  // with a fresh batch
868  }
869 
870  sigVerifyQueue.emplace_back(std::move(doneCallback), std::move(cancelCond), sig, pubKey, msgHash);
873  }
874 }
875 
876 std::future<bool> CBLSWorker::AsyncVerifySig(const CBLSSignature& sig, const CBLSPublicKey& pubKey, const uint256& msgHash, CancelCond cancelCond)
877 {
878  auto p = BuildFutureDoneCallback2<bool>();
879  AsyncVerifySig(sig, pubKey, msgHash, std::move(p.first), cancelCond);
880  return std::move(p.second);
881 }
882 
884 {
885  std::unique_lock<std::mutex> l(sigVerifyMutex);
886  return sigVerifyBatchesInProgress != 0;
887 }
888 
889 // sigVerifyMutex must be held while calling
891 {
892  auto f = [this](int threadId, std::shared_ptr<std::vector<SigVerifyJob> > _jobs) {
893  auto& jobs = *_jobs;
894  if (jobs.size() == 1) {
895  auto& job = jobs[0];
896  if (!job.cancelCond()) {
897  bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash);
898  job.doneCallback(valid);
899  }
900  std::unique_lock<std::mutex> l(sigVerifyMutex);
902  if (!sigVerifyQueue.empty()) {
904  }
905  return;
906  }
907 
908  CBLSSignature aggSig;
909  std::vector<size_t> indexes;
910  std::vector<CBLSPublicKey> pubKeys;
911  std::vector<uint256> msgHashes;
912  indexes.reserve(jobs.size());
913  pubKeys.reserve(jobs.size());
914  msgHashes.reserve(jobs.size());
915  for (size_t i = 0; i < jobs.size(); i++) {
916  auto& job = jobs[i];
917  if (job.cancelCond()) {
918  continue;
919  }
920  if (pubKeys.empty()) {
921  aggSig = job.sig;
922  } else {
923  aggSig.AggregateInsecure(job.sig);
924  }
925  indexes.emplace_back(i);
926  pubKeys.emplace_back(job.pubKey);
927  msgHashes.emplace_back(job.msgHash);
928  }
929 
930  if (!pubKeys.empty()) {
931  bool allValid = aggSig.VerifyInsecureAggregated(pubKeys, msgHashes);
932  if (allValid) {
933  for (size_t i = 0; i < pubKeys.size(); i++) {
934  jobs[indexes[i]].doneCallback(true);
935  }
936  } else {
937  // one or more sigs were not valid, revert to per-sig verification
938  // TODO this could be improved if we would cache pairing results in some way as the previous aggregated verification already calculated all the pairings for the hashes
939  for (size_t i = 0; i < pubKeys.size(); i++) {
940  auto& job = jobs[indexes[i]];
941  bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash);
942  job.doneCallback(valid);
943  }
944  }
945  }
946 
947  std::unique_lock<std::mutex> l(sigVerifyMutex);
949  if (!sigVerifyQueue.empty()) {
951  }
952  };
953 
954  auto batch = std::make_shared<std::vector<SigVerifyJob> >(std::move(sigVerifyQueue));
956 
958  workerPool.push(f, batch);
959 }
std::pair< std::function< void(const T &)>, std::future< T > > BuildFutureDoneCallback()
Definition: bls_worker.cpp:33
std::pair< std::function< void(T)>, std::future< T > > BuildFutureDoneCallback2()
Definition: bls_worker.cpp:42
void AsyncAggregateHelper(ctpl::thread_pool &workerPool, const std::vector< T > &vec, size_t start, size_t count, bool parallel, std::function< void(const T &)> doneCallback)
Definition: bls_worker.cpp:640
bool VerifyVectorHelper(const std::vector< T > &vec, size_t start, size_t count)
Definition: bls_worker.cpp:14
std::vector< CBLSId > BLSIdVector
Definition: bls_wrapper.h:408
std::vector< CBLSPublicKey > BLSVerificationVector
Definition: bls_wrapper.h:409
std::vector< CBLSPublicKey > BLSPublicKeyVector
Definition: bls_wrapper.h:410
std::shared_ptr< BLSVerificationVector > BLSVerificationVectorPtr
Definition: bls_wrapper.h:415
std::shared_ptr< BLSSecretKeyVector > BLSSecretKeyVectorPtr
Definition: bls_wrapper.h:417
std::vector< CBLSSecretKey > BLSSecretKeyVector
Definition: bls_wrapper.h:411
std::vector< CBLSSignature > BLSSignatureVector
Definition: bls_wrapper.h:412
bool PublicKeyShare(const std::vector< CBLSPublicKey > &mpk, const CBLSId &id)
CBLSPublicKey GetPublicKey() const
Definition: bls_wrapper.cpp:99
CBLSSignature Sign(const uint256 &hash) const
void AggregateInsecure(const CBLSSignature &o)
bool VerifyInsecureAggregated(const std::vector< CBLSPublicKey > &pubKeys, const std::vector< uint256 > &hashes) const
int sigVerifyBatchesInProgress
Definition: bls_worker.h:47
void AsyncBuildQuorumVerificationVector(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start, size_t count, bool parallel, std::function< void(const BLSVerificationVectorPtr &)> doneCallback)
Definition: bls_worker.cpp:605
CBLSSecretKey AggregateSecretKeys(const BLSSecretKeyVector &secKeys, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:675
void AsyncAggregatePublicKeys(const BLSPublicKeyVector &pubKeys, size_t start, size_t count, bool parallel, std::function< void(const CBLSPublicKey &)> doneCallback)
Definition: bls_worker.cpp:681
bool VerifySecretKeyVector(const BLSSecretKeyVector &secKeys, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:822
void AsyncVerifyContributionShares(const CBLSId &forId, const std::vector< BLSVerificationVectorPtr > &vvecs, const BLSSecretKeyVector &skShares, bool parallel, bool aggregated, std::function< void(const std::vector< bool > &)> doneCallback)
Definition: bls_worker.cpp:731
bool VerifyContributionShare(const CBLSId &forId, const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skContribution)
Definition: bls_worker.cpp:775
bool IsAsyncVerifyInProgress()
Definition: bls_worker.cpp:883
void AsyncSign(const CBLSSecretKey &secKey, const uint256 &msgHash, SignDoneCallback doneCallback)
Definition: bls_worker.cpp:832
ctpl::thread_pool workerPool
Definition: bls_worker.h:27
std::mutex sigVerifyMutex
Definition: bls_worker.h:46
std::vector< bool > VerifyContributionShares(const CBLSId &forId, const std::vector< BLSVerificationVectorPtr > &vvecs, const BLSSecretKeyVector &skShares, bool parallel=true, bool aggregated=true)
Definition: bls_worker.cpp:753
CBLSPublicKey BuildPubKeyShare(const BLSVerificationVectorPtr &vvec, const CBLSId &id)
Definition: bls_worker.cpp:724
BLSVerificationVectorPtr BuildQuorumVerificationVector(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:633
void AsyncAggregateSecretKeys(const BLSSecretKeyVector &secKeys, size_t start, size_t count, bool parallel, std::function< void(const CBLSSecretKey &)> doneCallback)
Definition: bls_worker.cpp:660
CBLSPublicKey AggregatePublicKeys(const BLSPublicKeyVector &pubKeys, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:696
void Start()
Definition: bls_worker.cpp:63
bool VerifyVerificationVector(const BLSVerificationVector &vvec, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:787
static const int SIG_VERIFY_BATCH_SIZE
Definition: bls_worker.h:29
std::function< void(const CBLSSignature &)> SignDoneCallback
Definition: bls_worker.h:22
void AsyncAggregateSigs(const BLSSignatureVector &sigs, size_t start, size_t count, bool parallel, std::function< void(const CBLSSignature &)> doneCallback)
Definition: bls_worker.cpp:702
std::vector< SigVerifyJob > sigVerifyQueue
Definition: bls_worker.h:48
void Stop()
Definition: bls_worker.cpp:71
bool VerifyVerificationVectors(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:792
CBLSSignature AggregateSigs(const BLSSignatureVector &sigs, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:717
std::future< bool > AsyncVerifyContributionShare(const CBLSId &forId, const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skContribution)
Definition: bls_worker.cpp:759
std::function< bool()> CancelCond
Definition: bls_worker.h:24
bool VerifySignatureVector(const BLSSignatureVector &sigs, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:827
void AsyncVerifySig(const CBLSSignature &sig, const CBLSPublicKey &pubKey, const uint256 &msgHash, SigVerifyDoneCallback doneCallback, CancelCond cancelCond=[] { return false;})
Definition: bls_worker.cpp:846
std::function< void(bool)> SigVerifyDoneCallback
Definition: bls_worker.h:23
void PushSigVerifyBatch()
Definition: bls_worker.cpp:890
bool GenerateContributions(int threshold, const BLSIdVector &ids, BLSVerificationVectorPtr &vvecRet, BLSSecretKeyVector &skShares)
Definition: bls_worker.cpp:77
bool IsValid() const
Definition: bls_wrapper.h:87
bool pop(T &v)
Definition: ctpl_stl.h:53
bool push(T const &value)
Definition: ctpl_stl.h:47
void stop(bool isWait=false)
Definition: ctpl_stl.h:142
void clear_queue()
Definition: ctpl_stl.h:122
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
Definition: ctpl_stl.h:173
void resize(int nThreads)
Definition: ctpl_stl.h:93
256-bit opaque blob.
Definition: uint256.h:138
#define T(expected, seed, data)
Definition: uint256.h:212
std::atomic< size_t > aggQueueSize
Definition: bls_worker.cpp:145
ctpl::detail::Queue< T * > aggQueue
Definition: bls_worker.cpp:144
void CheckDone()
Definition: bls_worker.cpp:218
void PushAggQueue(const T &v)
Definition: bls_worker.cpp:276
const T * pointer(const T &v)
Definition: bls_worker.cpp:170
std::shared_ptr< std::vector< const T * > > inputVec
Definition: bls_worker.cpp:139
void Start()
Definition: bls_worker.cpp:175
T SyncAggregate(const std::vector< TP > &vec, size_t start, size_t count)
Definition: bls_worker.cpp:313
Aggregator(const std::vector< TP > &_inputVec, size_t start, size_t count, bool _parallel, ctpl::thread_pool &_workerPool, DoneCallback _doneCallback)
Definition: bls_worker.cpp:155
void IncWait()
Definition: bls_worker.cpp:213
size_t batchSize
Definition: bls_worker.cpp:135
std::atomic< size_t > waitCount
Definition: bls_worker.cpp:151
void AsyncAggregateAndPushAggQueue(const std::shared_ptr< std::vector< const T * >> &vec, size_t start, size_t count, bool del)
Definition: bls_worker.cpp:255
void Finish()
Definition: bls_worker.cpp:225
const T * pointer(const T *v)
Definition: bls_worker.cpp:171
std::function< void(const T &agg)> DoneCallback
Definition: bls_worker.cpp:147
ctpl::thread_pool & workerPool
Definition: bls_worker.cpp:136
DoneCallback doneCallback
Definition: bls_worker.cpp:148
void PushWork(Callable &&f)
Definition: bls_worker.cpp:323
void SyncAggregateAndPushAggQueue(const std::shared_ptr< std::vector< const T * > > &vec, size_t start, size_t count, bool del)
Definition: bls_worker.cpp:264
std::mutex m
Definition: bls_worker.cpp:141
std::unique_ptr< std::atomic< int > > aggDone
Definition: bls_worker.cpp:413
std::vector< char > verifyResults
Definition: bls_worker.cpp:419
BLSVerificationVectorPtr vvec
Definition: bls_worker.cpp:408
void HandleAggSkShareDone(size_t batchIdx, const CBLSSecretKey &skShare)
Definition: bls_worker.cpp:519
void HandleAggVvecDone(size_t batchIdx, const BLSVerificationVectorPtr &vvec)
Definition: bls_worker.cpp:511
void HandleVerifyDone(size_t batchIdx, size_t count)
Definition: bls_worker.cpp:528
void AsyncAggregatedVerifyBatch(size_t batchIdx)
Definition: bls_worker.cpp:551
std::function< void(const std::vector< bool > &)> doneCallback
Definition: bls_worker.cpp:436
ContributionVerifier(const CBLSId &_forId, const std::vector< BLSVerificationVectorPtr > &_vvecs, const BLSSecretKeyVector &_skShares, size_t _batchSize, bool _parallel, bool _aggregated, ctpl::thread_pool &_workerPool, std::function< void(const std::vector< bool > &)> _doneCallback)
Definition: bls_worker.cpp:438
void PushOrDoWork(Callable &&f)
Definition: bls_worker.cpp:595
void AsyncVerifyBatchOneByOne(size_t batchIdx)
Definition: bls_worker.cpp:569
const BLSSecretKeyVector & skShares
Definition: bls_worker.cpp:424
void HandleAggDone(size_t batchIdx)
Definition: bls_worker.cpp:536
std::vector< BatchState > batchStates
Definition: bls_worker.cpp:434
const std::vector< BLSVerificationVectorPtr > & vvecs
Definition: bls_worker.cpp:423
std::atomic< size_t > verifyDoneCount
Definition: bls_worker.cpp:435
ctpl::thread_pool & workerPool
Definition: bls_worker.cpp:429
void AsyncAggregate(size_t batchIdx)
Definition: bls_worker.cpp:498
bool Verify(const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skShare)
Definition: bls_worker.cpp:583
void CheckDone(const T &agg, size_t idx)
Definition: bls_worker.cpp:392
Aggregator< T > AggregatorType
Definition: bls_worker.cpp:342
DoneCallback doneCallback
Definition: bls_worker.cpp:355
std::function< void(const VectorPtrType &agg)> DoneCallback
Definition: bls_worker.cpp:346
std::vector< T > VectorType
Definition: bls_worker.cpp:343
VectorPtrType result
Definition: bls_worker.cpp:358
VectorAggregator(const VectorVectorType &_vecs, size_t _start, size_t _count, bool _parallel, ctpl::thread_pool &_workerPool, DoneCallback _doneCallback)
Definition: bls_worker.cpp:361
std::vector< VectorPtrType > VectorVectorType
Definition: bls_worker.cpp:345
const VectorVectorType & vecs
Definition: bls_worker.cpp:348
std::shared_ptr< VectorType > VectorPtrType
Definition: bls_worker.cpp:344
std::atomic< size_t > doneCount
Definition: bls_worker.cpp:356
ctpl::thread_pool & workerPool
Definition: bls_worker.cpp:353
int GetNumCores()
Return the number of cores available on the current system.
Definition: system.cpp:1095
void RenameThreadPool(ctpl::thread_pool &tp, const char *baseName)
Definition: threadnames.cpp:73