PIVX Core  5.6.99
P2P Digital Currency
quorums_signing_shares.cpp
Go to the documentation of this file.
1 // Copyright (c) 2018 The Dash Core developers
2 // Copyright (c) 2023 The PIVX Core developers
3 // Distributed under the MIT/X11 software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #include "quorums_signing.h"
7 
8 #include "activemasternode.h"
10 #include "cxxtimer.h"
11 #include "init.h"
12 #include "net.h"
13 #include "net_processing.h"
14 #include "netmessagemaker.h"
15 #include "quorums_signing_shares.h"
16 #include "quorums_utils.h"
17 #include "random.h"
18 #include "shutdown.h"
19 #include "tiertwo/masternode_meta_manager.h" // for g_mmetaman
21 #include "validation.h"
22 
23 namespace llmq
24 {
25 
26 std::unique_ptr<CSigSharesManager> quorumSigSharesManager{nullptr};
27 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap), so the function name cannot be confirmed from here.
// Refreshes the cached lookup key: key.first receives the sign hash built from
// *this, key.second receives quorumMember.
29 {
30  key.first = llmq::utils::BuildSignHash(*this);
31  key.second = quorumMember;
32 }
33 
34 std::string CSigSesAnn::ToString() const
35 {
36  return strprintf("sessionId=%d, llmqType=%d, quorumHash=%s, id=%s, msgHash=%s",
37  sessionId, llmqType, quorumHash.ToString(), id.ToString(), msgHash.ToString());
38 }
39 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body reads a second inventory named `inv2`.
// Copies every set entry of inv2.inv into this inventory. Entries that are unset in
// inv2 leave the local value untouched, so the result is the union of both.
// NOTE(review): inv2.inv is indexed up to inv.size() without a size check - this
// assumes both inventories have the same length; confirm against the callers.
41 {
42  for (size_t i = 0; i < inv.size(); i++) {
43  if (inv2.inv[i]) {
44  inv[i] = inv2.inv[i];
45  }
46  }
47 }
48 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap).
// Returns how many entries of `inv` are set to true.
50 {
51  return (size_t)std::count(inv.begin(), inv.end(), true);
52 }
53 
54 std::string CSigSharesInv::ToString() const
55 {
56  std::string str = "(";
57  bool first = true;
58  for (size_t i = 0; i < inv.size(); i++) {
59  if (!inv[i]) {
60  continue;
61  }
62 
63  if (!first) {
64  str += ",";
65  }
66  first = false;
67  str += strprintf("%d", i);
68  }
69  str += ")";
70  return str;
71 }
72 
73 void CSigSharesInv::Init(size_t size)
74 {
75  inv.resize(size, false);
76 }
77 
78 bool CSigSharesInv::IsSet(uint16_t quorumMember) const
79 {
80  assert(quorumMember < inv.size());
81  return inv[quorumMember];
82 }
83 
84 void CSigSharesInv::Set(uint16_t quorumMember, bool v)
85 {
86  assert(quorumMember < inv.size());
87  inv[quorumMember] = v;
88 }
89 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses a value named `v`.
// Assigns `v` to every entry of `inv`, leaving the inventory size unchanged.
91 {
92  for (size_t i = 0; i < inv.size(); i++) {
93  inv[i] = v;
94  }
95 }
96 
97 std::string CBatchedSigShares::ToInvString() const
98 {
99  CSigSharesInv inv;
100  // we use 400 here no matter what the real size is. We don't really care about that size as we just want to call ToString()
101  inv.Init(400);
102  for (size_t i = 0; i < sigShares.size(); i++) {
103  inv.inv[sigShares[i].first] = true;
104  }
105  return inv.ToString();
106 }
107 
108 template <typename T>
109 static void InitSession(CSigSharesNodeState::Session& s, const uint256& signHash, T& from)
110 {
111  const auto& params = Params().GetConsensus().llmqs.at((Consensus::LLMQType)from.llmqType);
112 
113  s.llmqType = (Consensus::LLMQType)from.llmqType;
114  s.quorumHash = from.quorumHash;
115  s.id = from.id;
116  s.msgHash = from.msgHash;
117  s.signHash = signHash;
118  s.announced.Init((size_t)params.size);
119  s.requested.Init((size_t)params.size);
120  s.knows.Init((size_t)params.size);
121 }
122 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses a sig share named `sigShare`.
// Returns the session stored under the share's sign hash, creating it on demand.
124 {
// operator[] default-constructs the session when no entry exists yet
125  auto& s = sessions[sigShare.GetSignHash()];
// an empty `announced` inventory is used as the "not initialized yet" marker
126  if (s.announced.inv.empty()) {
127  InitSession(s, sigShare.GetSignHash(), sigShare);
128  }
129  return s;
130 }
131 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses a session announcement named `ann`.
// Returns the session stored under the sign hash derived from the announcement
// (llmqType, quorumHash, id, msgHash), creating it on demand.
133 {
134  auto signHash = llmq::utils::BuildSignHash((Consensus::LLMQType)ann.llmqType, ann.quorumHash, ann.id, ann.msgHash);
// operator[] default-constructs the session when no entry exists yet
135  auto& s = sessions[signHash];
// an empty `announced` inventory is used as the "not initialized yet" marker
136  if (s.announced.inv.empty()) {
137  InitSession(s, signHash, ann);
138  }
139  return s;
140 }
141 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses a hash named `signHash`.
// Looks up an existing session by sign hash without creating one.
// Returns a pointer to the stored session, or nullptr when there is none.
143 {
144  auto it = sessions.find(signHash);
145  if (it == sessions.end()) {
146  return nullptr;
147  }
148  return &it->second;
149 }
150 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses an id named `sessionId`.
// Looks up the session registered in sessionByRecvId under `sessionId`.
// Returns the mapped value, or nullptr when the id is unknown.
152 {
153  auto it = sessionByRecvId.find(sessionId);
154  if (it == sessionByRecvId.end()) {
155  return nullptr;
156  }
157  return it->second;
158 }
159 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses `sessionId` and an output named `retInfo`.
// Copies the identifying fields of the session registered under `sessionId` into
// `retInfo`. Returns false (leaving `retInfo` untouched) when no such session
// exists, true otherwise.
161 {
162  auto s = GetSessionByRecvId(sessionId);
163  if (!s) {
164  return false;
165  }
166  retInfo.llmqType = s->llmqType;
167  retInfo.quorumHash = s->quorumHash;
168  retInfo.id = s->id;
169  retInfo.msgHash = s->msgHash;
170  retInfo.signHash = s->signHash;
171  retInfo.quorum = s->quorum;
172 
173  return true;
174 }
175 
// NOTE(review): the signature line that introduces this body and one interior line
// (doxygen line 183) are missing from this listing (extraction gap); the body uses
// a hash named `signHash`.
// Drops everything visible here that is tracked for `signHash`: the session itself,
// its entry in sessionByRecvId, and the pending incoming sig shares.
177 {
178  auto it = sessions.find(signHash);
179  if (it != sessions.end()) {
// remove the id -> session mapping first, while the session is still alive
180  sessionByRecvId.erase(it->second.recvSessionId);
181  sessions.erase(it);
182  }
184  pendingIncomingSigShares.EraseAllForSignHash(signHash);
185 }
186 
188 
// NOTE(review): the line introducing this body (doxygen line 189) and its single
// interior line (doxygen line 191) are missing from this listing (extraction gap);
// the purpose of this definition cannot be determined from here.
190 {
192 }
193 
// NOTE(review): the line introducing this body (doxygen line 194) is missing from
// this listing (extraction gap). The body itself is empty.
195 {
196 }
197 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap).
// Starts `workThread`: the thread runs CSigSharesManager::WorkThreadMain on this
// object, wrapped in TraceThread under the name "quorum-sigshares".
199 {
200  workThread = std::thread(&TraceThread<std::function<void()>>, "quorum-sigshares", std::function<void()>(std::bind(&CSigSharesManager::WorkThreadMain, this)));
201 }
202 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap).
// Blocks until `workThread` has finished. The joinable() check makes the call a
// no-op when the thread was never started or was already joined.
204 {
205  if (workThread.joinable()) {
206  workThread.join();
207  }
208 }
209 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap).
// Registers this object with quorumSigningManager as a recovered-sigs listener.
211 {
212  quorumSigningManager->RegisterRecoveredSigsListener(this);
213 }
214 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap).
// Removes this object from quorumSigningManager's recovered-sigs listeners.
216 {
217  quorumSigningManager->UnregisterRecoveredSigsListener(this);
218 }
219 
// NOTE(review): the line introducing this body (doxygen line 220) and its single
// interior line (doxygen line 222) are missing from this listing (extraction gap);
// the purpose of this definition cannot be determined from here.
221 {
223 }
224 
// Entry point for sig-share related P2P messages.
//
// Deserializes the payload of QSIGSHARE, QSIGSESANN, QSIGSHARESINV, QGETSIGSHARES and
// QBSIGSHARES messages from `vRecv`, enforces the per-message size limits and forwards
// each element to the matching handler. Exceeding a limit, or a handler returning
// false, bans the sending peer and stops processing of the message. Other commands
// are ignored.
//
// NOTE(review): doxygen line 228 (the condition that opens the early-return block
// below) is missing from this listing (extraction gap).
225 void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
226 {
227  // non-masternodes are not interested in sigshares
229  return;
230  }
231 
// NOTE(review): this branch is a stand-alone `if`, not part of the else-if chain
// below, and it logs via LogPrint(BCLog::LLMQ, ...) while the other branches use
// LogPrintf.
232  if (strCommand == NetMsgType::QSIGSHARE) {
233  std::vector<CSigShare> sigShares;
234  vRecv >> sigShares;
235 
236  if (sigShares.size() > MAX_MSGS_SIG_SHARES) {
237  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, sigShares.size(), MAX_MSGS_SIG_SHARES, pfrom->GetId());
238  BanNode(pfrom->GetId());
239  return;
240  }
241 
242  for (auto& sigShare : sigShares) {
243  ProcessMessageSigShare(pfrom->GetId(), sigShare, connman);
244  }
245  }
246 
247  if (strCommand == NetMsgType::QSIGSESANN) {
248  std::vector<CSigSesAnn> msgs;
249  vRecv >> msgs;
250  if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) {
251  LogPrintf("CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom->GetId());
252  BanNode(pfrom->GetId());
253  return;
254  }
255  for (auto& ann : msgs) {
256  if (!ProcessMessageSigSesAnn(pfrom, ann, connman)) {
257  BanNode(pfrom->GetId());
258  return;
259  }
260  }
261  } else if (strCommand == NetMsgType::QSIGSHARESINV) {
262  std::vector<CSigSharesInv> msgs;
263  vRecv >> msgs;
264  if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) {
265  LogPrintf("CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom->GetId());
266  BanNode(pfrom->GetId());
267  return;
268  }
269  for (auto& inv : msgs) {
270  if (!ProcessMessageSigSharesInv(pfrom, inv, connman)) {
271  BanNode(pfrom->GetId());
272  return;
273  }
274  }
275  } else if (strCommand == NetMsgType::QGETSIGSHARES) {
276  std::vector<CSigSharesInv> msgs;
277  vRecv >> msgs;
278  if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) {
279  LogPrintf("CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom->GetId());
280  BanNode(pfrom->GetId());
281  return;
282  }
283  for (auto& inv : msgs) {
284  if (!ProcessMessageGetSigShares(pfrom, inv, connman)) {
285  BanNode(pfrom->GetId());
286  return;
287  }
288  }
289  } else if (strCommand == NetMsgType::QBSIGSHARES) {
290  std::vector<CBatchedSigShares> msgs;
291  vRecv >> msgs;
// the limit applies to the total number of sig shares over all batches, not to
// the number of batches
292  size_t totalSigsCount = 0;
293  for (auto& bs : msgs) {
294  totalSigsCount += bs.sigShares.size();
295  }
296  if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) {
// NOTE(review): the check is on totalSigsCount but the log prints msgs.size() as
// "cnt" - the logged value looks like the wrong variable.
297  LogPrintf("CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom->GetId());
298  BanNode(pfrom->GetId());
299  return;
300  }
301  for (auto& bs : msgs) {
302  if (!ProcessMessageBatchedSigShares(pfrom, bs, connman)) {
303  BanNode(pfrom->GetId());
304  return;
305  }
306  }
307  }
308 }
309 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses `pfrom` and a session announcement `ann`.
// Handles one session announcement: validates it, resolves the quorum and
// (re-)registers the sender's session id for the announced signing session.
// Returns false for a malformed announcement (unknown LLMQ type, sessionId equal to
// (uint32_t)-1, or a null quorumHash / id / msgHash); returns true otherwise,
// including when the quorum is unknown locally.
311 {
312  auto llmqType = (Consensus::LLMQType)ann.llmqType;
313  if (!Params().GetConsensus().llmqs.count(llmqType)) {
314  return false;
315  }
316  if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) {
317  return false;
318  }
319 
320  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->GetId());
321 
322  auto quorum = quorumManager->GetQuorum(llmqType, ann.quorumHash);
323  if (!quorum) {
324  // TODO should we ban here?
325  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__,
326  ann.quorumHash.ToString(), pfrom->GetId());
327  return true; // let's still try other announcements from the same message
328  }
329 
// NOTE(review): signHash is not read again in the visible part of this body
330  auto signHash = llmq::utils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash);
331 
332  LOCK(cs);
333  auto& nodeState = nodeStates[pfrom->GetId()];
334  auto& session = nodeState.GetOrCreateSessionFromAnn(ann);
// drop stale id mappings before re-registering: the id this session used so far,
// and whatever entry currently occupies the newly announced id
335  nodeState.sessionByRecvId.erase(session.recvSessionId);
336  nodeState.sessionByRecvId.erase(ann.sessionId);
337  session.recvSessionId = ann.sessionId;
338  session.quorum = quorum;
339  nodeState.sessionByRecvId.emplace(ann.sessionId, &session);
340 
341  return true;
342 }
343 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses `llmqType` and an inventory `inv`.
// Checks that the inventory has exactly one entry per member of a quorum of the
// given LLMQ type (the `size` of the consensus parameters). Returns true on a match.
// The parameter lookup uses .at(), which throws for an unknown LLMQ type.
345 {
346  size_t quorumSize = (size_t)Params().GetConsensus().llmqs.at(llmqType).size;
347 
348  if (inv.inv.size() != quorumSize) {
349  return false;
350  }
351  return true;
352 }
353 
// NOTE(review): the signature line that introduces this body and one interior line
// (doxygen line 356, presumably the declaration of `sessionInfo`) are missing from
// this listing (extraction gap); the body uses `pfrom` and an inventory `inv`.
// Handles an inventory of sig shares announced by a peer: after validation the
// announced entries are merged into the session's `announced` and `knows` sets.
// Returns false only when the inventory size does not match the quorum size; every
// other outcome (unknown session, already recovered, missing vvec) returns true.
355 {
357  if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) {
358  return true;
359  }
360 
361  if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) {
362  return false;
363  }
364 
365  // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig
366  if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) {
367  return true;
368  }
369 
370  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__,
371  sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId());
372 
373  if (sessionInfo.quorum->quorumVvec == nullptr) {
374  // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG
375  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, not requesting sig shares. node=%d\n", __func__,
376  sessionInfo.quorumHash.ToString(), pfrom->GetId());
377  return true;
378  }
379 
380  LOCK(cs);
381  auto& nodeState = nodeStates[pfrom->GetId()];
// the session is looked up again under the lock and may no longer exist
382  auto session = nodeState.GetSessionByRecvId(inv.sessionId);
383  if (!session) {
384  return true;
385  }
386  session->announced.Merge(inv);
387  session->knows.Merge(inv);
388  return true;
389 }
390 
// NOTE(review): the signature line that introduces this body and one interior line
// (doxygen line 393, presumably the declaration of `sessionInfo`) are missing from
// this listing (extraction gap); the body uses `pfrom` and an inventory `inv`.
// Handles a peer's request for sig shares: after validation the requested entries
// are merged into the session's `requested` and `knows` sets.
// Returns false only when the inventory size does not match the quorum size; every
// other outcome (unknown session, already recovered) returns true.
392 {
394  if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) {
395  return true;
396  }
397 
398  if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) {
399  return false;
400  }
401 
402  // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig
403  if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) {
404  return true;
405  }
406 
407  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__,
408  sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId());
409 
410  LOCK(cs);
411  auto& nodeState = nodeStates[pfrom->GetId()];
// the session is looked up again under the lock and may no longer exist
412  auto session = nodeState.GetSessionByRecvId(inv.sessionId);
413  if (!session) {
414  return true;
415  }
416  session->requested.Merge(inv);
417  session->knows.Merge(inv);
418  return true;
419 }
420 
// NOTE(review): the signature line that introduces this body and one interior line
// (doxygen line 423, presumably the declaration of `sessionInfo`) are missing from
// this listing (extraction gap); the body uses `pfrom` and `batchedSigShares`.
// Handles one batch of sig shares for a known session: pre-verifies the batch,
// rebuilds full sig shares from it, filters out the ones that are already known or
// no longer needed, and queues the remainder in the sender's
// pendingIncomingSigShares. Returns false only when pre-verification asks for a ban.
422 {
424  if (!GetSessionInfoByRecvId(pfrom->GetId(), batchedSigShares.sessionId, sessionInfo)) {
425  return true;
426  }
427 
428  bool ban = false;
429  if (!PreVerifyBatchedSigShares(pfrom->GetId(), sessionInfo, batchedSigShares, ban)) {
430  return !ban;
431  }
432 
// local list of the shares that survive filtering (the member container of the
// same name is accessed as this->sigShares below)
433  std::vector<CSigShare> sigShares;
434  sigShares.reserve(batchedSigShares.sigShares.size());
435 
436  {
437  LOCK(cs);
438  auto& nodeState = nodeStates[pfrom->GetId()];
439 
440  for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) {
441  CSigShare sigShare = RebuildSigShare(sessionInfo, batchedSigShares, i);
// the share arrived, so it is no longer an outstanding request for this node
442  nodeState.requestedSigShares.Erase(sigShare.GetKey());
443 
444  // TODO track invalid sig shares received for PoSe?
445  // It's important to only skip seen *valid* sig shares here. If a node sends us a
446  // batch of mostly valid sig shares with a single invalid one and thus batched
447  // verification fails, we'd skip the valid ones in the future if received from other nodes
448  if (this->sigShares.Has(sigShare.GetKey())) {
449  continue;
450  }
451 
452  // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig
453  if (quorumSigningManager->HasRecoveredSigForId((Consensus::LLMQType)sigShare.llmqType, sigShare.id)) {
454  continue;
455  }
456 
457  sigShares.emplace_back(sigShare);
458  }
459  }
460 
461  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signHash=%s, shares=%d, new=%d, inv={%s}, node=%d\n", __func__,
462  sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInvString(), pfrom->GetId());
463 
464  if (sigShares.empty()) {
465  return true;
466  }
467 
// the lock was released for logging above and is taken again to queue the shares
468  LOCK(cs);
469  auto& nodeState = nodeStates[pfrom->GetId()];
470  for (auto& s : sigShares) {
471  nodeState.pendingIncomingSigShares.Add(s.GetKey(), s);
472  }
473  return true;
474 }
475 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses `fromId` and a sig share `sigShare`.
// Handles a single sig share received from node `fromId`. The share is silently
// dropped when the quorum is unknown, inactive, does not include this masternode,
// or has no quorum vvec. A member index that is out of range or not a valid member
// bans the sender. Shares that pass, are not already known and have no recovered sig
// yet are queued in the sender's pendingIncomingSigShares.
477 {
478  auto quorum = quorumManager->GetQuorum((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash);
479  if (!quorum) {
480  return;
481  }
482  if (!llmq::utils::IsQuorumActive((Consensus::LLMQType)sigShare.llmqType, quorum->qc.quorumHash)) {
483  // quorum is too old
484  return;
485  }
486  if (!quorum->IsMember(activeMasternodeManager->GetProTx())) {
487  // we're not a member so we can't verify it (we actually shouldn't have received it)
488  return;
489  }
490  if (quorum->quorumVvec == nullptr) {
491  // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG
492  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__,
493  quorum->qc.quorumHash.ToString(), fromId);
494  return;
495  }
496 
// the range check must come first: the index is used on validMembers right after
497  if (sigShare.quorumMember >= quorum->members.size()) {
498  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__);
499  BanNode(fromId);
500  return;
501  }
502  if (!quorum->qc.validMembers[sigShare.quorumMember]) {
503  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- quorumMember not valid\n", __func__);
504  BanNode(fromId);
505  return;
506  }
507 
508  {
509  LOCK(cs);
510 
511  if (sigShares.Has(sigShare.GetKey())) {
512  return;
513  }
514 
515  if (quorumSigningManager->HasRecoveredSigForId((Consensus::LLMQType)sigShare.llmqType, sigShare.id)) {
516  return;
517  }
518 
519  auto& nodeState = nodeStates[fromId];
520  nodeState.pendingIncomingSigShares.Add(sigShare.GetKey(), sigShare);
521  }
522 
523  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signHash=%s, id=%s, msgHash=%s, member=%d, node=%d\n", __func__,
524  sigShare.GetSignHash().ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), sigShare.quorumMember, fromId);
525 }
526 
527 bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan)
528 {
529  retBan = false;
530 
531  if (!llmq::utils::IsQuorumActive(session.llmqType, session.quorum->qc.quorumHash)) {
532  // quorum is too old
533  return false;
534  }
535  if (!session.quorum->IsMember(activeMasternodeManager->GetProTx())) {
536  // we're not a member so we can't verify it (we actually shouldn't have received it)
537  return false;
538  }
539  if (session.quorum->quorumVvec == nullptr) {
540  // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG
541  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__,
542  session.quorumHash.ToString(), nodeId);
543  return false;
544  }
545 
546  std::unordered_set<uint16_t> dupMembers;
547 
548  for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) {
549  auto quorumMember = batchedSigShares.sigShares[i].first;
550  if (!dupMembers.emplace(quorumMember).second) {
551  retBan = true;
552  return false;
553  }
554 
555  if (quorumMember >= session.quorum->members.size()) {
556  LogPrintf("CSigSharesManager::%s -- quorumMember out of bounds\n", __func__);
557  retBan = true;
558  return false;
559  }
560  if (!session.quorum->qc.validMembers[quorumMember]) {
561  LogPrintf("CSigSharesManager::%s -- quorumMember not valid\n", __func__);
562  retBan = true;
563  return false;
564  }
565  }
566  return true;
567 }
568 
// NOTE(review): the first line of this definition's signature (doxygen line 569) is
// missing from this listing (extraction gap); only the parameter list tail is visible.
// Moves pending incoming sig shares out of the per-node queues into `retSigShares`
// (grouped by sending node), stopping once `maxUniqueSessions` distinct
// (node, signHash) pairs were collected, and fills `retQuorums` with the quorum of
// every collected share. Both outputs stay empty when nothing is pending.
570  size_t maxUniqueSessions,
571  std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares,
572  std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums)
573 {
574  {
575  LOCK(cs);
576  if (nodeStates.empty()) {
577  return;
578  }
579 
580  // This will iterate node states in random order and pick one sig share at a time. This avoids processing
581  // of large batches at once from the same node while other nodes also provided shares. If we wouldn't do this,
582  // other nodes would be able to poison us with a large batch with N-1 valid shares and the last one being
583  // invalid, making batch verification fail and revert to per-share verification, which in turn would slow down
584  // the whole verification process
585 
586  std::unordered_set<std::pair<NodeId, uint256>, StaticSaltedHasher> uniqueSignHashes;
587  llmq::utils::IterateNodesRandom(nodeStates, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, CSigSharesNodeState& ns) {
588  if (ns.pendingIncomingSigShares.Empty()) {
589  return false;
590  }
591  auto& sigShare = *ns.pendingIncomingSigShares.GetFirst();
592 
// shares we already hold are dropped from the queue without being collected
593  bool alreadyHave = this->sigShares.Has(sigShare.GetKey());
594  if (!alreadyHave) {
595  uniqueSignHashes.emplace(nodeId, sigShare.GetSignHash());
596  retSigShares[nodeId].emplace_back(sigShare);
597  }
// NOTE(review): `sigShare` refers to the queue element erased here; it must not be
// used after this call
598  ns.pendingIncomingSigShares.Erase(sigShare.GetKey());
599  return !ns.pendingIncomingSigShares.Empty(); }, rnd);
600 
601  if (retSigShares.empty()) {
602  return;
603  }
604  }
605 
606  {
607  LOCK(cs_main);
608 
609  // For the convenience of the caller, also build a map of quorumHash -> quorum
610 
611  for (auto& p : retSigShares) {
612  for (auto& sigShare : p.second) {
613  auto llmqType = (Consensus::LLMQType)sigShare.llmqType;
614 
615  auto k = std::make_pair(llmqType, sigShare.quorumHash);
616  if (retQuorums.count(k)) {
617  continue;
618  }
619 
620  CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, sigShare.quorumHash);
// an unknown quorum is treated as a programming error at this point
621  assert(quorum != nullptr);
622  retQuorums.emplace(k, quorum);
623  }
624  }
625  }
626 }
627 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses `connman` and returns bool.
// One verification round over pending sig shares: collects shares for up to 32
// unique sessions, batch-verifies them against the members' public key shares, bans
// nodes that delivered invalid shares and hands the shares of all other nodes to
// ProcessPendingSigSharesFromNode. Returns false when nothing was pending, true
// otherwise.
629 {
630  std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
631  std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
632 
633  CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums);
634  if (sigSharesByNodes.empty()) {
635  return false;
636  }
637 
638  // It's ok to perform insecure batched verification here as we verify against the quorum public key shares,
639  // which are not craftable by individual entities, making the rogue public key attack impossible
640  CBLSBatchVerifier<NodeId, SigShareKey> batchVerifier(false, true);
641 
// prepareTimer covers feeding the verifier, verifyTimer (below) the verification
642  cxxtimer::Timer prepareTimer(true);
643  size_t verifyCount = 0;
644  for (auto& p : sigSharesByNodes) {
645  auto nodeId = p.first;
646  auto& v = p.second;
647 
648  for (auto& sigShare : v) {
649  if (quorumSigningManager->HasRecoveredSigForId((Consensus::LLMQType)sigShare.llmqType, sigShare.id)) {
650  continue;
651  }
652 
653  // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
654  // deserialization in the message thread
655  if (!sigShare.sigShare.Get().IsValid()) {
656  BanNode(nodeId);
657  // don't process any additional shares from this node
658  break;
659  }
660 
661  auto quorum = quorums.at(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash));
662  auto pubKeyShare = quorum->GetPubKeyShare(sigShare.quorumMember);
663 
664  if (!pubKeyShare.IsValid()) {
665  // this should really not happen (we already ensured we have the quorum vvec,
666  // so we should also be able to create all pubkey shares)
667  LogPrintf("CSigSharesManager::%s -- pubKeyShare is invalid, which should not be possible here.\n", __func__);
668  assert(false);
669  }
670 
671  batchVerifier.PushMessage(nodeId, sigShare.GetKey(), sigShare.GetSignHash(), sigShare.sigShare.Get(), pubKeyShare);
672  verifyCount++;
673  }
674  }
675  prepareTimer.stop();
676 
677  cxxtimer::Timer verifyTimer(true);
678  batchVerifier.Verify();
679  verifyTimer.stop();
680 
681  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__, verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size());
682 
683  for (auto& p : sigSharesByNodes) {
684  auto nodeId = p.first;
685  auto& v = p.second;
686 
687  if (batchVerifier.badSources.count(nodeId)) {
688  LogPrintf("CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n",
689  __func__, nodeId);
690  // this will also cause re-requesting of the shares that were sent by this node
691  BanNode(nodeId);
692  continue;
693  }
694 
695  ProcessPendingSigSharesFromNode(nodeId, v, quorums, connman);
696  }
697 
698  return true;
699 }
700 
// NOTE(review): the first line of this definition's signature (doxygen line 702) is
// missing from this listing (extraction gap); only the parameter list tail is visible.
// Feeds each sig share of the batch from `nodeId` into ProcessSigShare, looking up
// the share's quorum in `quorums` (.at() throws when the quorum is absent), and
// logs the batch size and the time spent.
701 // It's ensured that no duplicates are passed to this method
703  const std::vector<CSigShare>& sigShares,
704  const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
705  CConnman& connman)
706 {
707  LOCK(cs);
// NOTE(review): nodeState is not read afterwards in this body; the lookup still
// creates the nodeStates entry when it does not exist yet
708  auto& nodeState = nodeStates[nodeId];
709 
710  cxxtimer::Timer t(true);
711  for (auto& sigShare : sigShares) {
712  auto quorumKey = std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash);
713  ProcessSigShare(nodeId, sigShare, connman, quorums.at(quorumKey));
714  }
715  t.stop();
716 
717  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- processed sigShare batch. shares=%d, time=%d, node=%d\n", __func__,
718  sigShares.size(), t.count(), nodeId);
719 }
720 
// Stores one already verified sig share, marks it for announcement and triggers
// signature recovery once the number of shares held for its sign hash reaches the
// quorum threshold. Does nothing when a recovered sig already exists for the id or
// when the share is already stored.
//
// NOTE(review): doxygen line 744 (the statement belonging to the "Update the time
// we've seen the last sigShare" comment) is missing from this listing (extraction
// gap).
721 // sig shares are already verified when entering this method
722 void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare, CConnman& connman, const CQuorumCPtr& quorum)
723 {
724  auto llmqType = quorum->params.type;
725 
726  bool canTryRecovery = false;
727 
728  // prepare node set for direct-push in case this is our sig share
// NOTE(review): nothing in the visible body ever inserts into quorumNodes
729  std::set<NodeId> quorumNodes;
730 
731  if (quorumSigningManager->HasRecoveredSigForId(llmqType, sigShare.id)) {
732  return;
733  }
734 
735  {
736  LOCK(cs);
737 
// Add() returning false ends processing here (the share was not newly added)
738  if (!sigShares.Add(sigShare.GetKey(), sigShare)) {
739  return;
740  }
741  sigSharesToAnnounce.Add(sigShare.GetKey(), true);
742 
743  // Update the time we've seen the last sigShare
745 
746  // TODO: unreachable
747  if (!quorumNodes.empty()) {
748  // don't announce and wait for other nodes to request this share and directly send it to them
749  // there is no way the other nodes know about this share as this is the one created on this node
750  for (auto otherNodeId : quorumNodes) {
751  auto& nodeState = nodeStates[otherNodeId];
752  auto& session = nodeState.GetOrCreateSessionFromShare(sigShare);
753  session.quorum = quorum;
754  session.requested.Set(sigShare.quorumMember, true);
755  session.knows.Set(sigShare.quorumMember, true);
756  }
757  }
758 
759  size_t sigShareCount = sigShares.CountForSignHash(sigShare.GetSignHash());
760  if (sigShareCount >= quorum->params.threshold) {
761  canTryRecovery = true;
762  }
763  }
764 
// recovery runs after the lock scope above has ended
765  if (canTryRecovery) {
766  TryRecoverSig(quorum, sigShare.id, sigShare.msgHash, connman);
767  }
768 }
769 
770 void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman)
771 {
772  if (quorumSigningManager->HasRecoveredSigForId(quorum->params.type, id)) {
773  return;
774  }
775 
776  std::vector<CBLSSignature> sigSharesForRecovery;
777  std::vector<CBLSId> idsForRecovery;
778  {
779  LOCK(cs);
780 
781  auto k = std::make_pair(quorum->params.type, id);
782 
783  auto signHash = llmq::utils::BuildSignHash(quorum->params.type, quorum->qc.quorumHash, id, msgHash);
784  auto sigShares = this->sigShares.GetAllForSignHash(signHash);
785  if (!sigShares) {
786  return;
787  }
788 
789  sigSharesForRecovery.reserve((size_t)quorum->params.threshold);
790  idsForRecovery.reserve((size_t)quorum->params.threshold);
791  for (auto it = sigShares->begin(); it != sigShares->end() && sigSharesForRecovery.size() < quorum->params.threshold; ++it) {
792  auto& sigShare = it->second;
793  sigSharesForRecovery.emplace_back(sigShare.sigShare.Get());
794  idsForRecovery.emplace_back(CBLSId(quorum->members[sigShare.quorumMember]->proTxHash));
795  }
796 
797  // check if we can recover the final signature
798  if (sigSharesForRecovery.size() < quorum->params.threshold) {
799  return;
800  }
801  }
802 
803  // now recover it
804  cxxtimer::Timer t(true);
805  CBLSSignature recoveredSig;
806  if (!recoveredSig.Recover(sigSharesForRecovery, idsForRecovery)) {
807  LogPrintf("CSigSharesManager::%s -- failed to recover signature. id=%s, msgHash=%s, time=%d\n", __func__,
808  id.ToString(), msgHash.ToString(), t.count());
809  return;
810  }
811 
812  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- recovered signature. id=%s, msgHash=%s, time=%d\n", __func__,
813  id.ToString(), msgHash.ToString(), t.count());
814 
815  CRecoveredSig rs;
816  rs.llmqType = quorum->params.type;
817  rs.quorumHash = quorum->qc.quorumHash;
818  rs.id = id;
819  rs.msgHash = msgHash;
820  rs.sig.Set(recoveredSig);
821  rs.UpdateHash();
822 
823  // There should actually be no need to verify the self-recovered signatures as it should always succeed. Let's
824  // however still verify it from time to time, so that we have a chance to catch bugs. We do only this sporadic
825  // verification because this is unbatched and thus slow verification that happens here.
826  if (((recoveredSigsCounter++) % 100) == 0) {
827  auto signHash = llmq::utils::BuildSignHash(rs);
828  bool valid = recoveredSig.VerifyInsecure(quorum->qc.quorumPublicKey, signHash);
829  if (!valid) {
830  // this should really not happen as we have verified all signature shares before
831  LogPrintf("CSigSharesManager::%s -- own recovered signature is invalid. id=%s, msgHash=%s\n", __func__,
832  id.ToString(), msgHash.ToString());
833  return;
834  }
835  }
836 
837  quorumSigningManager->ProcessRecoveredSig(-1, rs, quorum, connman);
838 }
839 
// NOTE(review): the signature line that introduces this body is missing from this
// listing (extraction gap); the body uses `quorum`, `id` and `attempt`.
// Picks a quorum member for the given attempt number. Members are ordered by the
// hash of (proTxHash, id), so the order depends on `id`; `attempt` indexes into that
// order. `attempt` must be smaller than the number of members (asserted).
841 {
842  assert(attempt < quorum->members.size());
843 
844  std::vector<std::pair<uint256, CDeterministicMNCPtr>> v;
845  v.reserve(quorum->members.size());
846  for (const auto& dmn : quorum->members) {
847  auto h = ::SerializeHash(std::make_pair(dmn->proTxHash, id));
848  v.emplace_back(h, dmn);
849  }
// pairs compare by their first element (the hash) before the second
850  std::sort(v.begin(), v.end());
851 
852  return v[attempt].second;
853 }
854 
// Builds, per node and per sign hash, the inventory of sig shares that should be
// requested from that node. A share is requested when the node announced it, we do
// not have it yet, and no other node currently has an un-timed-out request for it.
// At most 32 requests may be outstanding per node. Requests made here are recorded
// in nodeState.requestedSigShares and sigSharesRequested so that timeouts can be
// detected on later calls.
//
// NOTE(review): doxygen line 858 (first statement of the body) is missing from this
// listing (extraction gap).
855 // TODO unused
856 void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest)
857 {
859 
860  int64_t now = GetAdjustedTime();
861  const size_t maxRequestsForNode = 32;
862 
863  // avoid requesting from same nodes all the time
864  std::vector<NodeId> shuffledNodeIds;
865  shuffledNodeIds.reserve(nodeStates.size());
866  for (auto& p : nodeStates) {
867  if (p.second.sessions.empty()) {
868  continue;
869  }
870  shuffledNodeIds.emplace_back(p.first);
871  }
872  Shuffle(shuffledNodeIds.begin(), shuffledNodeIds.end(), rnd);
873 
874  for (auto& nodeId : shuffledNodeIds) {
875  auto& nodeState = nodeStates[nodeId];
876 
877  if (nodeState.banned) {
878  continue;
879  }
880 
// forget requests to this node that have timed out, so they can be retried
881  nodeState.requestedSigShares.EraseIf([&](const SigShareKey& k, int64_t t) {
882  if (now - t >= SIG_SHARE_REQUEST_TIMEOUT) {
883  // timeout while waiting for this one, so retry it with another node
884  LogPrint(BCLog::LLMQ, "CSigSharesManager::CollectSigSharesToRequest -- timeout while waiting for %s-%d, node=%d\n",
885  k.first.ToString(), k.second, nodeId);
886  return true;
887  }
888  return false;
889  });
890 
// the per-node output map is created lazily, only when a request is actually made
891  decltype(sigSharesToRequest.begin()->second)* invMap = nullptr;
892 
893  for (auto& p2 : nodeState.sessions) {
894  auto& signHash = p2.first;
895  auto& session = p2.second;
896 
897  if (quorumSigningManager->HasRecoveredSigForSession(signHash)) {
898  continue;
899  }
900 
901  for (size_t i = 0; i < session.announced.inv.size(); i++) {
902  if (!session.announced.inv[i]) {
903  continue;
904  }
905  auto k = std::make_pair(signHash, (uint16_t)i);
906  if (sigShares.Has(k)) {
907  // we already have it
908  session.announced.inv[i] = false;
909  continue;
910  }
911  if (nodeState.requestedSigShares.Size() >= maxRequestsForNode) {
912  // too many pending requests for this node
913  break;
914  }
915  auto p = sigSharesRequested.Get(k);
916  if (p) {
917  if (now - p->second >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != p->first) {
918  // other node timed out, re-request from this node
919  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- other node timeout while waiting for %s-%d, re-request from=%d, node=%d\n", __func__,
920  k.first.ToString(), k.second, nodeId, p->first);
921  } else {
922  continue;
923  }
924  }
925  // if we got this far we should do a request
926 
927  // track when we initiated the request so that we can detect timeouts
928  nodeState.requestedSigShares.Add(k, now);
929 
930  // don't request it from other nodes until a timeout happens
931  auto& r = sigSharesRequested.GetOrAdd(k);
932  r.first = nodeId;
933  r.second = now;
934 
935  if (!invMap) {
936  invMap = &sigSharesToRequest[nodeId];
937  }
938  auto& inv = (*invMap)[signHash];
939  if (inv.inv.empty()) {
940  const auto& params = Params().GetConsensus().llmqs.at((Consensus::LLMQType)session.llmqType);
941  inv.Init((size_t)params.size);
942  }
943  inv.inv[k.second] = true;
944 
945  // don't request it again from this node
946  session.announced.inv[i] = false;
947  }
948  }
949  }
950 }
951 
// Builds, per node and per sign hash, the batch of sig shares that the node
// requested and that we actually hold. Every `requested` flag that is examined is
// cleared, whether or not the share could be served. Banned nodes and sessions with
// an already recovered signature are skipped. Batches carry only the member index
// and the signature.
//
// NOTE(review): doxygen line 955 (first statement of the body) is missing from this
// listing (extraction gap).
952 // TODO: unused
953 void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend)
954 {
956 
957  for (auto& p : nodeStates) {
958  auto nodeId = p.first;
959  auto& nodeState = p.second;
960 
961  if (nodeState.banned) {
962  continue;
963  }
964 
965  decltype(sigSharesToSend.begin()->second)* sigSharesToSend2 = nullptr;
966 
967  for (auto& p2 : nodeState.sessions) {
968  auto& signHash = p2.first;
969  auto& session = p2.second;
970 
971  if (quorumSigningManager->HasRecoveredSigForSession(signHash)) {
972  continue;
973  }
974 
975  CBatchedSigShares batchedSigShares;
976 
977  for (size_t i = 0; i < session.requested.inv.size(); i++) {
978  if (!session.requested.inv[i]) {
979  continue;
980  }
// the request is consumed here, independent of whether it can be served
981  session.requested.inv[i] = false;
982 
983  auto k = std::make_pair(signHash, (uint16_t)i);
984  const CSigShare* sigShare = sigShares.Get(k);
985  if (!sigShare) {
986  // the peer requested something we don't have
// NOTE(review): redundant - the flag was already cleared a few lines above
987  session.requested.inv[i] = false;
988  continue;
989  }
990 
991  batchedSigShares.sigShares.emplace_back((uint16_t)i, sigShare->sigShare);
992  }
993 
994  if (!batchedSigShares.sigShares.empty()) {
995  if (sigSharesToSend2 == nullptr) {
996  // only create the map if we actually add a batched sig
997  sigSharesToSend2 = &sigSharesToSend[nodeId];
998  }
999  (*sigSharesToSend2).emplace(signHash, std::move(batchedSigShares));
1000  }
1001  }
1002  }
1003 }
1004 
1005 void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes)
1006 {
1007  AssertLockHeld(cs);
1008 
1009  std::unordered_map<uint256, CNode*> proTxToNode;
1010  for (const auto& pnode : vNodes) {
1011  if (pnode->verifiedProRegTxHash.IsNull()) {
1012  continue;
1013  }
1014  proTxToNode.emplace(pnode->verifiedProRegTxHash, pnode);
1015  }
1016 
1017  auto curTime = GetTime<std::chrono::milliseconds>().count();
1018 
1019  for (auto& p : signedSessions) {
1020  if (p.second.attempt >= p.second.quorum->params.recoveryMembers) {
1021  continue;
1022  }
1023 
1024  if (curTime >= p.second.nextAttemptTime) {
1025  int64_t waitTime = exp2(p.second.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT;
1026  waitTime = std::min(MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime);
1027  p.second.nextAttemptTime = curTime + waitTime;
1028  auto dmn = SelectMemberForRecovery(p.second.quorum, p.second.sigShare.id, p.second.attempt);
1029  p.second.attempt++;
1030 
1031  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signHash=%s, sending to %s, attempt=%d\n", __func__,
1032  p.second.sigShare.GetSignHash().ToString(), dmn->proTxHash.ToString(), p.second.attempt);
1033 
1034  auto it = proTxToNode.find(dmn->proTxHash);
1035  if (it == proTxToNode.end()) {
1036  continue;
1037  }
1038 
1039  auto& m = sigSharesToSend[it->second->GetId()];
1040  m.emplace_back(p.second.sigShare);
1041  }
1042  }
1043 }
1044 
1045 // TODO unused
1046 void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
1047 {
1048  AssertLockHeld(cs);
1049 
1050  std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::unordered_set<NodeId>, StaticSaltedHasher> quorumNodesMap;
1051 
1052  this->sigSharesToAnnounce.ForEach([&](const SigShareKey& sigShareKey, bool) {
1053  auto& signHash = sigShareKey.first;
1054  auto quorumMember = sigShareKey.second;
1055  const CSigShare* sigShare = sigShares.Get(sigShareKey);
1056  if (!sigShare) {
1057  return;
1058  }
1059 
1060  // announce to the nodes which we know through the intra-quorum-communication system
1061  auto quorumKey = std::make_pair((Consensus::LLMQType)sigShare->llmqType, sigShare->quorumHash);
1062  auto it = quorumNodesMap.find(quorumKey);
1063  if (it == quorumNodesMap.end()) {
1064  auto nodeIds = g_connman->GetTierTwoConnMan()->getQuorumNodes(quorumKey.first, quorumKey.second);
1065  it = quorumNodesMap.emplace(std::piecewise_construct, std::forward_as_tuple(quorumKey), std::forward_as_tuple(nodeIds.begin(), nodeIds.end())).first;
1066  }
1067 
1068  auto& quorumNodes = it->second;
1069 
1070  for (auto& nodeId : quorumNodes) {
1071  auto& nodeState = nodeStates[nodeId];
1072 
1073  if (nodeState.banned) {
1074  continue;
1075  }
1076 
1077  auto& session = nodeState.GetOrCreateSessionFromShare(*sigShare);
1078 
1079  if (session.knows.inv[quorumMember]) {
1080  // he already knows that one
1081  continue;
1082  }
1083 
1084  auto& inv = sigSharesToAnnounce[nodeId][signHash];
1085  if (inv.inv.empty()) {
1086  const auto& params = Params().GetConsensus().llmqs.at((Consensus::LLMQType)sigShare->llmqType);
1087  inv.Init((size_t)params.size);
1088  }
1089  inv.inv[quorumMember] = true;
1090  session.knows.inv[quorumMember] = true;
1091  }
1092  });
1093 
1094  // don't announce these anymore
1095  this->sigSharesToAnnounce.Clear();
1096 }
1097 
// NOTE(review): the signature line of this body (original line 1098) is missing from this
// listing. The log strings below and the bool `didSend` result suggest it is
// `bool CSigSharesManager::SendMessages()` -- confirm against the real file.
//
// Builds the outgoing sig-share related messages and pushes them to the fully connected
// peers. Returns true when at least one message was pushed.
//
// In this body only sigSharesToSend is filled (by CollectSigSharesToSend). The maps
// sigSharesToRequest, sigShareBatchesToSend and sigSharesToAnnounce are declared but never
// filled here, so the loops over them (and the session announcements derived from them)
// do nothing.
1099 {
1100  std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest;
1101  std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigShareBatchesToSend;
1102  std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesToSend;
1103  std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToAnnounce;
1104  std::unordered_map<NodeId, std::vector<CSigSesAnn>> sigSessionAnnouncements;
1105 
 // Returns the send-session id of the (nodeId, signHash) session. When the session has no
 // id yet ((uint32_t)-1), a fresh one is assigned and a CSigSesAnn announcing it is queued.
1106  auto addSigSesAnnIfNeeded = [&](NodeId nodeId, const uint256& signHash) {
1107  auto& nodeState = nodeStates[nodeId];
1108  auto session = nodeState.GetSessionBySignHash(signHash);
1109  assert(session);
1110  if (session->sendSessionId == (uint32_t)-1) {
1111  session->sendSessionId = nodeState.nextSendSessionId++;
1112 
1113  CSigSesAnn sigSesAnn;
1114  sigSesAnn.sessionId = session->sendSessionId;
1115  sigSesAnn.llmqType = (uint8_t)session->llmqType;
1116  sigSesAnn.quorumHash = session->quorumHash;
1117  sigSesAnn.id = session->id;
1118  sigSesAnn.msgHash = session->msgHash;
1119 
1120  sigSessionAnnouncements[nodeId].emplace_back(sigSesAnn);
1121  }
1122  return session->sendSessionId;
1123  };
1124 
 // The copied node vector is handed back via ReleaseNodeVector at the end of this body.
1125  std::vector<CNode*> vNodesCopy = g_connman->CopyNodeVector(CConnman::FullyConnectedOnly);
1126 
1127  {
1128  LOCK(cs);
1129  CollectSigSharesToSend(sigSharesToSend, vNodesCopy);
1130 
1131  for (auto& p : sigSharesToRequest) {
1132  for (auto& p2 : p.second) {
1133  p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
1134  }
1135  }
1136  for (auto& p : sigShareBatchesToSend) {
1137  for (auto& p2 : p.second) {
1138  p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
1139  }
1140  }
1141  for (auto& p : sigSharesToAnnounce) {
1142  for (auto& p2 : p.second) {
1143  p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
1144  }
1145  }
1146  }
1147 
1148  bool didSend = false;
1149 
 // Per peer: each message type is sent in chunks, flushing whenever the per-type limit
 // is reached and once more for any remainder.
1150  for (auto& pnode : vNodesCopy) {
1151  CNetMsgMaker msgMaker(pnode->GetSendVersion());
1152 
 // session announcements (QSIGSESANN)
1153  auto it1 = sigSessionAnnouncements.find(pnode->GetId());
1154  if (it1 != sigSessionAnnouncements.end()) {
1155  std::vector<CSigSesAnn> msgs;
1156  msgs.reserve(it1->second.size());
1157  for (auto& sigSesAnn : it1->second) {
1158  LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n",
1159  llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId());
1160  msgs.emplace_back(sigSesAnn);
1161  if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) {
1162  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
1163  msgs.clear();
1164  didSend = true;
1165  }
1166  }
1167  if (!msgs.empty()) {
1168  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
1169  didSend = true;
1170  }
1171  }
1172 
 // share requests (QGETSIGSHARES)
