PIVX Core  5.6.99
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include "zmqpublishnotifier.h"
6 
7 #include "chainparams.h"
8 #include "util/system.h"
9 #include "crypto/common.h"
10 #include "validation.h" // cs_main
11 
12 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
13 
14 static const char *MSG_HASHBLOCK = "hashblock";
15 static const char *MSG_HASHTX = "hashtx";
16 static const char *MSG_RAWBLOCK = "rawblock";
17 static const char *MSG_RAWTX = "rawtx";
18 
19 // Internal function to send multipart message
20 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
21 {
22  va_list args;
23  va_start(args, size);
24 
25  while (1)
26  {
27  zmq_msg_t msg;
28 
29  int rc = zmq_msg_init_size(&msg, size);
30  if (rc != 0)
31  {
32  zmqError("Unable to initialize ZMQ msg");
33  va_end(args);
34  return -1;
35  }
36 
37  void *buf = zmq_msg_data(&msg);
38  memcpy(buf, data, size);
39 
40  data = va_arg(args, const void*);
41 
42  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
43  if (rc == -1)
44  {
45  zmqError("Unable to send ZMQ msg");
46  zmq_msg_close(&msg);
47  va_end(args);
48  return -1;
49  }
50 
51  zmq_msg_close(&msg);
52 
53  if (!data)
54  break;
55 
56  size = va_arg(args, size_t);
57  }
58  va_end(args);
59  return 0;
60 }
61 
63 {
64  assert(!psocket);
65 
66  // check if address is being used by other publish notifier
67  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
68 
69  if (i==mapPublishNotifiers.end())
70  {
71  psocket = zmq_socket(pcontext, ZMQ_PUB);
72  if (!psocket)
73  {
74  zmqError("Failed to create socket");
75  return false;
76  }
77 
78  int rc = zmq_bind(psocket, address.c_str());
79  if (rc!=0)
80  {
81  zmqError("Failed to bind address");
82  zmq_close(psocket);
83  return false;
84  }
85 
86  // register this notifier for the address, so it can be reused for other publish notifier
87  mapPublishNotifiers.emplace(address, this);
88  return true;
89  }
90  else
91  {
92  LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address);
93 
94  psocket = i->second->psocket;
95  mapPublishNotifiers.emplace(address, this);
96 
97  return true;
98  }
99 }
100 
102 {
103  assert(psocket);
104 
105  int count = mapPublishNotifiers.count(address);
106 
107  // remove this notifier from the list of publishers using this address
108  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
109  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
110 
111  for (iterator it = iterpair.first; it != iterpair.second; ++it)
112  {
113  if (it->second==this)
114  {
115  mapPublishNotifiers.erase(it);
116  break;
117  }
118  }
119 
120  if (count == 1)
121  {
122  LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
123  int linger = 0;
124  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
125  zmq_close(psocket);
126  }
127 
128  psocket = 0;
129 }
130 
131 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
132 {
133  assert(psocket);
134 
135  /* send three parts, command & data & a LE 4byte sequence number */
136  unsigned char msgseq[sizeof(uint32_t)];
137  WriteLE32(&msgseq[0], nSequence);
138  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
139  if (rc == -1)
140  return false;
141 
142  /* increment memory only sequence number after sending */
143  nSequence++;
144 
145  return true;
146 }
147 
149 {
150  uint256 hash = pindex->GetBlockHash();
151  LogPrint(BCLog::ZMQ, "Publish hashblock %s\n", hash.GetHex());
152  char data[32];
153  for (unsigned int i = 0; i < 32; i++)
154  data[31 - i] = hash.begin()[i];
155  return SendMessage(MSG_HASHBLOCK, data, 32);
156 }
157 
159 {
160  uint256 hash = transaction.GetHash();
161  LogPrint(BCLog::ZMQ, "Publish hashtx %s\n", hash.GetHex());
162  char data[32];
163  for (unsigned int i = 0; i < 32; i++)
164  data[31 - i] = hash.begin()[i];
165  return SendMessage(MSG_HASHTX, data, 32);
166 }
167 
169 {
170  LogPrint(BCLog::ZMQ, "Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
171 
172  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
173  {
174  LOCK(cs_main);
175  CBlock block;
176  if(!ReadBlockFromDisk(block, pindex))
177  {
178  zmqError("Can't read block from disk");
179  return false;
180  }
181 
182  ss << block;
183  }
184 
185  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
186 }
187 
189 {
190  uint256 hash = transaction.GetHash();
191  LogPrint(BCLog::ZMQ, "Publish rawtx %s\n", hash.GetHex());
192  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
193  ss << transaction;
194  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
195 }
const_iterator begin() const
Definition: streams.h:161
size_type size() const
Definition: streams.h:165
Definition: block.h:80
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:139
uint256 GetBlockHash() const
Definition: chain.h:215
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:244
const uint256 & GetHash() const
Definition: transaction.h:301
bool SendMessage(const char *command, const void *data, size_t size)
bool Initialize(void *pcontext)
bool NotifyBlock(const CBlockIndex *pindex)
bool NotifyTransaction(const CTransaction &transaction)
bool NotifyBlock(const CBlockIndex *pindex)
bool NotifyTransaction(const CTransaction &transaction)
std::string GetHex() const
Definition: uint256.cpp:21
unsigned char * begin()
Definition: uint256.h:63
256-bit opaque blob.
Definition: uint256.h:138
void * memcpy(void *a, const void *b, size_t c)
@ LOCK
Definition: lockunlock.h:16
#define LogPrint(category,...)
Definition: logging.h:163
@ ZMQ
Definition: logging.h:46
RecursiveMutex cs_main
Global state.
Definition: validation.cpp:80
@ SER_NETWORK
Definition: serialize.h:174
bool ReadBlockFromDisk(CBlock &block, const FlatFilePos &pos)
Definition: validation.cpp:758
void zmqError(const char *str)