Line data Source code
1 : // Copyright (c) 2018-2022 The Dash Core developers
2 : // Copyright (c) 2023 The PIVX Core developers
3 : // Distributed under the MIT/X11 software license, see the accompanying
4 : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 :
6 : #ifndef PIVX_LLMQ_QUORUMS_SIGNING_SHARES_H
7 : #define PIVX_LLMQ_QUORUMS_SIGNING_SHARES_H
8 :
9 : #include "chainparams.h"
10 : #include "consensus/params.h"
11 : #include "net.h"
12 : #include "random.h"
13 : #include "saltedhasher.h"
14 : #include "serialize.h"
15 : #include "sync.h"
16 : #include "tinyformat.h"
17 : #include "uint256.h"
18 :
19 : #include "llmq/quorums.h"
20 :
21 : #include <mutex>
22 : #include <thread>
23 : #include <unordered_map>
24 : #include <unordered_set>
25 :
26 : class CEvoDB;
27 : class CScheduler;
28 :
29 : namespace llmq
30 : {
31 : // <signHash, quorumMember>
32 : typedef std::pair<uint256, uint16_t> SigShareKey;
33 :
34 14196 : class CSigShare
35 : {
36 : public:
37 : uint8_t llmqType;
38 : uint256 quorumHash;
39 : uint16_t quorumMember;
40 : uint256 id;
41 : uint256 msgHash;
42 : CBLSLazySignature sigShare;
43 :
44 : SigShareKey key;
45 :
46 : public:
47 : void UpdateKey();
48 6247 : const SigShareKey& GetKey() const
49 : {
50 6247 : return key;
51 : }
52 9366 : const uint256& GetSignHash() const
53 : {
54 18732 : assert(!key.first.IsNull());
55 9366 : return key.first;
56 : }
57 :
58 5022 : SERIALIZE_METHODS(CSigShare, obj)
59 : {
60 1744 : READWRITE(obj.llmqType);
61 1744 : READWRITE(obj.quorumHash);
62 1744 : READWRITE(obj.quorumMember);
63 1744 : READWRITE(obj.id);
64 1744 : READWRITE(obj.msgHash);
65 1744 : READWRITE(obj.sigShare);
66 1744 : SER_READ(obj, obj.UpdateKey());
67 1744 : }
68 : };
69 :
70 : // Nodes will first announce a signing session with a sessionId to be used in all future P2P messages related to that
71 : // session. We locally keep track of the mapping for each node. We also assign new sessionIds for outgoing sessions
72 : // and send QSIGSESANN messages appropriately. All values except the max value for uint32_t are valid as sessionId
73 0 : class CSigSesAnn
74 : {
75 : public:
76 : uint32_t sessionId{(uint32_t)-1};
77 : uint8_t llmqType;
78 : uint256 quorumHash;
79 : uint256 id;
80 : uint256 msgHash;
81 :
82 0 : SERIALIZE_METHODS(CSigSesAnn, obj)
83 : {
84 0 : READWRITE(VARINT(obj.sessionId));
85 0 : READWRITE(obj.llmqType);
86 0 : READWRITE(obj.quorumHash);
87 0 : READWRITE(obj.id);
88 0 : READWRITE(obj.msgHash);
89 0 : }
90 :
91 : std::string ToString() const;
92 : };
93 :
94 0 : class CSigSharesInv
95 : {
96 : public:
97 : uint32_t sessionId{(uint32_t)-1};
98 : std::vector<bool> inv;
99 :
100 : public:
101 0 : SERIALIZE_METHODS(CSigSharesInv, obj)
102 : {
103 0 : uint64_t invSize = obj.inv.size();
104 :
105 0 : READWRITE(VARINT(obj.sessionId));
106 0 : READWRITE(COMPACTSIZE(invSize));
107 0 : READWRITE(AUTOBITSET(obj.inv, (size_t)invSize));
108 0 : }
109 :
110 : void Init(size_t size);
111 : bool IsSet(uint16_t quorumMember) const;
112 : void Set(uint16_t quorumMember, bool v);
113 : void SetAll(bool v);
114 : void Merge(const CSigSharesInv& inv2);
115 :
116 : size_t CountSet() const;
117 : std::string ToString() const;
118 : };
119 :
120 : // sent through the message QBSIGSHARES as a vector of multiple batches
121 0 : class CBatchedSigShares
122 : {
123 : public:
124 : uint32_t sessionId{(uint32_t)-1};
125 : std::vector<std::pair<uint16_t, CBLSLazySignature>> sigShares;
126 :
127 : public:
128 0 : SERIALIZE_METHODS(CBatchedSigShares, obj)
129 : {
130 0 : READWRITE(VARINT(obj.sessionId));
131 0 : READWRITE(obj.sigShares);
132 0 : }
133 :
134 : std::string ToInvString() const;
135 : };
136 :
137 : template <typename T>
138 1425 : class SigShareMap
139 : {
140 : private:
141 : std::unordered_map<uint256, std::unordered_map<uint16_t, T>, StaticSaltedHasher> internalMap;
142 :
143 : public:
144 3596 : bool Add(const SigShareKey& k, const T& v)
145 : {
146 3596 : auto& m = internalMap[k.first];
147 3596 : return m.emplace(k.second, v).second;
148 : }
149 :
150 593 : void Erase(const SigShareKey& k)
151 : {
152 593 : auto it = internalMap.find(k.first);
153 593 : if (it == internalMap.end()) {
154 0 : return;
155 : }
156 593 : it->second.erase(k.second);
157 593 : if (it->second.empty()) {
158 593 : internalMap.erase(it);
159 : }
160 : }
161 :
162 0 : void Clear()
163 : {
164 0 : internalMap.clear();
165 : }
166 :
167 1465 : bool Has(const SigShareKey& k) const
168 : {
169 1465 : auto it = internalMap.find(k.first);
170 1465 : if (it == internalMap.end()) {
171 : return false;
172 : }
173 595 : return it->second.count(k.second) != 0;
174 : }
175 :
176 0 : T* Get(const SigShareKey& k)
177 : {
178 0 : auto it = internalMap.find(k.first);
179 0 : if (it == internalMap.end()) {
180 : return nullptr;
181 : }
182 :
183 0 : auto jt = it->second.find(k.second);
184 0 : if (jt == it->second.end()) {
185 : return nullptr;
186 : }
187 :
188 0 : return &jt->second;
189 : }
190 :
191 0 : T& GetOrAdd(const SigShareKey& k)
192 : {
193 0 : T* v = Get(k);
194 0 : if (!v) {
195 0 : Add(k, T());
196 0 : v = Get(k);
197 : }
198 0 : return *v;
199 : }
200 :
201 593 : const T* GetFirst() const
202 : {
203 593 : if (internalMap.empty()) {
204 : return nullptr;
205 : }
206 593 : return &internalMap.begin()->second.begin()->second;
207 : }
208 :
209 0 : size_t Size() const
210 : {
211 0 : size_t s = 0;
212 0 : for (auto& p : internalMap) {
213 0 : s += p.second.size();
214 : }
215 : return s;
216 : }
217 :
218 1492 : size_t CountForSignHash(const uint256& signHash) const
219 : {
220 1492 : auto it = internalMap.find(signHash);
221 1492 : if (it == internalMap.end()) {
222 : return 0;
223 : }
224 1492 : return it->second.size();
225 : }
226 :
227 24772 : bool Empty() const
228 : {
229 24772 : return internalMap.empty();
230 : }
231 :
232 385 : const std::unordered_map<uint16_t, T>* GetAllForSignHash(const uint256& signHash)
233 : {
234 385 : auto it = internalMap.find(signHash);
235 385 : if (it == internalMap.end()) {
236 : return nullptr;
237 : }
238 385 : return &it->second;
239 : }
240 :
241 6720 : void EraseAllForSignHash(const uint256& signHash)
242 : {
243 6720 : internalMap.erase(signHash);
244 : }
245 :
246 : template <typename F>
247 0 : void EraseIf(F&& f)
248 : {
249 0 : for (auto it = internalMap.begin(); it != internalMap.end();) {
250 0 : SigShareKey k;
251 0 : k.first = it->first;
252 0 : for (auto jt = it->second.begin(); jt != it->second.end();) {
253 0 : k.second = jt->first;
254 0 : if (f(k, jt->second)) {
255 0 : jt = it->second.erase(jt);
256 : } else {
257 0 : ++jt;
258 : }
259 : }
260 0 : if (it->second.empty()) {
261 0 : it = internalMap.erase(it);
262 : } else {
263 0 : ++it;
264 : }
265 : }
266 0 : }
267 :
268 : template <typename F>
269 19190 : void ForEach(F&& f)
270 : {
271 21509 : for (auto& p : internalMap) {
272 2319 : SigShareKey k;
273 2319 : k.first = p.first;
274 4638 : for (auto& p2 : p.second) {
275 2319 : k.second = p2.first;
276 2319 : f(k, p2.second);
277 : }
278 : }
279 19190 : }
280 : };
281 :
282 44 : class CSigSharesNodeState
283 : {
284 : public:
285 : // Used to avoid holding locks too long
286 0 : struct SessionInfo {
287 : Consensus::LLMQType llmqType;
288 : uint256 quorumHash;
289 : uint256 id;
290 : uint256 msgHash;
291 : uint256 signHash;
292 :
293 : CQuorumCPtr quorum;
294 : };
295 :
296 : struct Session {
297 : uint32_t recvSessionId{(uint32_t)-1};
298 : uint32_t sendSessionId{(uint32_t)-1};
299 :
300 : Consensus::LLMQType llmqType;
301 : uint256 quorumHash;
302 : uint256 id;
303 : uint256 msgHash;
304 : uint256 signHash;
305 :
306 : CQuorumCPtr quorum;
307 :
308 : CSigSharesInv announced;
309 : CSigSharesInv requested;
310 : CSigSharesInv knows;
311 : };
312 : // TODO limit number of sessions per node
313 : std::unordered_map<uint256, Session, StaticSaltedHasher> sessions;
314 :
315 : std::unordered_map<uint32_t, Session*> sessionByRecvId;
316 : uint32_t nextSendSessionId{1};
317 :
318 : SigShareMap<CSigShare> pendingIncomingSigShares;
319 : SigShareMap<int64_t> requestedSigShares;
320 :
321 : bool banned{false};
322 :
323 : Session& GetOrCreateSessionFromShare(const CSigShare& sigShare);
324 : Session& GetOrCreateSessionFromAnn(const CSigSesAnn& ann);
325 : Session* GetSessionBySignHash(const uint256& signHash);
326 : Session* GetSessionByRecvId(uint32_t sessionId);
327 : bool GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo);
328 :
329 : void RemoveSession(const uint256& signHash);
330 : };
331 :
332 968 : class CSignedSession
333 : {
334 : public:
335 : CSigShare sigShare;
336 : CQuorumCPtr quorum;
337 :
338 : int64_t nextAttemptTime{0};
339 : int attempt{0};
340 : };
341 :
342 : class CSigSharesManager : public CRecoveredSigsListener
343 : {
344 : static const int64_t SESSION_NEW_SHARES_TIMEOUT = 60;
345 : static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5;
346 :
347 : // we try to keep total message size below 10k
348 : const size_t MAX_MSGS_CNT_QSIGSESANN = 100;
349 : const size_t MAX_MSGS_CNT_QGETSIGSHARES = 200;
350 : const size_t MAX_MSGS_CNT_QSIGSHARESINV = 200;
351 : // 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support
352 : const size_t MAX_MSGS_TOTAL_BATCHED_SIGS = 400;
353 :
354 : const int64_t EXP_SEND_FOR_RECOVERY_TIMEOUT = 2000;
355 : const int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT = 10000;
356 : const size_t MAX_MSGS_SIG_SHARES = 32;
357 :
358 : private:
359 : RecursiveMutex cs;
360 :
361 : std::thread workThread;
362 : CThreadInterrupt interruptSigningShare;
363 :
364 : SigShareMap<CSigShare> sigShares;
365 : std::unordered_map<uint256, CSignedSession, StaticSaltedHasher> signedSessions;
366 :
367 : // stores time of last receivedSigShare. Used to detect timeouts
368 : std::unordered_map<uint256, int64_t, StaticSaltedHasher> timeSeenForSessions;
369 :
370 : std::unordered_map<NodeId, CSigSharesNodeState> nodeStates;
371 : SigShareMap<std::pair<NodeId, int64_t>> sigSharesRequested;
372 : SigShareMap<bool> sigSharesToAnnounce;
373 :
374 : std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> pendingSigns;
375 :
376 : // must be protected by cs
377 : FastRandomContext rnd;
378 :
379 : int64_t lastCleanupTime{0};
380 : std::atomic<uint32_t> recoveredSigsCounter{0};
381 :
382 : public:
383 : CSigSharesManager();
384 : ~CSigSharesManager();
385 :
386 : void StartWorkerThread();
387 : void StopWorkerThread();
388 : void Interrupt();
389 : void RegisterAsRecoveredSigsListener();
390 : void UnregisterAsRecoveredSigsListener();
391 :
392 : public:
393 : void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
394 :
395 : void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
396 : void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
397 : void ForceReAnnouncement(const CQuorumCPtr& quorum, Consensus::LLMQType llmqType, const uint256& id, const uint256& msgHash);
398 :
399 : void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig);
400 :
401 : static CDeterministicMNCPtr SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256& id, int attempt);
402 :
403 : private:
404 : // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages)
405 : bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman);
406 : bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
407 : bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
408 : bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);
409 : void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare, CConnman& connman);
410 :
411 : bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv);
412 : bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan);
413 :
414 : void CollectPendingSigSharesToVerify(size_t maxUniqueSessions,
415 : std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares,
416 : std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
417 : bool ProcessPendingSigShares(CConnman& connman);
418 :
419 : void ProcessPendingSigSharesFromNode(NodeId nodeId,
420 : const std::vector<CSigShare>& sigShares,
421 : const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
422 : CConnman& connman);
423 :
424 : void ProcessSigShare(NodeId nodeId, const CSigShare& sigShare, CConnman& connman, const CQuorumCPtr& quorum);
425 : void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman);
426 :
427 : private:
428 : bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo);
429 : CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx);
430 :
431 : void Cleanup();
432 : void RemoveSigSharesForSession(const uint256& signHash);
433 : void RemoveBannedNodeStates();
434 :
435 : void BanNode(NodeId nodeId);
436 :
437 : bool SendMessages();
438 : void CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest);
439 : void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend);
440 : void CollectSigSharesToSend(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes);
441 : void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce);
442 : bool SignPendingSigShares();
443 : void WorkThreadMain();
444 : };
445 :
446 : extern std::unique_ptr<CSigSharesManager> quorumSigSharesManager;
447 :
448 : } // namespace llmq
449 :
450 : #endif // PIVX_LLMQ_QUORUMS_SIGNING_SHARES_H
|