1173  auto it = sigSharesToRequest.find(pnode->GetId());
1174  if (it != sigSharesToRequest.end()) {
1175  std::vector<CSigSharesInv> msgs;
1176  for (auto& p : it->second) {
1177  assert(p.second.CountSet() != 0);
1178  LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n",
1179  p.first.ToString(), p.second.ToString(), pnode->GetId());
1180  msgs.emplace_back(std::move(p.second));
1181  if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) {
1182  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
1183  msgs.clear();
1184  didSend = true;
1185  }
1186  }
1187  if (!msgs.empty()) {
1188  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
1189  didSend = true;
1190  }
1191  }
1192 
 // batched shares (QBSIGSHARES); here the limit is on the total number of signatures
 // across the batches of one message, not on the number of batches
1193  auto jt = sigShareBatchesToSend.find(pnode->GetId());
1194  if (jt != sigShareBatchesToSend.end()) {
1195  size_t totalSigsCount = 0;
1196  std::vector<CBatchedSigShares> msgs;
1197  for (auto& p : jt->second) {
1198  assert(!p.second.sigShares.empty());
1199  LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n",
1200  p.first.ToString(), p.second.ToInvString(), pnode->GetId());
 // NOTE(review): if the very first batch alone exceeded the limit, this would push an
 // empty msgs vector -- presumably a batch can never be that large; confirm.
1201  if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) {
1202  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs), false);
1203  msgs.clear();
1204  totalSigsCount = 0;
1205  didSend = true;
1206  }
1207  totalSigsCount += p.second.sigShares.size();
1208  msgs.emplace_back(std::move(p.second));
1209  }
1210  if (!msgs.empty()) {
1211  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)), false);
1212  didSend = true;
1213  }
1214  }
1215 
 // share inventories (QSIGSHARESINV)
