16 if (start == 0 && count == 0) {
19 std::set<uint256> set;
20 for (
size_t i = start; i < start + count; i++) {
21 if (!vec[i].IsValid())
24 if (!set.emplace(vec[i].GetHash()).second) {
35 auto p = std::make_shared<std::promise<T> >();
36 std::function<void(
const T&)> f = [p](
const T& v) {
39 return std::make_pair(std::move(f), p->get_future());
44 auto p = std::make_shared<std::promise<T> >();
45 std::function<void(
const T&)> f = [p](
T v) {
48 return std::make_pair(std::move(f), p->get_future());
66 workerCount = std::max(std::min(1, workerCount), 4);
80 vvecRet = std::make_shared<BLSVerificationVector>((
size_t)quorumThreshold);
81 skShares.resize(ids.size());
83 for (
int i = 0; i < quorumThreshold; i++) {
84 (*svec)[i].MakeNewKey();
86 std::list<std::future<bool> > futures;
89 for (
size_t i = 0; i < (size_t)quorumThreshold; i += batchSize) {
91 size_t count = std::min(batchSize, quorumThreshold - start);
92 auto f = [&, start, count](
int threadId) {
93 for (
size_t j = start; j < start + count; j++) {
94 (*vvecRet)[j] = (*svec)[j].GetPublicKey();
101 for (
size_t i = 0; i < ids.size(); i += batchSize) {
103 size_t count = std::min(batchSize, ids.size() - start);
104 auto f = [&, start, count](
int threadId) {
105 for (
size_t j = start; j < start + count; j++) {
106 if (!skShares[j].SecretKeyShare(*svec, ids[j])) {
115 for (
auto& f : futures) {
131 template <
typename T>
132 struct Aggregator :
public std::enable_shared_from_this<Aggregator<T>> {
154 template <
typename TP>
156 size_t start,
size_t count,
164 inputVec = std::make_shared<std::vector<const T*> >(count);
165 for (
size_t i = 0; i < count; i++) {
166 (*inputVec)[i] =
pointer(_inputVec[start + i]);
188 if (batchCount == 1) {
190 auto self(this->shared_from_this());
192 size_t vecSize =
self->inputVec->size();
194 self->doneCallback(*(*self->inputVec)[0]);
196 self->doneCallback(self->SyncAggregate(*self->inputVec, 0, vecSize));
204 for (
size_t i = 0; i < batchCount; i++) {
232 for (
size_t i = 0; i < rem.size(); i++) {
240 if (rem.size() == 1) {
249 for (
size_t i = 0; i < rem.size(); i++) {
258 auto self(this->shared_from_this());
259 PushWork([
self, vec, start, count, del](
int threadId){
260 self->SyncAggregateAndPushAggQueue(vec, start, count, del);
269 for (
size_t i = 0; i < count; i++) {
270 delete (*vec)[start + i];
278 auto copyT =
new T(v);
288 std::shared_ptr<std::vector<const T*> > newBatch;
290 std::unique_lock<std::mutex> l(
m);
295 newBatch = std::make_shared<std::vector<const T*> >(
batchSize);
312 template <
typename TP>
315 T result = *vec[start];
316 for (
size_t j = 1; j < count; j++) {
317 result.AggregateInsecure(*vec[start + j]);
322 template <
typename Callable>
340 template <
typename T>
362 size_t _start,
size_t _count,
372 assert(!
vecs.empty());
380 for (
size_t i = 0; i <
vecSize; i++) {
381 std::vector<const T*> tmp(
count);
382 for (
size_t j = 0; j <
count; j++) {
386 auto self(this->shared_from_this());
387 auto aggregator = std::make_shared<AggregatorType>(std::move(tmp), 0,
count,
parallel,
workerPool, [
self, i](
const T& agg) {
self->CheckDone(agg, i);});
394 (*result)[idx] = agg;
423 const std::vector<BLSVerificationVectorPtr>&
vvecs;
441 std::function<
void(
const std::vector<bool>&)> _doneCallback) :
468 batchState.aggDone.reset(
new std::atomic<int>(0));
470 batchState.count = std::min(
batchSize,
vvecs.size() - batchState.start);
471 batchState.verifyResults.assign(batchState.count, 0);
476 for (
size_t i = 0; i < batchCount2; i++) {
488 std::vector<bool> result(
vvecs.size());
491 for (
size_t j = 0; j < batchState.count; j++) {
492 result[batchState.start + j] = batchState.verifyResults[j] != 0;
503 auto self(this->shared_from_this());
505 auto skShareAgg = std::make_shared<Aggregator<CBLSSecretKey>>(
skShares, batchState.start, batchState.count,
parallel,
workerPool, [
self, batchIdx] (
const CBLSSecretKey& skShare) {
self->HandleAggSkShareDone(batchIdx, skShare);});
514 batchState.vvec = vvec;
515 if (++(*batchState.aggDone) == 2) {
522 batchState.skShare = skShare;
523 if (++(*batchState.aggDone) == 2) {
540 if (batchState.vvec ==
nullptr || batchState.vvec->empty() || !batchState.skShare.IsValid()) {
543 batchState.verifyResults.assign(batchState.count, 0);
553 auto self(shared_from_this());
554 auto f = [
self, batchIdx](
int threadId) {
555 auto& batchState =
self->batchStates[batchIdx];
556 bool result =
self->Verify(batchState.vvec, batchState.skShare);
559 batchState.verifyResults.assign(batchState.count, 1);
560 self->HandleVerifyDone(batchIdx, batchState.count);
563 self->AsyncVerifyBatchOneByOne(batchIdx);
572 batchStates[batchIdx].verifyResults.assign(count, 0);
573 for (
size_t i = 0; i < count; i++) {
574 auto self(this->shared_from_this());
576 auto& batchState =
self->batchStates[batchIdx];
577 batchState.verifyResults[i] =
self->Verify(self->vvecs[batchState.start + i], self->skShares[batchState.start + i]);
578 self->HandleVerifyDone(batchIdx, 1);
594 template <
typename Callable>
606 size_t start,
size_t count,
bool parallel,
609 if (start == 0 && count == 0) {
610 count = vvecs.size();
612 if (vvecs.empty() || count == 0 || start > vvecs.size() || start + count > vvecs.size()) {
613 doneCallback(
nullptr);
617 doneCallback(
nullptr);
621 auto agg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, start, count, parallel,
workerPool, std::move(doneCallback));
626 size_t start,
size_t count,
bool parallel)
628 auto p = BuildFutureDoneCallback<BLSVerificationVectorPtr>();
630 return std::move(p.second);
634 size_t start,
size_t count,
bool parallel)
639 template <
typename T>
641 const std::vector<T>& vec,
size_t start,
size_t count,
bool parallel,
642 std::function<
void(
const T&)> doneCallback)
644 if (start == 0 && count == 0) {
647 if (vec.empty() || count == 0 || start > vec.size() || start + count > vec.size()) {
656 auto agg = std::make_shared<Aggregator<T>>(vec, start, count, parallel, workerPool, std::move(doneCallback));
661 size_t start,
size_t count,
bool parallel,
668 size_t start,
size_t count,
bool parallel)
670 auto p = BuildFutureDoneCallback<CBLSSecretKey>();
672 return std::move(p.second);
676 size_t start,
size_t count,
bool parallel)
682 size_t start,
size_t count,
bool parallel,
689 size_t start,
size_t count,
bool parallel)
691 auto p = BuildFutureDoneCallback<CBLSPublicKey>();
693 return std::move(p.second);
697 size_t start,
size_t count,
bool parallel)
703 size_t start,
size_t count,
bool parallel,
710 size_t start,
size_t count,
bool parallel)
712 auto p = BuildFutureDoneCallback<CBLSSignature>();
714 return std::move(p.second);
718 size_t start,
size_t count,
bool parallel)
732 bool parallel,
bool aggregated, std::function<
void(
const std::vector<bool>&)> doneCallback)
735 std::vector<bool> result;
736 result.assign(vvecs.size(),
false);
737 doneCallback(result);
741 auto verifier = std::make_shared<ContributionVerifier>(forId, vvecs, skShares, 8, parallel, aggregated,
workerPool, std::move(doneCallback));
746 bool parallel,
bool aggregated)
748 auto p = BuildFutureDoneCallback<std::vector<bool> >();
750 return std::move(p.second);
754 bool parallel,
bool aggregated)
764 auto p = BuildFutureDoneCallback<bool>();
766 return std::move(p.second);
769 auto f = [
this, &forId, &vvec, &skContribution](
int threadId) {
793 size_t start,
size_t count)
795 if (start == 0 && count == 0) {
796 count = vvecs.size();
799 std::set<uint256> set;
800 for (
size_t i = 0; i < count; i++) {
801 auto& vvec = vvecs[start + i];
802 if (vvec ==
nullptr) {
805 if (vvec->size() != vvecs[start]->size()) {
808 for (
size_t j = 0; j < vvec->size(); j++) {
809 if (!(*vvec)[j].IsValid()) {
813 if (!set.emplace((*vvec)[j].GetHash()).second) {
835 doneCallback(secKey.
Sign(msgHash));
841 auto p = BuildFutureDoneCallback<CBLSSignature>();
842 AsyncSign(secKey, msgHash, std::move(p.first));
843 return std::move(p.second);
856 bool foundDuplicate =
false;
858 if (s.msgHash == msgHash) {
859 foundDuplicate =
true;
864 if (foundDuplicate) {
870 sigVerifyQueue.emplace_back(std::move(doneCallback), std::move(cancelCond), sig, pubKey, msgHash);
878 auto p = BuildFutureDoneCallback2<bool>();
879 AsyncVerifySig(sig, pubKey, msgHash, std::move(p.first), cancelCond);
880 return std::move(p.second);
892 auto f = [
this](
int threadId, std::shared_ptr<std::vector<SigVerifyJob> > _jobs) {
894 if (jobs.size() == 1) {
896 if (!job.cancelCond()) {
897 bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash);
898 job.doneCallback(valid);
909 std::vector<size_t> indexes;
910 std::vector<CBLSPublicKey> pubKeys;
911 std::vector<uint256> msgHashes;
912 indexes.reserve(jobs.size());
913 pubKeys.reserve(jobs.size());
914 msgHashes.reserve(jobs.size());
915 for (
size_t i = 0; i < jobs.size(); i++) {
917 if (job.cancelCond()) {
920 if (pubKeys.empty()) {
925 indexes.emplace_back(i);
926 pubKeys.emplace_back(job.pubKey);
927 msgHashes.emplace_back(job.msgHash);
930 if (!pubKeys.empty()) {
933 for (
size_t i = 0; i < pubKeys.size(); i++) {
934 jobs[indexes[i]].doneCallback(
true);
939 for (
size_t i = 0; i < pubKeys.size(); i++) {
940 auto& job = jobs[indexes[i]];
941 bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash);
942 job.doneCallback(valid);
954 auto batch = std::make_shared<std::vector<SigVerifyJob> >(std::move(
sigVerifyQueue));
std::pair< std::function< void(const T &)>, std::future< T > > BuildFutureDoneCallback()
std::pair< std::function< void(T)>, std::future< T > > BuildFutureDoneCallback2()
void AsyncAggregateHelper(ctpl::thread_pool &workerPool, const std::vector< T > &vec, size_t start, size_t count, bool parallel, std::function< void(const T &)> doneCallback)
bool VerifyVectorHelper(const std::vector< T > &vec, size_t start, size_t count)
std::vector< CBLSId > BLSIdVector
std::vector< CBLSPublicKey > BLSVerificationVector
std::vector< CBLSPublicKey > BLSPublicKeyVector
std::shared_ptr< BLSVerificationVector > BLSVerificationVectorPtr
std::shared_ptr< BLSSecretKeyVector > BLSSecretKeyVectorPtr
std::vector< CBLSSecretKey > BLSSecretKeyVector
std::vector< CBLSSignature > BLSSignatureVector
bool PublicKeyShare(const std::vector< CBLSPublicKey > &mpk, const CBLSId &id)
CBLSPublicKey GetPublicKey() const
CBLSSignature Sign(const uint256 &hash) const
void AggregateInsecure(const CBLSSignature &o)
bool VerifyInsecureAggregated(const std::vector< CBLSPublicKey > &pubKeys, const std::vector< uint256 > &hashes) const
int sigVerifyBatchesInProgress
void AsyncBuildQuorumVerificationVector(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start, size_t count, bool parallel, std::function< void(const BLSVerificationVectorPtr &)> doneCallback)
CBLSSecretKey AggregateSecretKeys(const BLSSecretKeyVector &secKeys, size_t start=0, size_t count=0, bool parallel=true)
void AsyncAggregatePublicKeys(const BLSPublicKeyVector &pubKeys, size_t start, size_t count, bool parallel, std::function< void(const CBLSPublicKey &)> doneCallback)
bool VerifySecretKeyVector(const BLSSecretKeyVector &secKeys, size_t start=0, size_t count=0)
void AsyncVerifyContributionShares(const CBLSId &forId, const std::vector< BLSVerificationVectorPtr > &vvecs, const BLSSecretKeyVector &skShares, bool parallel, bool aggregated, std::function< void(const std::vector< bool > &)> doneCallback)
bool VerifyContributionShare(const CBLSId &forId, const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skContribution)
bool IsAsyncVerifyInProgress()
void AsyncSign(const CBLSSecretKey &secKey, const uint256 &msgHash, SignDoneCallback doneCallback)
ctpl::thread_pool workerPool
std::mutex sigVerifyMutex
std::vector< bool > VerifyContributionShares(const CBLSId &forId, const std::vector< BLSVerificationVectorPtr > &vvecs, const BLSSecretKeyVector &skShares, bool parallel=true, bool aggregated=true)
CBLSPublicKey BuildPubKeyShare(const BLSVerificationVectorPtr &vvec, const CBLSId &id)
BLSVerificationVectorPtr BuildQuorumVerificationVector(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start=0, size_t count=0, bool parallel=true)
void AsyncAggregateSecretKeys(const BLSSecretKeyVector &secKeys, size_t start, size_t count, bool parallel, std::function< void(const CBLSSecretKey &)> doneCallback)
CBLSPublicKey AggregatePublicKeys(const BLSPublicKeyVector &pubKeys, size_t start=0, size_t count=0, bool parallel=true)
bool VerifyVerificationVector(const BLSVerificationVector &vvec, size_t start=0, size_t count=0)
static const int SIG_VERIFY_BATCH_SIZE
std::function< void(const CBLSSignature &)> SignDoneCallback
void AsyncAggregateSigs(const BLSSignatureVector &sigs, size_t start, size_t count, bool parallel, std::function< void(const CBLSSignature &)> doneCallback)
std::vector< SigVerifyJob > sigVerifyQueue
bool VerifyVerificationVectors(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start=0, size_t count=0)
CBLSSignature AggregateSigs(const BLSSignatureVector &sigs, size_t start=0, size_t count=0, bool parallel=true)
std::future< bool > AsyncVerifyContributionShare(const CBLSId &forId, const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skContribution)
std::function< bool()> CancelCond
bool VerifySignatureVector(const BLSSignatureVector &sigs, size_t start=0, size_t count=0)
void AsyncVerifySig(const CBLSSignature &sig, const CBLSPublicKey &pubKey, const uint256 &msgHash, SigVerifyDoneCallback doneCallback, CancelCond cancelCond=[] { return false;})
std::function< void(bool)> SigVerifyDoneCallback
void PushSigVerifyBatch()
bool GenerateContributions(int threshold, const BLSIdVector &ids, BLSVerificationVectorPtr &vvecRet, BLSSecretKeyVector &skShares)
bool push(T const &value)
void stop(bool isWait=false)
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
void resize(int nThreads)
#define T(expected, seed, data)
std::atomic< size_t > aggQueueSize
ctpl::detail::Queue< T * > aggQueue
void PushAggQueue(const T &v)
const T * pointer(const T &v)
std::shared_ptr< std::vector< const T * > > inputVec
T SyncAggregate(const std::vector< TP > &vec, size_t start, size_t count)
Aggregator(const std::vector< TP > &_inputVec, size_t start, size_t count, bool _parallel, ctpl::thread_pool &_workerPool, DoneCallback _doneCallback)
std::atomic< size_t > waitCount
void AsyncAggregateAndPushAggQueue(const std::shared_ptr< std::vector< const T * >> &vec, size_t start, size_t count, bool del)
const T * pointer(const T *v)
std::function< void(const T &agg)> DoneCallback
ctpl::thread_pool & workerPool
DoneCallback doneCallback
void PushWork(Callable &&f)
void SyncAggregateAndPushAggQueue(const std::shared_ptr< std::vector< const T * > > &vec, size_t start, size_t count, bool del)
std::unique_ptr< std::atomic< int > > aggDone
std::vector< char > verifyResults
BLSVerificationVectorPtr vvec
void HandleAggSkShareDone(size_t batchIdx, const CBLSSecretKey &skShare)
void HandleAggVvecDone(size_t batchIdx, const BLSVerificationVectorPtr &vvec)
void HandleVerifyDone(size_t batchIdx, size_t count)
void AsyncAggregatedVerifyBatch(size_t batchIdx)
std::function< void(const std::vector< bool > &)> doneCallback
ContributionVerifier(const CBLSId &_forId, const std::vector< BLSVerificationVectorPtr > &_vvecs, const BLSSecretKeyVector &_skShares, size_t _batchSize, bool _parallel, bool _aggregated, ctpl::thread_pool &_workerPool, std::function< void(const std::vector< bool > &)> _doneCallback)
void PushOrDoWork(Callable &&f)
void AsyncVerifyBatchOneByOne(size_t batchIdx)
const BLSSecretKeyVector & skShares
void HandleAggDone(size_t batchIdx)
std::vector< BatchState > batchStates
const std::vector< BLSVerificationVectorPtr > & vvecs
std::atomic< size_t > verifyDoneCount
ctpl::thread_pool & workerPool
void AsyncAggregate(size_t batchIdx)
bool Verify(const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skShare)
void CheckDone(const T &agg, size_t idx)
Aggregator< T > AggregatorType
DoneCallback doneCallback
std::function< void(const VectorPtrType &agg)> DoneCallback
std::vector< T > VectorType
VectorAggregator(const VectorVectorType &_vecs, size_t _start, size_t _count, bool _parallel, ctpl::thread_pool &_workerPool, DoneCallback _doneCallback)
std::vector< VectorPtrType > VectorVectorType
const VectorVectorType & vecs
std::shared_ptr< VectorType > VectorPtrType
std::atomic< size_t > doneCount
ctpl::thread_pool & workerPool
int GetNumCores()
Return the number of cores available on the current system.
void RenameThreadPool(ctpl::thread_pool &tp, const char *baseName)