PIVX Core  5.6.99
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 #include "zmqpublishnotifier.h"
7 
8 #include "version.h"
9 #include "streams.h"
10 #include "util/system.h"
11 
12 void zmqError(const char *str)
13 {
14  LogPrint(BCLog::ZMQ, "Error: %s, errno=%s\n", str, zmq_strerror(errno));
15 }
16 
18 {
19 }
20 
22 {
23  Shutdown();
24 
25  for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
26  {
27  delete *i;
28  }
29 }
30 
32 {
33  CZMQNotificationInterface* notificationInterface = nullptr;
34  std::map<std::string, CZMQNotifierFactory> factories;
35  std::list<CZMQAbstractNotifier*> notifiers;
36 
37  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
38  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
39  factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
40  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
41 
42  for (const auto& entry : factories)
43  {
44  std::string arg("-zmq" + entry.first);
45  if (gArgs.IsArgSet(arg))
46  {
47  CZMQNotifierFactory factory = entry.second;
48  std::string address = gArgs.GetArg(arg, "");
49  CZMQAbstractNotifier *notifier = factory();
50  notifier->SetType(entry.first);
51  notifier->SetAddress(address);
52  notifiers.push_back(notifier);
53  }
54  }
55 
56  if (!notifiers.empty())
57  {
58  notificationInterface = new CZMQNotificationInterface();
59  notificationInterface->notifiers = notifiers;
60 
61  if (!notificationInterface->Initialize())
62  {
63  delete notificationInterface;
64  notificationInterface = nullptr;
65  }
66  }
67 
68  return notificationInterface;
69 }
70 
71 // Called at startup to conditionally set up ZMQ socket(s)
73 {
74  LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
75  assert(!pcontext);
76 
77  pcontext = zmq_init(1);
78 
79  if (!pcontext)
80  {
81  zmqError("Unable to initialize context");
82  return false;
83  }
84 
85  std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
86  for (; i!=notifiers.end(); ++i)
87  {
88  CZMQAbstractNotifier *notifier = *i;
89  if (notifier->Initialize(pcontext))
90  {
91  LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
92  }
93  else
94  {
95  LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
96  break;
97  }
98  }
99 
100  if (i!=notifiers.end())
101  {
102  return false;
103  }
104 
105  return true;
106 }
107 
108 // Called during shutdown sequence
110 {
111  LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
112  if (pcontext)
113  {
114  for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
115  {
116  CZMQAbstractNotifier *notifier = *i;
117  LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
118  notifier->Shutdown();
119  }
120  zmq_ctx_destroy(pcontext);
121 
122  pcontext = 0;
123  }
124 }
125 
126 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
127 {
128  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
129  return;
130 
131  for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
132  {
133  CZMQAbstractNotifier *notifier = *i;
134  if (notifier->NotifyBlock(pindexNew))
135  {
136  i++;
137  }
138  else
139  {
140  notifier->Shutdown();
141  i = notifiers.erase(i);
142  }
143  }
144 }
145 
147 {
148  // Used by BlockConnected and BlockDisconnected as well, because they're
149  // all the same external callback.
150  const CTransaction& tx = *ptx;
151 
152  for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
153  {
154  CZMQAbstractNotifier *notifier = *i;
155  if (notifier->NotifyTransaction(tx))
156  {
157  i++;
158  }
159  else
160  {
161  notifier->Shutdown();
162  i = notifiers.erase(i);
163  }
164  }
165 }
166 
167 void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
168 {
169  for (const CTransactionRef& ptx : pblock->vtx) {
170  // Do a normal notify for each transaction added in the block
172  }
173 }
174 
175 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const uint256& blockHash, int nBlockHeight, int64_t blockTime)
176 {
177  for (const CTransactionRef& ptx : pblock->vtx) {
178  // Do a normal notify for each transaction removed in block disconnection
180  }
181 }
bool IsArgSet(const std::string &strArg) const
Return true if the given argument has been manually set.
Definition: system.cpp:425
std::string GetArg(const std::string &strArg, const std::string &strDefault) const
Return string argument or default value.
Definition: system.cpp:449
The block chain is a tree-shaped structure starting with the genesis block at the root, ...
Definition: chain.h:139
The basic transaction that is broadcast on the network and contained in blocks.
Definition: transaction.h:244
std::string GetType() const
virtual void Shutdown()=0
void SetAddress(const std::string &a)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool Initialize(void *pcontext)=0
virtual bool NotifyTransaction(const CTransaction &transaction)
std::string GetAddress() const
void SetType(const std::string &t)
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
static CZMQNotificationInterface * Create()
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
void TransactionAddedToMempool(const CTransactionRef &tx) override
Notifies listeners of a transaction having been added to mempool.
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const uint256 &blockHash, int nBlockHeight, int64_t blockTime) override
Notifies listeners of a block being disconnected.
std::list< CZMQAbstractNotifier * > notifiers
256-bit opaque blob.
Definition: uint256.h:138
#define LogPrint(category,...)
Definition: logging.h:163
@ ZMQ
Definition: logging.h:46
ArgsManager gArgs
Definition: system.cpp:89
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:456
CZMQAbstractNotifier *(* CZMQNotifierFactory)()
void zmqError(const char *str)