1216  auto kt = sigSharesToAnnounce.find(pnode->GetId());
1217  if (kt != sigSharesToAnnounce.end()) {
1218  std::vector<CSigSharesInv> msgs;
1219  for (auto& p : kt->second) {
1220  assert(p.second.CountSet() != 0);
1221  LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n",
1222  p.first.ToString(), p.second.ToString(), pnode->GetId());
1223  msgs.emplace_back(std::move(p.second));
1224  if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) {
1225  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
1226  msgs.clear();
1227  didSend = true;
1228  }
1229  }
1230  if (!msgs.empty()) {
1231  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
1232  didSend = true;
1233  }
1234  }
1235 
 // individual shares sent for recovery (QSIGSHARE)
1236  auto lt = sigSharesToSend.find(pnode->GetId());
1237  if (lt != sigSharesToSend.end()) {
1238  std::vector<CSigShare> msgs;
1239  for (auto& sigShare : lt->second) {
1240  LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QSIGSHARE signHash=%s, node=%d\n",
1241  sigShare.GetSignHash().ToString(), pnode->GetId());
1242  msgs.emplace_back(std::move(sigShare));
1243  if (msgs.size() == MAX_MSGS_SIG_SHARES) {
1244  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false);
1245  msgs.clear();
1246  didSend = true;
1247  }
1248  }
1249  if (!msgs.empty()) {
1250  g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false);
1251  didSend = true;
1252  }
1253  }
1254  }
1255 
1256  // looped through all nodes, release them
1257  g_connman->ReleaseNodeVector(vNodesCopy);
1258 
1259  return didSend;
1260 }
1261 
// NOTE(review): the signature line (original line 1262) is missing from this listing;
// the parameters used (nodeId, sessionId, retInfo) match
// `bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo)`
// from the symbol index -- confirm against the real file.
//
// Takes cs and forwards the lookup to the node state of nodeId. Because nodeStates is
// indexed with operator[], a state entry is created for nodeId if none exists yet.
1263 {
1264  LOCK(cs);
1265  return nodeStates[nodeId].GetSessionInfoByRecvId(sessionId, retInfo);
1266 }
1267 
// NOTE(review): the signature line (original line 1268) is missing from this listing;
// the parameters used (session, batchedSigShares, idx) match
// `CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx)`
// from the symbol index -- confirm against the real file.
//
// Builds a full CSigShare from entry idx of a batch: the session supplies llmqType,
// quorumHash, id and msgHash; the batch entry supplies the member index (first) and the
// signature (second). idx must be a valid index into batchedSigShares.sigShares (asserted).
1269 {
1270  assert(idx < batchedSigShares.sigShares.size());
1271  auto& s = batchedSigShares.sigShares[idx];
1272  CSigShare sigShare;
1273  sigShare.llmqType = session.llmqType;
1274  sigShare.quorumHash = session.quorumHash;
1275  sigShare.quorumMember = s.first;
1276  sigShare.id = session.id;
1277  sigShare.msgHash = session.msgHash;
1278  sigShare.sigShare = s.second;
 // refresh the cached key now that all fields it is derived from are set
1279  sigShare.UpdateKey();
1280  return sigShare;
1281 }
1282 
// NOTE(review): the signature line (original line 1283) is missing from this listing; the
// worker loop calls `Cleanup()` and the body uses lastCleanupTime, so this is presumably
// `void CSigSharesManager::Cleanup()` -- confirm against the real file.
//
// Periodic housekeeping, skipped when the last run was less than 5 seconds ago (adjusted
// time). It removes sessions of quorums that are no longer active, sessions whose
// signature was already recovered, sessions that timed out, and node states of peers that
// are no longer known to the connection manager.
1284 {
1285  int64_t now = GetAdjustedTime();
1286  if (now - lastCleanupTime < 5) {
1287  return;
1288  }
1289 
1290  // This map is first filled with all quorums found in all sig shares. Then we remove all inactive quorums and
1291  // loop through all sig shares again to find the ones belonging to the inactive quorums. We then delete the
1292  // sessions belonging to the sig shares. At the same time, we use this map as a cache when we later need to resolve
1293  // quorumHash -> quorumPtr (as GetQuorum() requires cs_main, leading to deadlocks with cs held)
1294  std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
1295 
1296  {
1297  LOCK(cs);
1298  sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
1299  quorums.emplace(std::make_pair((Consensus::LLMQType) sigShare.llmqType, sigShare.quorumHash), nullptr);
1300  });
1301  }
1302 
1303  // Find quorums which became inactive
 // (this loop runs without cs held; active quorums get their pointer resolved here)
