16 #include "validation.h"
22 maxMessagesPerNode(_maxMessagesPerNode)
29 auto pm = std::make_shared<CDataStream>(std::move(vRecv));
36 LogPrint(
BCLog::NET,
"CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from);
43 hw.
write(pm->data(), pm->size());
62 std::list<BinaryMessage> ret;
90 dkgManager(_dkgManager),
91 curSession(
std::make_shared<
CDKGSession>(_params, _blsWorker, _dkgManager)),
92 pendingContributions((size_t)_params.size * 2),
93 pendingComplaints((size_t)_params.size * 2),
94 pendingJustifications((size_t)_params.size * 2),
95 pendingPrematureCommitments((size_t)_params.size * 2)
98 throw std::runtime_error(
"Can't initialize CDKGSessionHandler with LLMQ_NONE type.");
126 LogPrint(
BCLog::DKG,
"CDKGSessionHandler::%s -- %s - currentHeight=%d, quorumHeight=%d, oldPhase=%d, newPhase=%d\n", __func__,
147 throw std::runtime_error(
"Tried to start an already started CDKGSessionHandler thread.");
174 LogPrintf(
"CDKGSessionHandler::%s -- quorum initialization failed for %s\n", __func__,
curSession->params.name);
192 const uint256& expectedQuorumHash,
195 LogPrint(
BCLog::DKG,
"CDKGSessionHandler::%s -- %s - starting, curPhase=%d, nextPhase=%d\n", __func__,
params.
name, curPhase, nextPhase);
202 if (!expectedQuorumHash.
IsNull() && currState.quorumHash != expectedQuorumHash) {
205 if (currState.phase == nextPhase) {
212 if (!runWhileWaiting()) {
221 bool changed = status.phase != (uint8_t) nextPhase;
222 status.phase = (uint8_t) nextPhase;
240 if (currState.quorumHash != oldQuorumHash) {
251 const uint256& expectedQuorumHash,
252 double randomSleepFactor,
255 if (
Params().IsRegTestNet()) {
273 double phaseSleepTimePerMember = phaseSleepTime /
params.
size;
275 double adjustedPhaseSleepTimePerMember = phaseSleepTimePerMember * randomSleepFactor;
277 int64_t sleepTime = (int64_t)(adjustedPhaseSleepTimePerMember *
curSession->GetMyMemberIndex());
286 LogPrint(
BCLog::DKG,
"CDKGSessionHandler::%s -- %s - starting sleep for %d ms, curPhase=%d\n", __func__,
params.
name, sleepTime, curPhase);
297 int64_t expectedBlockTime = (
currentHeight - heightStart) * nTargetSpacing * 1000;
298 if (expectedBlockTime > sleepTime) {
306 LogPrint(
BCLog::DKG,
"CDKGSessionHandler::%s -- %s - aborting due unexpected phase/expectedQuorumHash change\n", __func__,
params.
name);
310 if (!runWhileWaiting()) {
320 const uint256& expectedQuorumHash,
321 double randomSleepFactor,
325 LogPrint(
BCLog::DKG,
"CDKGSessionHandler::%s -- %s - starting, curPhase=%d, nextPhase=%d\n", __func__,
params.
name, curPhase, nextPhase);
327 SleepBeforePhase(curPhase, expectedQuorumHash, randomSleepFactor, runWhileWaiting);
335 template<
typename Message>
338 if (messages.empty()) {
342 std::set<NodeId> ret;
343 bool revertToSingleVerification =
false;
346 std::vector<CBLSPublicKey> pubKeys;
347 std::vector<uint256> messageHashes;
348 std::set<uint256> messageHashesSet;
349 pubKeys.reserve(messages.size());
350 messageHashes.reserve(messages.size());
352 for (
const auto& p : messages ) {
353 const auto& msg = *p.second;
355 auto member = session.
GetMember(msg.proTxHash);
358 ret.emplace(p.first);
369 auto msgHash = msg.GetSignHash();
370 if (!messageHashesSet.emplace(msgHash).second) {
377 revertToSingleVerification =
true;
381 pubKeys.emplace_back(member->dmn->pdmnState->pubKeyOperator.Get());
382 messageHashes.emplace_back(msgHash);
384 if (!revertToSingleVerification) {
394 bool nodeIdsAllSame =
true;
395 for (
auto it = messages.begin(); it != messages.end(); ++it) {
397 firstNodeId = it->first;
400 if (it->first != firstNodeId) {
401 nodeIdsAllSame =
false;
407 if (nodeIdsAllSame) {
408 ret.emplace(firstNodeId);
414 for (
const auto& p : messages) {
415 if (ret.count(p.first)) {
419 const auto& msg = *p.second;
420 auto member = session.
GetMember(msg.proTxHash);
421 bool valid = msg.sig.VerifyInsecure(member->dmn->pdmnState->pubKeyOperator.Get(), msg.GetSignHash());
423 ret.emplace(p.first);
429 template<
typename Message>
430 static bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages,
size_t maxCount)
432 auto msgs = pendingMessages.PopAndDeserializeMessages<
Message>(maxCount);
437 std::vector<uint256> hashes;
438 std::vector<std::pair<NodeId, std::shared_ptr<Message>>> preverifiedMessages;
439 hashes.reserve(msgs.size());
440 preverifiedMessages.reserve(msgs.size());
442 for (
const auto& p : msgs) {
444 LogPrint(
BCLog::NET,
"%s -- failed to deserialize message, peer=%d\n", __func__, p.first);
451 const auto& msg = *p.second;
454 if (!session.PreVerifyMessage(msg, ban)) {
456 LogPrint(
BCLog::NET,
"%s -- banning node due to failed preverification, peer=%d\n", __func__, p.first);
462 LogPrint(
BCLog::NET,
"%s -- skipping message due to failed preverification, peer=%d\n", __func__, p.first);
466 preverifiedMessages.emplace_back(p);
468 if (preverifiedMessages.empty()) {
473 if (!badNodes.empty()) {
475 for (
auto nodeId : badNodes) {
476 LogPrint(
BCLog::NET,
"%s -- failed to verify signature, peer=%d\n", __func__, nodeId);
481 for (
size_t i = 0; i < preverifiedMessages.size(); i++) {
482 NodeId nodeId = preverifiedMessages[i].first;
483 if (badNodes.count(nodeId)) {
486 const auto& msg = *preverifiedMessages[i].second;
488 session.ReceiveMessage(hashes[i], msg, ban);
490 LogPrint(
BCLog::NET,
"%s -- banning node after ReceiveMessage failed, peer=%d\n", __func__, nodeId);
493 badNodes.emplace(nodeId);
518 LogPrintf(
"%s: ERROR: Unable to find block %s\n", __func__, curQuorumHash.
ToString());
529 bool changed = status.phase != (uint8_t) QuorumPhase_Initialized;
530 status.phase = (uint8_t) QuorumPhase_Initialized;
542 auto fContributeStart = [
this]() {
545 auto fContributeWait = [
this] {
551 auto fComplainStart = [
this]() {
554 auto fComplainWait = [
this] {
560 auto fJustifyStart = [
this]() {
563 auto fJustifyWait = [
this] {
569 auto fCommitStart = [
this]() {
572 auto fCommitWait = [
this] {
577 auto finalCommitments =
curSession->FinalizeCommitments();
578 for (
const auto& fqc : finalCommitments) {
590 status.aborted = true;
593 LogPrintf(
"CDKGSessionHandler::%s -- aborted current DKG session for llmq=%s\n", __func__,
params.
name);
CActiveDeterministicMasternodeManager * activeMasternodeManager
const CChainParams & Params()
Return the currently selected parameters.
const uint256 GetProTx() const
void AggregateInsecure(const CBLSSignature &o)
bool VerifyInsecureAggregated(const std::vector< CBLSPublicKey > &pubKeys, const std::vector< uint256 > &hashes) const
The block chain is a tree shaped structure starting with the genesis block at the root,...
uint256 GetBlockHash() const
int nHeight
height of the entry in the chain. The genesis block has height 0
const Consensus::Params & GetConsensus() const
A writer stream (for serialization) that computes a 256-bit hash.
void write(const char *pch, size_t size)
Information about a peer.
std::string ToString() const
size_t maxMessagesPerNode
bool HasSeen(const uint256 &hash) const
void PushPendingMessage(NodeId from, CDataStream &vRecv, int invType)
std::set< uint256 > seenMessages
std::list< BinaryMessage > PopPendingMessages(size_t maxCount)
std::list< BinaryMessage > pendingMessages
std::map< NodeId, size_t > messagesPerNode
CDKGPendingMessages(size_t _maxMessagesPerNode)
QuorumPhaseAndHash GetPhaseAndQuorumHash() const
std::atomic< bool > stopRequested
std::function< bool()> WhileWaitFunc
bool InitNewQuorum(const CBlockIndex *pindexQuorum)
void ProcessMessage(CNode *pfrom, const std::string &strCommand, CDataStream &vRecv)
void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256 &expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc &startPhaseFunc, const WhileWaitFunc &runWhileWaiting)
void SleepBeforePhase(QuorumPhase curPhase, const uint256 &expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc &runWhileWaiting)
std::shared_ptr< CDKGSession > curSession
CDKGSessionManager & dkgManager
CDKGPendingMessages pendingPrematureCommitments
CDKGPendingMessages pendingContributions
void UpdatedBlockTip(const CBlockIndex *pindexNew)
const Consensus::LLMQParams & params
void WaitForNewQuorum(const uint256 &oldQuorumHash)
void PhaseHandlerThread()
CDKGPendingMessages pendingComplaints
std::thread phaseHandlerThread
CDKGSessionHandler(const Consensus::LLMQParams &_params, CBLSWorker &blsWorker, CDKGSessionManager &_dkgManager)
void WaitForNextPhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256 &expectedQuorumHash, const WhileWaitFunc &runWhileWaiting)
CDKGPendingMessages pendingJustifications
std::function< void()> StartPhaseFunc
The DKG session is a single instance of the DKG process.
CDKGMember * GetMember(const uint256 &proTxHash) const
std::unique_ptr< CDeterministicMNManager > deterministicMNManager
uint256 SerializeHash(const T &obj, int nType=SER_GETHASH, int nVersion=PROTOCOL_VERSION)
Compute the 256-bit hash of an object's serialization.
std::unique_ptr< CConnman > g_connman
#define LogPrint(category,...)
const char * QPCOMMITMENT
const char * QJUSTIFICATION
@ QuorumPhase_Initialized
std::set< NodeId > BatchVerifyMessageSigs(CDKGSession &session, const std::vector< std::pair< NodeId, std::shared_ptr< Message >>> &messages)
std::unique_ptr< CQuorumBlockProcessor > quorumBlockProcessor
void EnsureQuorumConnections(Consensus::LLMQType llmqType, const CBlockIndex *pindexQuorum, const uint256 &myProTxHash)
std::unique_ptr< CDKGDebugManager > quorumDKGDebugManager
void AddQuorumProbeConnections(Consensus::LLMQType llmqType, const CBlockIndex *pindexQuorum, const uint256 &myProTxHash)
void Misbehaving(NodeId pnode, int howmuch, const std::string &message) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
Increase a node's misbehavior score.
RecursiveMutex cs_main
Global state.
@ MSG_QUORUM_JUSTIFICATION
@ MSG_QUORUM_PREMATURE_COMMITMENT
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
#define AssertLockHeld(cs)
void TraceThread(const std::string name, Callable func)
const uint256 UINT256_ZERO
constant uint256 instances
int64_t GetTimeMillis()
Returns the system time (not mockable)
void MilliSleep(int64_t n)
CChain chainActive
The currently-connected chain of blocks (protected by cs_main).
CBlockIndex * LookupBlockIndex(const uint256 &hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main)