PIVX Core  5.6.99
P2P Digital Currency
quorums_signing_shares.h
Go to the documentation of this file.
1 // Copyright (c) 2018-2022 The Dash Core developers
2 // Copyright (c) 2023 The PIVX Core developers
3 // Distributed under the MIT/X11 software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #ifndef PIVX_LLMQ_QUORUMS_SIGNING_SHARES_H
7 #define PIVX_LLMQ_QUORUMS_SIGNING_SHARES_H
8 
9 #include "chainparams.h"
10 #include "consensus/params.h"
11 #include "net.h"
12 #include "random.h"
13 #include "saltedhasher.h"
14 #include "serialize.h"
15 #include "sync.h"
16 #include "tinyformat.h"
17 #include "uint256.h"
18 
19 #include "llmq/quorums.h"
20 
21 #include <mutex>
22 #include <thread>
23 #include <unordered_map>
24 #include <unordered_set>
25 
26 class CEvoDB;
27 class CScheduler;
28 
29 namespace llmq
30 {
31 // <signHash, quorumMember>
32 typedef std::pair<uint256, uint16_t> SigShareKey;
33 
34 class CSigShare
35 {
36 public:
37  uint8_t llmqType;
39  uint16_t quorumMember;
43 
45 
46 public:
47  void UpdateKey();
48  const SigShareKey& GetKey() const
49  {
50  return key;
51  }
52  const uint256& GetSignHash() const
53  {
54  assert(!key.first.IsNull());
55  return key.first;
56  }
57 
59  {
60  READWRITE(obj.llmqType);
61  READWRITE(obj.quorumHash);
62  READWRITE(obj.quorumMember);
63  READWRITE(obj.id);
64  READWRITE(obj.msgHash);
65  READWRITE(obj.sigShare);
66  SER_READ(obj, obj.UpdateKey());
67  }
68 };
69 
70 // Nodes will first announce a signing session with a sessionId to be used in all future P2P messages related to that
71 // session. We locally keep track of the mapping for each node. We also assign new sessionIds for outgoing sessions
72 // and send QSIGSESANN messages appropriately. All values except the max value for uint32_t are valid as sessionId
74 {
75 public:
76  uint32_t sessionId{(uint32_t)-1};
77  uint8_t llmqType;
81 
83  {
84  READWRITE(VARINT(obj.sessionId));
85  READWRITE(obj.llmqType);
86  READWRITE(obj.quorumHash);
87  READWRITE(obj.id);
88  READWRITE(obj.msgHash);
89  }
90 
91  std::string ToString() const;
92 };
93 
95 {
96 public:
97  uint32_t sessionId{(uint32_t)-1};
98  std::vector<bool> inv;
99 
100 public:
102  {
103  uint64_t invSize = obj.inv.size();
104 
105  READWRITE(VARINT(obj.sessionId));
106  READWRITE(COMPACTSIZE(invSize));
107  READWRITE(AUTOBITSET(obj.inv, (size_t)invSize));
108  }
109 
110  void Init(size_t size);
111  bool IsSet(uint16_t quorumMember) const;
112  void Set(uint16_t quorumMember, bool v);
113  void SetAll(bool v);
114  void Merge(const CSigSharesInv& inv2);
115 
116  size_t CountSet() const;
117  std::string ToString() const;
118 };
119 
120 // sent through the message QBSIGSHARES as a vector of multiple batches
122 {
123 public:
124  uint32_t sessionId{(uint32_t)-1};
125  std::vector<std::pair<uint16_t, CBLSLazySignature>> sigShares;
126 
127 public:
129  {
130  READWRITE(VARINT(obj.sessionId));
131  READWRITE(obj.sigShares);
132  }
133 
134  std::string ToInvString() const;
135 };
136 
137 template <typename T>
139 {
140 private:
141  std::unordered_map<uint256, std::unordered_map<uint16_t, T>, StaticSaltedHasher> internalMap;
142 
143 public:
144  bool Add(const SigShareKey& k, const T& v)
145  {
146  auto& m = internalMap[k.first];
147  return m.emplace(k.second, v).second;
148  }
149 
150  void Erase(const SigShareKey& k)
151  {
152  auto it = internalMap.find(k.first);
153  if (it == internalMap.end()) {
154  return;
155  }
156  it->second.erase(k.second);
157  if (it->second.empty()) {
158  internalMap.erase(it);
159  }
160  }
161 
162  void Clear()
163  {
164  internalMap.clear();
165  }
166 
167  bool Has(const SigShareKey& k) const
168  {
169  auto it = internalMap.find(k.first);
170  if (it == internalMap.end()) {
171  return false;
172  }
173  return it->second.count(k.second) != 0;
174  }
175 
176  T* Get(const SigShareKey& k)
177  {
178  auto it = internalMap.find(k.first);
179  if (it == internalMap.end()) {
180  return nullptr;
181  }
182 
183  auto jt = it->second.find(k.second);
184  if (jt == it->second.end()) {
185  return nullptr;
186  }
187 
188  return &jt->second;
189  }
190 
191  T& GetOrAdd(const SigShareKey& k)
192  {
193  T* v = Get(k);
194  if (!v) {
195  Add(k, T());
196  v = Get(k);
197  }
198  return *v;
199  }
200 
201  const T* GetFirst() const
202  {
203  if (internalMap.empty()) {
204  return nullptr;
205  }
206  return &internalMap.begin()->second.begin()->second;
207  }
208 
209  size_t Size() const
210  {
211  size_t s = 0;
212  for (auto& p : internalMap) {
213  s += p.second.size();
214  }
215  return s;
216  }
217 
218  size_t CountForSignHash(const uint256& signHash) const
219  {
220  auto it = internalMap.find(signHash);
221  if (it == internalMap.end()) {
222  return 0;
223  }
224  return it->second.size();
225  }
226 
227  bool Empty() const
228  {
229  return internalMap.empty();
230  }
231 
232  const std::unordered_map<uint16_t, T>* GetAllForSignHash(const uint256& signHash)
233  {
234  auto it = internalMap.find(signHash);
235  if (it == internalMap.end()) {
236  return nullptr;
237  }
238  return &it->second;
239  }
240 
241  void EraseAllForSignHash(const uint256& signHash)
242  {
243  internalMap.erase(signHash);
244  }
245 
246  template <typename F>
247  void EraseIf(F&& f)
248  {
249  for (auto it = internalMap.begin(); it != internalMap.end();) {
250  SigShareKey k;
251  k.first = it->first;
252  for (auto jt = it->second.begin(); jt != it->second.end();) {
253  k.second = jt->first;
254  if (f(k, jt->second)) {
255  jt = it->second.erase(jt);
256  } else {
257  ++jt;
258  }
259  }
260  if (it->second.empty()) {
261  it = internalMap.erase(it);
262  } else {
263  ++it;
264  }
265  }
266  }
267 
268  template <typename F>
269  void ForEach(F&& f)
270  {
271  for (auto& p : internalMap) {
272  SigShareKey k;
273  k.first = p.first;
274  for (auto& p2 : p.second) {
275  k.second = p2.first;
276  f(k, p2.second);
277  }
278  }
279  }
280 };
281 
283 {
284 public:
285  // Used to avoid holding locks too long
286  struct SessionInfo {
292 
294  };
295 
296  struct Session {
297  uint32_t recvSessionId{(uint32_t)-1};
298  uint32_t sendSessionId{(uint32_t)-1};
299 
305 
307 
311  };
312  // TODO limit number of sessions per node
313  std::unordered_map<uint256, Session, StaticSaltedHasher> sessions;
314 
315  std::unordered_map<uint32_t, Session*> sessionByRecvId;
316  uint32_t nextSendSessionId{1};
317 
320 
321  bool banned{false};
322 
323  Session& GetOrCreateSessionFromShare(const CSigShare& sigShare);
324  Session& GetOrCreateSessionFromAnn(const CSigSesAnn& ann);
325  Session* GetSessionBySignHash(const uint256& signHash);
326  Session* GetSessionByRecvId(uint32_t sessionId);
327  bool GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo);
328 
329  void RemoveSession(const uint256& signHash);
330 };
331 
333 {
334 public:
337 
338  int64_t nextAttemptTime{0};
339  int attempt{0};
340 };
341 
343 {
344  static const int64_t SESSION_NEW_SHARES_TIMEOUT = 60;
345  static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5;
346 
347  // we try to keep total message size below 10k
348  const size_t MAX_MSGS_CNT_QSIGSESANN = 100;
349  const size_t MAX_MSGS_CNT_QGETSIGSHARES = 200;
350  const size_t MAX_MSGS_CNT_QSIGSHARESINV = 200;
351  // 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support
352  const size_t MAX_MSGS_TOTAL_BATCHED_SIGS = 400;
353 
354  const int64_t EXP_SEND_FOR_RECOVERY_TIMEOUT = 2000;
355  const int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT = 10000;
356  const size_t MAX_MSGS_SIG_SHARES = 32;
357 
358 private:
360 
361  std::thread workThread;
363 
365  std::unordered_map<uint256, CSignedSession, StaticSaltedHasher> signedSessions;
366 
367  // stores time of last receivedSigShare. Used to detect timeouts
368  std::unordered_map<uint256, int64_t, StaticSaltedHasher> timeSeenForSessions;
369 
370  std::unordered_map<NodeId, CSigSharesNodeState> nodeStates;
373 
374  std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> pendingSigns;
375 
376  // must be protected by cs
378 
379  int64_t lastCleanupTime{0};
380  std::atomic<uint32_t> recoveredSigsCounter{0};
381 
382 public:
385 
386  void StartWorkerThread();
387  void StopWorkerThread();
388  void Interrupt();
391 
392 public:
393  void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
394 
395  void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
396  void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
397  void ForceReAnnouncement(const CQuorumCPtr& quorum, Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash);
398 
399  void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig);
400 
401  static CDeterministicMNCPtr SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256& id, int attempt);
402 
403 private:
404  // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages)
405  bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman);
406  bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
407  bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
408  bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);
409  void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare, CConnman& connman);
410 
411  bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv);
412  bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan);
413 
414  void CollectPendingSigSharesToVerify(size_t maxUniqueSessions,
415  std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares,
416  std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
417  bool ProcessPendingSigShares(CConnman& connman);
418 
420  const std::vector<CSigShare>& sigShares,
421  const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
422  CConnman& connman);
423 
424  void ProcessSigShare(NodeId nodeId, const CSigShare& sigShare, CConnman& connman, const CQuorumCPtr& quorum);
425  void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman);
426 
427 private:
428  bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo);
429  CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx);
430 
431  void Cleanup();
432  void RemoveSigSharesForSession(const uint256& signHash);
433  void RemoveBannedNodeStates();
434 
435  void BanNode(NodeId nodeId);
436 
437  bool SendMessages();
438  void CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest);
439  void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend);
440  void CollectSigSharesToSend(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes);
441  void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce);
442  bool SignPendingSigShares();
443  void WorkThreadMain();
444 };
445 
446 extern std::unique_ptr<CSigSharesManager> quorumSigSharesManager;
447 
448 } // namespace llmq
449 
450 #endif // PIVX_LLMQ_QUORUMS_SIGNING_SHARES_H
Definition: net.h:145
Definition: evodb.h:32
Information about a peer.
Definition: net.h:669
Fast randomness source.
Definition: random.h:107
std::vector< std::pair< uint16_t, CBLSLazySignature > > sigShares
SERIALIZE_METHODS(CBatchedSigShares, obj)
std::string ToString() const
SERIALIZE_METHODS(CSigSesAnn, obj)
const SigShareKey & GetKey() const
CBLSLazySignature sigShare
const uint256 & GetSignHash() const
SERIALIZE_METHODS(CSigShare, obj)
std::string ToString() const
SERIALIZE_METHODS(CSigSharesInv, obj)
void Set(uint16_t quorumMember, bool v)
void Merge(const CSigSharesInv &inv2)
std::vector< bool > inv
bool IsSet(uint16_t quorumMember) const
void AsyncSign(const CQuorumCPtr &quorum, const uint256 &id, const uint256 &msgHash)
void CollectSigSharesToAnnounce(std::unordered_map< NodeId, std::unordered_map< uint256, CSigSharesInv, StaticSaltedHasher >> &sigSharesToAnnounce)
static const int64_t SESSION_NEW_SHARES_TIMEOUT
bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv &inv)
bool ProcessMessageGetSigShares(CNode *pfrom, const CSigSharesInv &inv, CConnman &connman)
bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo &retInfo)
void ProcessMessageSigShare(NodeId fromId, const CSigShare &sigShare, CConnman &connman)
std::unordered_map< NodeId, CSigSharesNodeState > nodeStates
void RemoveSigSharesForSession(const uint256 &signHash)
void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::unordered_map< NodeId, std::vector< CSigShare >> &retSigShares, std::unordered_map< std::pair< Consensus::LLMQType, uint256 >, CQuorumCPtr, StaticSaltedHasher > &retQuorums)
std::vector< std::tuple< const CQuorumCPtr, uint256, uint256 > > pendingSigns
void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector< CSigShare > &sigShares, const std::unordered_map< std::pair< Consensus::LLMQType, uint256 >, CQuorumCPtr, StaticSaltedHasher > &quorums, CConnman &connman)
void ProcessMessage(CNode *pnode, const std::string &strCommand, CDataStream &vRecv, CConnman &connman)
void CollectSigSharesToSend(std::unordered_map< NodeId, std::unordered_map< uint256, CBatchedSigShares, StaticSaltedHasher >> &sigSharesToSend)
std::unordered_map< uint256, int64_t, StaticSaltedHasher > timeSeenForSessions
SigShareMap< CSigShare > sigShares
std::atomic< uint32_t > recoveredSigsCounter
SigShareMap< bool > sigSharesToAnnounce
std::unordered_map< uint256, CSignedSession, StaticSaltedHasher > signedSessions
CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo &session, const CBatchedSigShares &batchedSigShares, size_t idx)
static const int64_t SIG_SHARE_REQUEST_TIMEOUT
bool ProcessMessageSigSharesInv(CNode *pfrom, const CSigSharesInv &inv, CConnman &connman)
void Sign(const CQuorumCPtr &quorum, const uint256 &id, const uint256 &msgHash)
void ProcessSigShare(NodeId nodeId, const CSigShare &sigShare, CConnman &connman, const CQuorumCPtr &quorum)
CThreadInterrupt interruptSigningShare
bool ProcessMessageBatchedSigShares(CNode *pfrom, const CBatchedSigShares &batchedSigShares, CConnman &connman)
bool ProcessPendingSigShares(CConnman &connman)
bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo &session, const CBatchedSigShares &batchedSigShares, bool &retBan)
void TryRecoverSig(const CQuorumCPtr &quorum, const uint256 &id, const uint256 &msgHash, CConnman &connman)
const int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT
void CollectSigSharesToRequest(std::unordered_map< NodeId, std::unordered_map< uint256, CSigSharesInv, StaticSaltedHasher >> &sigSharesToRequest)
bool ProcessMessageSigSesAnn(CNode *pfrom, const CSigSesAnn &ann, CConnman &connman)
void HandleNewRecoveredSig(const CRecoveredSig &recoveredSig)
const int64_t EXP_SEND_FOR_RECOVERY_TIMEOUT
static CDeterministicMNCPtr SelectMemberForRecovery(const CQuorumCPtr &quorum, const uint256 &id, int attempt)
SigShareMap< std::pair< NodeId, int64_t > > sigSharesRequested
void ForceReAnnouncement(const CQuorumCPtr &quorum, Consensus::LLMQType llmqType, const uint256 &id, const uint256 &msgHash)
SigShareMap< CSigShare > pendingIncomingSigShares
Session * GetSessionBySignHash(const uint256 &signHash)
Session & GetOrCreateSessionFromShare(const CSigShare &sigShare)
std::unordered_map< uint32_t, Session * > sessionByRecvId
void RemoveSession(const uint256 &signHash)
Session & GetOrCreateSessionFromAnn(const CSigSesAnn &ann)
bool GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo &retInfo)
std::unordered_map< uint256, Session, StaticSaltedHasher > sessions
SigShareMap< int64_t > requestedSigShares
Session * GetSessionByRecvId(uint32_t sessionId)
void Erase(const SigShareKey &k)
bool Has(const SigShareKey &k) const
T * Get(const SigShareKey &k)
void EraseAllForSignHash(const uint256 &signHash)
const T * GetFirst() const
T & GetOrAdd(const SigShareKey &k)
std::unordered_map< uint256, std::unordered_map< uint16_t, T >, StaticSaltedHasher > internalMap
bool Add(const SigShareKey &k, const T &v)
size_t CountForSignHash(const uint256 &signHash) const
const std::unordered_map< uint16_t, T > * GetAllForSignHash(const uint256 &signHash)
256-bit opaque blob.
Definition: uint256.h:138
std::shared_ptr< const CDeterministicMN > CDeterministicMNCPtr
#define T(expected, seed, data)
LLMQType
Definition: params.h:90
Definition: quorums.cpp:26
std::shared_ptr< const CQuorum > CQuorumCPtr
Definition: quorums.h:72
std::pair< uint256, uint16_t > SigShareKey
std::unique_ptr< CSigSharesManager > quorumSigSharesManager
int NodeId
Definition: net.h:109
#define VARINT(obj)
Definition: serialize.h:513
#define SER_READ(obj, code)
Definition: serialize.h:185
#define AUTOBITSET(obj, size)
Definition: serialize.h:535
#define COMPACTSIZE(obj)
Definition: serialize.h:514
#define READWRITE(...)
Definition: serialize.h:183