1304  for (auto it = quorums.begin(); it != quorums.end(); ) {
1305  if (llmq::utils::IsQuorumActive(it->first.first, it->first.second)) {
1306  it->second = quorumManager->GetQuorum(it->first.first, it->first.second);
1307  ++it;
1308  } else {
1309  it = quorums.erase(it);
1310  }
1311  }
1312 
1313  {
1314  // Now delete sessions which are for inactive quorums
1315  LOCK(cs);
 // sign hashes are collected first and removed after the ForEach pass has finished
1316  std::unordered_set<uint256, StaticSaltedHasher> inactiveQuorumSessions;
1317  sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
1318  if (!quorums.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) {
1319  inactiveQuorumSessions.emplace(sigShare.GetSignHash());
1320  }
1321  });
1322  for (auto& signHash : inactiveQuorumSessions) {
1323  RemoveSigSharesForSession(signHash);
1324  }
1325  }
1326 
1327  {
1328  LOCK(cs);
1329 
1330  // Remove sessions which were successfully recovered
1331  std::unordered_set<uint256, StaticSaltedHasher> doneSessions;
1332  sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
 // already marked as done by an earlier share of the same session
1333  if (doneSessions.count(sigShare.GetSignHash())) {
1334  return;
1335  }
1336  if (quorumSigningManager->HasRecoveredSigForSession(sigShare.GetSignHash())) {
1337  doneSessions.emplace(sigShare.GetSignHash());
1338  }
1339  });
1340  for (auto& signHash : doneSessions) {
1341  RemoveSigSharesForSession(signHash);
1342  }
1343 
1344  // Remove sessions which timed out
1345  std::unordered_set<uint256, StaticSaltedHasher> timeoutSessions;
1346  for (auto& p : timeSeenForSessions) {
1347  auto& signHash = p.first;
1348  int64_t lastSeenTime = p.second;
1349 
1350  if (now - lastSeenTime >= SESSION_NEW_SHARES_TIMEOUT) {
1351  timeoutSessions.emplace(signHash);
1352  }
1353  }
1354  for (auto& signHash : timeoutSessions) {
1355  size_t count = sigShares.CountForSignHash(signHash);
1356 
1357  if (count > 0) {
1358  auto m = sigShares.GetAllForSignHash(signHash);
1359  assert(m);
1360 
 // any share of the session will do; it is only used for the id/msgHash/quorum in the log
1361  auto& oneSigShare = m->begin()->second;
1362 
 // the list of missing members is only built when LLMQ logging is enabled
1363  std::string strMissingMembers;
1364  if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) {
1365  auto quorumIt = quorums.find(std::make_pair((Consensus::LLMQType)oneSigShare.llmqType, oneSigShare.quorumHash));
1366  if (quorumIt != quorums.end()) {
 // NOTE(review): quorumIt->second comes from GetQuorum() above and is dereferenced
 // without a null check -- presumably GetQuorum() never returns null for an active quorum; confirm.
1367  auto& quorum = quorumIt->second;
1368  for (size_t i = 0; i < quorum->members.size(); i++) {
1369  if (!m->count((uint16_t)i)) {
1370  auto& dmn = quorum->members[i];
1371  strMissingMembers += strprintf("\n %s", dmn->proTxHash.ToString());
1372  }
1373  }
1374  }
1375  }
1376 
1377  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signing session timed out. signHash=%s, id=%s, msgHash=%s, sigShareCount=%d, missingMembers=%s\n", __func__,
1378  signHash.ToString(), oneSigShare.id.ToString(), oneSigShare.msgHash.ToString(), count, strMissingMembers);
1379  } else {
1380  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signing session timed out. signHash=%s, sigShareCount=%d\n", __func__,
1381  signHash.ToString(), count);
1382  }
1383  RemoveSigSharesForSession(signHash);
1384  }
1385  }
1386 
1387  // Find node states for peers that disappeared from CConnman
 // NOTE(review): nodeStates is iterated here without cs being held in this scope, while
 // every other access in this body takes cs first -- confirm this is intended.
1388  std::unordered_set<NodeId> nodeStatesToDelete;
1389  for (auto& p : nodeStates) {
1390  nodeStatesToDelete.emplace(p.first);
1391  }
1392  g_connman->ForEachNode([&](CNode* pnode) {
1393  nodeStatesToDelete.erase(pnode->GetId());
1394  });
1395 
1396  // Now delete these node states
1397  LOCK(cs);
1398  for (auto nodeId : nodeStatesToDelete) {
1399  auto& nodeState = nodeStates[nodeId];
1400  // remove global requested state to force a re-request from another node
 // NOTE(review): the lambda body (original line 1402) is missing from this listing;
 // presumably it erases k from sigSharesRequested -- confirm against the real file.
1401  nodeState.requestedSigShares.ForEach([&](const SigShareKey& k, bool) {
1403  });
1404  nodeStates.erase(nodeId);
1405  }
1406 
 // NOTE(review): original line 1407 is missing from this listing; presumably it stores
 // the current time in lastCleanupTime -- confirm against the real file.
1408 }
1409 
// NOTE(review): the signature line (original line 1410) is missing from this listing; the
// parameter used (signHash) and the callers match
// `void RemoveSigSharesForSession(const uint256& signHash)` from the symbol index --
// confirm against the real file.
//
// Drops everything stored for the signing session signHash: the session in every node
// state, all held sig shares, our own signed session and the last-seen timestamp.
// It takes no lock itself; the visible callers invoke it with cs held.
1411 {
1412  for (auto& p : nodeStates) {
1413  auto& ns = p.second;
1414  ns.RemoveSession(signHash);
1415  }
1416 
 // NOTE(review): original lines 1417-1418 are missing from this listing; presumably they
 // erase the session from further per-session containers -- confirm against the real file.
1419  sigShares.EraseAllForSignHash(signHash);
1420  signedSessions.erase(signHash);
1421  timeSeenForSessions.erase(signHash);
1422 }
1423 
// NOTE(review): the signature line (original line 1424) is missing from this listing and
// the function name is not visible here -- confirm against the real file.
//
// Erases the node state of every peer for which IsBanned() returns true. Both cs and
// cs_main are held for the whole pass.
1425 {
1426  // Called regularly to cleanup local node states for banned nodes
1427 
1428  LOCK2(cs, cs_main);
 // NOTE(review): toRemove is never used in the visible lines.
1429  std::unordered_set<NodeId> toRemove;
1430  for (auto it = nodeStates.begin(); it != nodeStates.end();) {
1431  if (IsBanned(it->first)) {
1432  // re-request sigshares from other nodes
 // NOTE(review): the lambda body (original line 1434) is missing from this listing;
 // presumably it erases k from sigSharesRequested -- confirm against the real file.
1433  it->second.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) {
1435  });
 // erase() returns the iterator to the next element, so the loop does not advance it again
1436  it = nodeStates.erase(it);
1437  } else {
1438  ++it;
1439  }
1440  }
1441 }
1442 
// NOTE(review): the signature line (original line 1443) is missing from this listing and
// the function name is not visible here; the body takes a `nodeId` -- confirm against the
// real file.
//
// Punishes a misbehaving peer: raises its misbehavior score by 100 and, if we keep a node
// state for it, clears the peer's requested sig shares and marks the state as banned.
// A nodeId of -1 is ignored.
1444 {
1445  if (nodeId == -1) {
1446  return;
1447  }
1448 
 // cs_main is only held for the Misbehaving() call and released before cs is taken
1449  {
1450  LOCK(cs_main);
1451  Misbehaving(nodeId, 100);
1452  }
1453 
1454  LOCK(cs);
1455  auto it = nodeStates.find(nodeId);
1456  if (it == nodeStates.end()) {
1457  return;
1458  }
1459  auto& nodeState = it->second;
1460 
1461  // Whatever we requested from him, let's request it from someone else now
 // NOTE(review): the lambda body (original line 1463) is missing from this listing;
 // presumably it erases k from sigSharesRequested -- confirm against the real file.
1462  nodeState.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) {
1464  });
1465  nodeState.requestedSigShares.Clear();
1466 
1467  nodeState.banned = true;
1468 }
1469 
// NOTE(review): the signature line (original line 1470) is missing from this listing and
// the function name is not visible here -- confirm against the real file.
//
// Worker loop that runs until interruptSigningShare is set. Each iteration processes
// pending recovered sigs and pending sig shares, signs pending requests, calls
// SendMessages() when more than 100 ms have passed since the last send, and runs both
// Cleanup() routines. When an iteration did no work, the loop sleeps for 100 ms and
// returns if that sleep is interrupted.
1471 {
1472  int64_t lastSendTime = 0;
1473  while (!interruptSigningShare) {
1474  bool didWork = false;
1475 
 // NOTE(review): original line 1476 is missing from this listing -- confirm against the real file.
1477  didWork |= quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman);
1478  didWork |= ProcessPendingSigShares(*g_connman);
1479  didWork |= SignPendingSigShares();
1480 
 // the return value of SendMessages() is not folded into didWork
1481  if (GetTimeMillis() - lastSendTime > 100) {
1482  SendMessages();
1483  lastSendTime = GetTimeMillis();
1484  }
1485 
1486  Cleanup();
1487  quorumSigningManager->Cleanup();
1488 
1489  // TODO Wakeup when pending signing is needed?
1490  if (!didWork) {
1491  if (!interruptSigningShare.sleep_for(std::chrono::milliseconds(100))) {
1492  return;
1493  }
1494  }
1495  }
1496 }
1497 
1498 void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
1499 {
1500  LOCK(cs);
1501  pendingSigns.emplace_back(quorum, id, msgHash);
1502 }
1503 
// NOTE(review): the signature line (original line 1504) is missing from this listing; the
// worker loop uses `didWork |= SignPendingSigShares();`, so this is presumably
// `bool CSigSharesManager::SignPendingSigShares()` -- confirm against the real file.
//
// Takes over the queued signing requests under cs, then signs each of them with cs
// released. Returns true when at least one request was taken from the queue.
1505 {
1506  std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> v;
1507  {
1508  LOCK(cs);
 // NOTE(review): this relies on the moved-from pendingSigns being empty afterwards,
 // which std::vector does in practice; a swap would make that explicit.
1509  v = std::move(pendingSigns);
1510  }
1511 
1512  for (auto& t : v) {
1513  Sign(std::get<0>(t), std::get<1>(t), std::get<2>(t));
1514  }
1515 
1516  return !v.empty();
1517 }
1518 
1519 void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
1520 {
1521  cxxtimer::Timer t(true);
1522 
1523  if (!quorum->IsValidMember(activeMasternodeManager->GetProTx())) {
1524  return;
1525  }
1526 
1527  CBLSSecretKey skShare = quorum->GetSkShare();
1528  if (!skShare.IsValid()) {
1529  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- we don't have our skShare for quorum %s\n", __func__, quorum->qc.quorumHash.ToString());
1530  return;
1531  }
1532 
1533  int memberIdx = quorum->GetMemberIndex(activeMasternodeManager->GetProTx());
1534  if (memberIdx == -1) {
1535  // this should really not happen (IsValidMember gave true)
1536  return;
1537  }
1538 
1539  CSigShare sigShare;
1540  sigShare.llmqType = quorum->params.type;
1541  sigShare.quorumHash = quorum->qc.quorumHash;
1542  sigShare.id = id;
1543  sigShare.msgHash = msgHash;
1544  sigShare.quorumMember = (uint16_t)memberIdx;
1545  uint256 signHash = llmq::utils::BuildSignHash(sigShare);
1546 
1547  sigShare.sigShare.Set(skShare.Sign(signHash));
1548  if (!sigShare.sigShare.Get().IsValid()) {
1549  LogPrintf("CSigSharesManager::%s -- failed to sign sigShare. signHahs=%s, id=%s, msgHash=%s, time=%s\n", __func__,
1550  signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count());
1551  return;
1552  }
1553 
1554  sigShare.UpdateKey();
1555 
1556  LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signed sigShare. signHash=%s, id=%s, msgHash=%s, llmqType=%d, quorum=%s, time=%s\n", __func__,
1557  signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), quorum->params.type, quorum->qc.quorumHash.ToString(), t.count());
1558  ProcessSigShare(-1, sigShare, *g_connman, quorum);
1559 
1560  LOCK(cs);
1561  auto& session = signedSessions[sigShare.GetSignHash()];
1562  session.sigShare = sigShare;
1563  session.quorum = quorum;
1564  session.nextAttemptTime = 0;
1565  session.attempt = 0;
1566 }
1567 
1568 // causes all known sigShares to be re-announced
1569 void CSigSharesManager::ForceReAnnouncement(const CQuorumCPtr& quorum, Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash)
1570 {
1571  LOCK(cs);
1572  auto signHash = llmq::utils::BuildSignHash(llmqType, quorum->qc.quorumHash, id, msgHash);
1573  auto sigs = sigShares.GetAllForSignHash(signHash);
1574  if (sigs) {
1575  for (auto& p : *sigs) {
1576  // re-announce every sigshare to every node
1577  sigSharesToAnnounce.Add(std::make_pair(signHash, p.first), true);
1578  }
1579  }
1580  for (auto& p : nodeStates) {
1581  CSigSharesNodeState& nodeState = p.second;
1582  auto session = nodeState.GetSessionBySignHash(signHash);
1583  if (!session) {
1584  continue;
1585  }
1586  // pretend that the other node doesn't know about any shares so that we re-announce everything
1587  session->knows.SetAll(false);
1588  // we need to use a new session id as we don't know if the other node has run into a timeout already
1589  session->sendSessionId = (uint32_t)-1;
1590  }
1591 }
1592 
// NOTE(review): the signature line (original line 1593) and the only statement after the
// lock (original line 1596) are missing from this listing. The symbol index lists
// `void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig)`, which may be this
// function -- confirm against the real file.
//
// The visible part only takes cs for the duration of the missing statement.
1594 {
1595  LOCK(cs);
1597 }
1599 } // namespace llmq
CActiveDeterministicMasternodeManager * activeMasternodeManager
const CChainParams & Params()
Return the currently selected parameters.
std::set< SourceId > badSources
void PushMessage(const SourceId &sourceId, const MessageId &msgId, const uint256 &msgHash, const CBLSSignature &sig, const CBLSPublicKey &pubKey)
const BLSObject & Get() const
Definition: bls_wrapper.h:350
void Set(const BLSObject &_obj)
Definition: bls_wrapper.h:341
CBLSSignature Sign(const uint256 &hash) const
bool VerifyInsecure(const CBLSPublicKey &pubKey, const uint256 &hash) const
bool Recover(const std::vector< CBLSSignature > &sigs, const std::vector< CBLSId > &ids)
bool IsValid() const
Definition: bls_wrapper.h:87
size_type size() const
Definition: streams.h:165
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:72
Definition: net.h:145
constexpr static const CFullyConnectedOnly FullyConnectedOnly
Definition: net.h:221
CSerializedNetMsg Make(int nFlags, std::string sCommand, Args &&... args)
Information about a peer.
Definition: net.h:669
NodeId GetId() const
Definition: net.h:825
bool sleep_for(std::chrono::milliseconds rel_time)
std::string ToString() const
Definition: uint256.cpp:65
bool IsNull() const
Definition: uint256.h:36
This class works as a stopwatch.
Definition: cxxtimer.h:37
void stop()
Stop/pause the timer.
Definition: cxxtimer.h:151
duration_t::rep count() const
Return the elapsed time.
Definition: cxxtimer.h:169
std::vector< std::pair< uint16_t, CBLSLazySignature > > sigShares
CBLSLazySignature sig
std::string ToString() const
const SigShareKey & GetKey() const
CBLSLazySignature sigShare
const uint256 & GetSignHash() const
std::string ToString() const
void Set(uint16_t quorumMember, bool v)
void Merge(const CSigSharesInv &inv2)
std::vector< bool > inv
bool IsSet(uint16_t quorumMember) const
void AsyncSign(const CQuorumCPtr &quorum, const uint256 &id, const uint256 &msgHash)
void CollectSigSharesToAnnounce(std::unordered_map< NodeId, std::unordered_map< uint256, CSigSharesInv, StaticSaltedHasher >> &sigSharesToAnnounce)
static const int64_t SESSION_NEW_SHARES_TIMEOUT
bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv &inv)
bool ProcessMessageGetSigShares(CNode *pfrom, const CSigSharesInv &inv, CConnman &connman)
bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo &retInfo)
void ProcessMessageSigShare(NodeId fromId, const CSigShare &sigShare, CConnman &connman)
std::unordered_map< NodeId, CSigSharesNodeState > nodeStates
void RemoveSigSharesForSession(const uint256 &signHash)
void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::unordered_map< NodeId, std::vector< CSigShare >> &retSigShares, std::unordered_map< std::pair< Consensus::LLMQType, uint256 >, CQuorumCPtr, StaticSaltedHasher > &retQuorums)
std::vector< std::tuple< const CQuorumCPtr, uint256, uint256 > > pendingSigns
void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector< CSigShare > &sigShares, const std::unordered_map< std::pair< Consensus::LLMQType, uint256 >, CQuorumCPtr, StaticSaltedHasher > &quorums, CConnman &connman)
void ProcessMessage(CNode *pnode, const std::string &strCommand, CDataStream &vRecv, CConnman &connman)
void CollectSigSharesToSend(std::unordered_map< NodeId, std::unordered_map< uint256, CBatchedSigShares, StaticSaltedHasher >> &sigSharesToSend)
std::unordered_map< uint256, int64_t, StaticSaltedHasher > timeSeenForSessions
SigShareMap< CSigShare > sigShares
std::atomic< uint32_t > recoveredSigsCounter
SigShareMap< bool > sigSharesToAnnounce
std::unordered_map< uint256, CSignedSession, StaticSaltedHasher > signedSessions
CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo &session, const CBatchedSigShares &batchedSigShares, size_t idx)
static const int64_t SIG_SHARE_REQUEST_TIMEOUT
bool ProcessMessageSigSharesInv(CNode *pfrom, const CSigSharesInv &inv, CConnman &connman)
void Sign(const CQuorumCPtr &quorum, const uint256 &id, const uint256 &msgHash)
void ProcessSigShare(NodeId nodeId, const CSigShare &sigShare, CConnman &connman, const CQuorumCPtr &quorum)
CThreadInterrupt interruptSigningShare
bool ProcessMessageBatchedSigShares(CNode *pfrom, const CBatchedSigShares &batchedSigShares, CConnman &connman)
bool ProcessPendingSigShares(CConnman &connman)
bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo &session, const CBatchedSigShares &batchedSigShares, bool &retBan)
void TryRecoverSig(const CQuorumCPtr &quorum, const uint256 &id, const uint256 &msgHash, CConnman &connman)
const int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT
void CollectSigSharesToRequest(std::unordered_map< NodeId, std::unordered_map< uint256, CSigSharesInv, StaticSaltedHasher >> &sigSharesToRequest)
bool ProcessMessageSigSesAnn(CNode *pfrom, const CSigSesAnn &ann, CConnman &connman)
void HandleNewRecoveredSig(const CRecoveredSig &recoveredSig)
const int64_t EXP_SEND_FOR_RECOVERY_TIMEOUT
static CDeterministicMNCPtr SelectMemberForRecovery(const CQuorumCPtr &quorum, const uint256 &id, int attempt)
SigShareMap< std::pair< NodeId, int64_t > > sigSharesRequested
void ForceReAnnouncement(const CQuorumCPtr &quorum, Consensus::LLMQType llmqType, const uint256 &id, const uint256 &msgHash)
SigShareMap< CSigShare > pendingIncomingSigShares
Session * GetSessionBySignHash(const uint256 &signHash)
Session & GetOrCreateSessionFromShare(const CSigShare &sigShare)
std::unordered_map< uint32_t, Session * > sessionByRecvId
void RemoveSession(const uint256 &signHash)
Session & GetOrCreateSessionFromAnn(const CSigSesAnn &ann)
bool GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo &retInfo)
std::unordered_map< uint256, Session, StaticSaltedHasher > sessions
SigShareMap< int64_t > requestedSigShares
Session * GetSessionByRecvId(uint32_t sessionId)
void Erase(const SigShareKey &k)
T * Get(const SigShareKey &k)
void EraseAllForSignHash(const uint256 &signHash)
T & GetOrAdd(const SigShareKey &k)
bool Add(const SigShareKey &k, const T &v)
256-bit opaque blob.
Definition: uint256.h:138
std::shared_ptr< const CDeterministicMN > CDeterministicMNCPtr
if(!read_stdin(buffer))
Definition: fuzz.cpp:72
uint256 SerializeHash(const T &obj, int nType=SER_GETHASH, int nVersion=PROTOCOL_VERSION)
Compute the 256-bit hash of an object's serialization.
Definition: hash.h:289
#define T(expected, seed, data)
std::unique_ptr< CConnman > g_connman
Definition: init.cpp:90
@ LOCK
Definition: lockunlock.h:16
#define LogPrint(category,...)
Definition: logging.h:163
@ LLMQ
Definition: logging.h:66
LLMQType
Definition: params.h:90
const char * QSIGSESANN
Definition: protocol.cpp:63
const char * QGETSIGSHARES
Definition: protocol.cpp:65
const char * QBSIGSHARES
Definition: protocol.cpp:66
const char * QSIGSHARE
Definition: protocol.cpp:68
const char * QSIGSHARESINV
Definition: protocol.cpp:64
uint256 BuildSignHash(Consensus::LLMQType llmqType, const uint256 &quorumHash, const uint256 &id, const uint256 &msgHash)
bool IsQuorumActive(Consensus::LLMQType llmqType, const uint256 &quorumHash)
Definition: quorums.cpp:26
std::shared_ptr< const CQuorum > CQuorumCPtr
Definition: quorums.h:72
std::unique_ptr< CQuorumManager > quorumManager
Definition: quorums.cpp:31
std::pair< uint256, uint16_t > SigShareKey
std::unique_ptr< CSigningManager > quorumSigningManager
std::unique_ptr< CSigSharesManager > quorumSigSharesManager
int NodeId
Definition: net.h:109
void Misbehaving(NodeId pnode, int howmuch, const std::string &message) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
Increase a node's misbehavior score.
bool IsBanned(NodeId pnode)
RecursiveMutex cs_main
Global state.
Definition: validation.cpp:80
void Shuffle(I first, I last, R &&rng)
More efficient than using std::shuffle on a FastRandomContext.
Definition: random.h:217
std::map< LLMQType, LLMQParams > llmqs
Definition: params.h:279
#define LOCK2(cs1, cs2)
Definition: sync.h:221
#define AssertLockHeld(cs)
Definition: sync.h:75
void TraceThread(const std::string name, Callable func)
Definition: system.h:271
int64_t GetAdjustedTime()
Definition: timedata.cpp:36
#define strprintf
Definition: tinyformat.h:1056
int64_t GetTimeMillis()
Returns the system time (not mockable)
Definition: utiltime.cpp:61