client.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721
  1. # Copyright 2016-2018 The NATS Authors
  2. # Licensed under the Apache License, Version 2.0 (the "License");
  3. # you may not use this file except in compliance with the License.
  4. # You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software
  9. # distributed under the License is distributed on an "AS IS" BASIS,
  10. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. # See the License for the specific language governing permissions and
  12. # limitations under the License.
  13. #
  14. import asyncio
  15. import logging
  16. import random
  17. from time import time as now
  18. import stan.pb.protocol_pb2 as protocol
  19. from nats import NATS
  20. from nats.aio.errors import ErrConnectionClosed
  21. from stan.aio.errors import *
  22. __version__ = '0.4.0'
  23. # Subject namespaces for clients to ack and connect
  24. DEFAULT_ACKS_SUBJECT = "_STAN.acks.%s"
  25. DEFAULT_DISCOVER_SUBJECT = "_STAN.discover.%s"
  26. DEFAULT_INBOX_SUBJECT = "_INBOX.%s"
  27. # Ack timeout in seconds
  28. DEFAULT_ACK_WAIT = 30
  29. # Max number of inflight acks from received messages
  30. DEFAULT_MAX_INFLIGHT = 1024
  31. # Connect timeout in seconds
  32. DEFAULT_CONNECT_TIMEOUT = 2
  33. # Max number of inflight pub acks
  34. DEFAULT_MAX_PUB_ACKS_INFLIGHT = 16384
  35. # Max number of pending messages awaiting
  36. # to be processed on a single subscriptions.
  37. DEFAULT_PENDING_LIMIT = 8192
  38. PROTOCOL_ONE = 1
  39. # Default interval (in seconds) at which a connection sends a PING to the server
  40. DEFAULT_PING_INTERVAL = 5
  41. # Default number of PINGs without a response before the connection is considered lost.
  42. DEFAULT_PING_MAX_OUT = 3
  43. logger = logging.getLogger(__name__)
  44. class Client:
  45. """
  46. Asyncio Client for NATS Streaming.
  47. """
  48. def __init__(self):
  49. # NATS transport.
  50. self._nc = None
  51. self._loop = None
  52. self._nats_conn_is_borrowed = False
  53. # Options
  54. self._connect_timeout = None
  55. self._max_pub_acks_inflight = None
  56. # Inbox subscription for periodical heartbeat messages.
  57. self._hb_inbox = None
  58. self._hb_inbox_sid = None
  59. # Subscription for processing received acks from the server.
  60. self._ack_subject = None
  61. self._ack_subject_sid = None
  62. # Publish prefix set by stan to which we append our subject on publish.
  63. self._pub_prefix = None
  64. self._sub_req_subject = None
  65. self._unsub_req_subject = None
  66. self._close_req_subject = None
  67. self._sub_close_req_subject = None
  68. # Map of guid to futures which are signaled when the ack is processed.
  69. self._pub_ack_map = {}
  70. # Map of subscriptions related to the NATS Streaming session.
  71. self._sub_map = {}
  72. self._conn_lost_cb = None
  73. self._ping_sub = None
  74. self._ping_bytes = None
  75. self._ping_requests = None
  76. self._ping_inbox = None
  77. self._ping_interval = None
  78. self._ping_max_out = None
  79. self._ping_out = 0
  80. self._ping_server_task = None
  81. def __repr__(self):
  82. return "<nats streaming client v{}>".format(__version__)
  83. async def connect(self, cluster_id, client_id,
  84. nats=None,
  85. connect_timeout=DEFAULT_CONNECT_TIMEOUT,
  86. max_pub_acks_inflight=DEFAULT_MAX_PUB_ACKS_INFLIGHT,
  87. ping_interval=DEFAULT_PING_INTERVAL,
  88. ping_max_out=DEFAULT_PING_MAX_OUT,
  89. conn_lost_cb=None,
  90. loop=None,
  91. nats_kwargs=None,
  92. ):
  93. """
  94. Starts a session with a NATS Streaming cluster.
  95. :param cluster: Name of the cluster to which we will connect.
  96. :param nats: NATS connection to be borrowed for NATS Streaming.
  97. """
  98. self._cluster_id = cluster_id
  99. self._client_id = client_id
  100. self._loop = loop
  101. self._connect_timeout = connect_timeout
  102. self._conn_id = bytes(new_guid(), "utf-8")
  103. self._conn_lost_cb = conn_lost_cb
  104. if nats is not None:
  105. if nats_kwargs is not None:
  106. raise ValueError('nats_kwargs cannot be set when using a '
  107. 'borrowed NATS connection')
  108. self._nats_conn_is_borrowed = True
  109. self._nc = nats
  110. # NATS Streaming client should use same event loop
  111. # as the borrowed NATS connection.
  112. self._loop = self._nc._loop
  113. else:
  114. if self._nc is None:
  115. self._nc = NATS()
  116. if nats_kwargs is None:
  117. nats_kwargs = {}
  118. nats_kwargs['io_loop'] = self._loop
  119. await self._nc.connect(**nats_kwargs)
  120. # Subjects
  121. self._discover_subject = DEFAULT_DISCOVER_SUBJECT % self._cluster_id
  122. self._hb_inbox = DEFAULT_INBOX_SUBJECT % new_guid()
  123. self._ack_subject = DEFAULT_ACKS_SUBJECT % new_guid()
  124. self._ping_inbox = DEFAULT_INBOX_SUBJECT % new_guid()
  125. # Pending pub acks inflight
  126. self._pending_pub_acks_queue = asyncio.Queue(
  127. maxsize=max_pub_acks_inflight, loop=self._loop)
  128. # Heartbeats subscription
  129. self._hb_inbox_sid = await self._nc.subscribe(
  130. self._hb_inbox,
  131. cb=self._process_heartbeats,
  132. )
  133. # Acks processing subscription
  134. self._ack_subject_sid = await self._nc.subscribe(
  135. self._ack_subject,
  136. cb=self._process_ack,
  137. )
  138. await self._nc.flush()
  139. # Ping subscription
  140. self._ping_sub = await self._nc.subscribe(
  141. self._ping_inbox,
  142. cb=self._process_ping_response,
  143. )
  144. # Start NATS Streaming session by sending ConnectRequest
  145. creq = protocol.ConnectRequest()
  146. creq.clientID = self._client_id
  147. creq.heartbeatInbox = self._hb_inbox
  148. creq.connID = self._conn_id
  149. creq.protocol = PROTOCOL_ONE
  150. creq.pingInterval = ping_interval
  151. creq.pingMaxOut = ping_max_out
  152. payload = creq.SerializeToString()
  153. msg = None
  154. try:
  155. msg = await self._nc.request(
  156. self._discover_subject,
  157. payload,
  158. timeout=self._connect_timeout,
  159. )
  160. except:
  161. await self._close()
  162. raise ErrConnectReqTimeout("stan: failed connecting to '{}'".format(cluster_id))
  163. # We should get the NATS Streaming subject from the
  164. # response from the ConnectRequest.
  165. resp = protocol.ConnectResponse()
  166. resp.ParseFromString(msg.data)
  167. if resp.error != "":
  168. try:
  169. await self._close()
  170. except:
  171. pass
  172. raise StanError(resp.error)
  173. self._pub_prefix = resp.pubPrefix
  174. self._sub_req_subject = resp.subRequests
  175. self._unsub_req_subject = resp.unsubRequests
  176. self._close_req_subject = resp.closeRequests
  177. self._sub_close_req_subject = resp.subCloseRequests
  178. unsub_ping_sub = True
  179. if resp.protocol >= PROTOCOL_ONE:
  180. if resp.pingInterval != 0:
  181. unsub_ping_sub = False
  182. self._ping_requests = resp.pingRequests
  183. self._ping_interval = resp.pingInterval
  184. self._ping_max_out = resp.pingMaxOut
  185. ping = protocol.Ping()
  186. ping.connID = self._conn_id
  187. self._ping_bytes = ping.SerializeToString()
  188. self._ping_server_task = self._loop.create_task(
  189. self._ping_server())
  190. if unsub_ping_sub:
  191. await self._nc.unsubscribe(self._ping_sub)
  192. self._ping_sub = None
  193. self._conn_id = b''
  194. async def _process_heartbeats(self, msg):
  195. """
  196. Receives heartbeats sent to the client by the server.
  197. """
  198. await self._nc.publish(msg.reply, b'')
  199. async def _process_ack(self, msg):
  200. """
  201. Receives acks from the publishes via the _STAN.acks subscription.
  202. """
  203. pub_ack = protocol.PubAck()
  204. pub_ack.ParseFromString(msg.data)
  205. # Unblock pending acks queue if required.
  206. if not self._pending_pub_acks_queue.empty():
  207. await self._pending_pub_acks_queue.get()
  208. try:
  209. cb = self._pub_ack_map[pub_ack.guid]
  210. await cb(pub_ack)
  211. del self._pub_ack_map[pub_ack.guid]
  212. except KeyError:
  213. # Just skip the pub ack
  214. return
  215. except:
  216. # TODO: Check for protocol error
  217. return
  218. async def _ping_server(self):
  219. """
  220. Sends a PING (contianing connection's ID) to the server at intervals specified
  221. by ping_interval. Everytime a PING is sent, the number of outstanding PINGs is increased.
  222. If the total number is > than the ping_max_out option, then the connection is closed,
  223. and conn_lost_cb callback is invoked if one was specified.
  224. """
  225. while True:
  226. try:
  227. await asyncio.sleep(self._ping_interval)
  228. self._ping_out += 1
  229. if self._ping_out > self._ping_max_out:
  230. await self._close_due_to_ping(StanError("stan: connection lost due to PING failure"))
  231. break
  232. try:
  233. await self._nc.publish_request(self._ping_requests, self._ping_inbox, self._ping_bytes, )
  234. except ErrConnectionClosed as e:
  235. await self._close_due_to_ping(StanError(e))
  236. break
  237. except asyncio.CancelledError:
  238. break
  239. async def _process_msg(self, sub):
  240. """
  241. Receives the msgs from the STAN subscriptions and replies.
  242. By default it will reply back with an ack unless manual acking
  243. was specified in one of the subscription options.
  244. """
  245. while True:
  246. try:
  247. raw_msg = await sub._msgs_queue.get()
  248. msg = Msg()
  249. msg_proto = protocol.MsgProto()
  250. msg_proto.ParseFromString(raw_msg.data)
  251. msg.proto = msg_proto
  252. msg.sub = sub
  253. # Yield the message to the subscription callback.
  254. await sub.cb(msg)
  255. if not sub.manual_acks:
  256. # Process auto-ack if not done manually in the callback,
  257. # by publishing into the ack inbox from the subscription.
  258. msg_ack = protocol.Ack()
  259. msg_ack.subject = msg.proto.subject
  260. msg_ack.sequence = msg.proto.sequence
  261. await self._nc.publish(sub.ack_inbox, msg_ack.SerializeToString())
  262. except asyncio.CancelledError:
  263. break
  264. except Exception as ex:
  265. if sub.error_cb:
  266. try:
  267. await sub.error_cb(ex)
  268. except:
  269. logger.exception(
  270. "Exception in error callback for subscription to '%s'",
  271. sub.subject
  272. )
  273. continue
  274. async def _process_ping_response(self, msg):
  275. """
  276. Receives PING responses from the server.
  277. If the response contains an error message, the connection is closed
  278. and the conn_lost_cb callback is invoked if one was specified.
  279. Otherwise _ping_out is reset to 0 indicating that connection is fine
  280. """
  281. ping_resp = protocol.PingResponse()
  282. ping_resp.ParseFromString(msg.data)
  283. if ping_resp.error != "":
  284. await self._close_due_to_ping(StanError(ping_resp.error))
  285. return
  286. self._ping_out = 0
  287. async def ack(self, msg):
  288. """
  289. Used to manually acks a message.
  290. :param msg: Message which is pending to be acked by client.
  291. """
  292. ack_proto = protocol.Ack()
  293. ack_proto.subject = msg.proto.subject
  294. ack_proto.sequence = msg.proto.sequence
  295. await self._nc.publish(msg.sub.ack_inbox, ack_proto.SerializeToString())
  296. async def publish(self, subject, payload,
  297. ack_handler=None,
  298. ack_wait=DEFAULT_ACK_WAIT,
  299. ):
  300. """
  301. Publishes a payload onto a subject. By default, it will block
  302. until the message which has been published has been acked back.
  303. An optional async handler can be publi
  304. :param subject: Subject of the message.
  305. :param payload: Payload of the message which wil be published.
  306. :param ack_handler: Optional handler for async publishing.
  307. :param ack_wait: How long in seconds to wait for an ack to be received.
  308. """
  309. stan_subject = ''.join([self._pub_prefix, '.', subject])
  310. guid = new_guid()
  311. pe = protocol.PubMsg()
  312. pe.clientID = self._client_id
  313. pe.guid = guid
  314. pe.subject = subject
  315. pe.data = payload
  316. pe.connID = self._conn_id
  317. # Control max inflight pubs for the client with a buffered queue.
  318. await self._pending_pub_acks_queue.put(None)
  319. # Process asynchronously if a handler is given.
  320. if ack_handler is not None:
  321. self._pub_ack_map[guid] = ack_handler
  322. try:
  323. await self._nc.publish_request(
  324. stan_subject,
  325. self._ack_subject,
  326. pe.SerializeToString(),
  327. )
  328. return
  329. except Exception as e:
  330. del self._pub_ack_map[guid]
  331. raise e
  332. else:
  333. # Synchronous wait for ack handling.
  334. future = asyncio.Future(loop=self._loop)
  335. async def cb(pub_ack):
  336. nonlocal future
  337. future.set_result(pub_ack)
  338. self._pub_ack_map[guid] = cb
  339. try:
  340. await self._nc.publish_request(
  341. stan_subject,
  342. self._ack_subject,
  343. pe.SerializeToString(),
  344. )
  345. await asyncio.wait_for(future, ack_wait, loop=self._loop)
  346. return future.result()
  347. except Exception as e:
  348. # Remove pending future before raising error.
  349. future.cancel()
  350. del self._pub_ack_map[guid]
  351. raise e
  352. async def subscribe(self, subject,
  353. cb=None,
  354. error_cb=None,
  355. start_at=None,
  356. deliver_all_available=False,
  357. sequence=None,
  358. time=None,
  359. manual_acks=False,
  360. queue=None,
  361. ack_wait=DEFAULT_ACK_WAIT,
  362. max_inflight=DEFAULT_MAX_INFLIGHT,
  363. durable_name=None,
  364. pending_limits=DEFAULT_PENDING_LIMIT,
  365. ):
  366. """
  367. :param subject: Subject for the NATS Streaming subscription.
  368. :param cb: Callback which will be dispatched the
  369. :param error_cb: Async callback called on error, with the exception as
  370. the sole argument.
  371. :param start_at: One of the following options:
  372. - 'new_only' (default)
  373. - 'first'
  374. - 'sequence'
  375. - 'last_received'
  376. - 'time'
  377. :param deliver_all_available: Signals to receive all messages.
  378. :param sequence: Start sequence position from which we will be
  379. receiving the messages.
  380. :param time: Unix timestamp after which the messages will be delivered.
  381. :param manual_acks: Disables auto ack functionality in the subscription
  382. callback so that it is implemented by the user instead.
  383. :param ack_wait: How long to wait for an ack before being redelivered
  384. previous messages.
  385. :param durable_name: Name of the durable subscription.
  386. :param: pending_limits: Max number of messages to await in subscription
  387. before it becomes a slow consumer.
  388. """
  389. sub = Subscription(
  390. subject=subject,
  391. queue=queue,
  392. cb=cb,
  393. error_cb=error_cb,
  394. manual_acks=manual_acks,
  395. stan=self,
  396. )
  397. self._sub_map[sub.inbox] = sub
  398. # Have the message processing queue ready before making the subscription.
  399. sub._msgs_queue = asyncio.Queue(maxsize=pending_limits, loop=self._loop)
  400. # Helper coroutine which will just put messages in to the queue,
  401. # whenever the NATS client receives a message.
  402. async def cb(raw_msg):
  403. nonlocal sub
  404. await sub._msgs_queue.put(raw_msg)
  405. # Should create the NATS Subscription before making the request.
  406. sid = await self._nc.subscribe(sub.inbox, cb=cb)
  407. sub.sid = sid
  408. req = protocol.SubscriptionRequest()
  409. req.clientID = self._client_id
  410. req.maxInFlight = max_inflight
  411. req.ackWaitInSecs = ack_wait
  412. if queue is not None:
  413. req.qGroup = queue
  414. if durable_name is not None:
  415. req.durableName = durable_name
  416. # Normalize start position options.
  417. if deliver_all_available:
  418. req.startPosition = protocol.First
  419. elif start_at is None or start_at == 'new_only':
  420. req.startPosition = protocol.NewOnly
  421. elif start_at == 'last_received':
  422. req.startPosition = protocol.LastReceived
  423. elif start_at == 'time':
  424. req.startPosition = protocol.TimeDeltaStart
  425. req.startTimeDelta = int(now() - time) * 1000000000
  426. elif start_at == 'sequence':
  427. req.startPosition = protocol.SequenceStart
  428. req.startSequence = sequence
  429. elif start_at == 'first':
  430. req.startPosition = protocol.First
  431. # Set STAN subject and NATS inbox where we will be awaiting
  432. # for the messages to be delivered.
  433. req.subject = subject
  434. req.inbox = sub.inbox
  435. msg = await self._nc.request(
  436. self._sub_req_subject,
  437. req.SerializeToString(),
  438. self._connect_timeout,
  439. )
  440. resp = protocol.SubscriptionResponse()
  441. resp.ParseFromString(msg.data)
  442. # If there is an error here, then rollback the
  443. # subscription which we have sent already.
  444. if resp.error != "":
  445. try:
  446. await self._nc.unsubscribe(sid)
  447. except:
  448. pass
  449. raise StanError(resp.error)
  450. sub.ack_inbox = resp.ackInbox
  451. sub._msgs_task = self._loop.create_task(self._process_msg(sub))
  452. return sub
  453. async def _close(self):
  454. """
  455. Removes any present internal state from the client.
  456. """
  457. # Remove the core NATS Streaming subscriptions.
  458. try:
  459. if self._ping_sub is not None:
  460. await self._nc.unsubscribe(self._ping_sub)
  461. self._ping_sub = None
  462. self._ping_inbox = None
  463. if self._ping_server_task is not None:
  464. self._ping_server_task.cancel()
  465. self._ping_server_task = None
  466. if self._hb_inbox_sid is not None:
  467. await self._nc.unsubscribe(self._hb_inbox_sid)
  468. self._hb_inbox = None
  469. self._hb_inbox_sid = None
  470. if self._ack_subject_sid is not None:
  471. await self._nc.unsubscribe(self._ack_subject_sid)
  472. self._ack_subject = None
  473. self._ack_subject_sid = None
  474. except:
  475. # FIXME: async error in case these fail?
  476. pass
  477. # Remove all the related subscriptions
  478. for _, sub in self._sub_map.items():
  479. if sub._msgs_task is not None:
  480. sub._msgs_task.cancel()
  481. try:
  482. await self._nc.unsubscribe(sub.sid)
  483. except:
  484. continue
  485. self._sub_map = {}
  486. async def _close_due_to_ping(self, err):
  487. await self._close()
  488. if self._conn_lost_cb is not None:
  489. await self._conn_lost_cb(err)
  490. self._conn_lost_cb = None
  491. async def close(self):
  492. """
  493. Close terminates a session with NATS Streaming.
  494. """
  495. # Remove the core NATS Streaming subscriptions.
  496. await self._close()
  497. req = protocol.CloseRequest()
  498. req.clientID = self._client_id
  499. msg = await self._nc.request(
  500. self._close_req_subject,
  501. req.SerializeToString(),
  502. self._connect_timeout,
  503. )
  504. resp = protocol.CloseResponse()
  505. resp.ParseFromString(msg.data)
  506. if not self._nats_conn_is_borrowed:
  507. await self._nc.close()
  508. if resp.error != "":
  509. raise StanError(resp.error)
  510. class Subscription(object):
  511. def __init__(self,
  512. subject='',
  513. queue='',
  514. cb=None,
  515. error_cb=None,
  516. sid=None,
  517. durable_name=None,
  518. ack_inbox=None,
  519. manual_acks=False,
  520. stan=None,
  521. msgs_queue=None,
  522. msgs_task=None,
  523. ):
  524. self.subject = subject
  525. self.queue = queue
  526. self.cb = cb
  527. self.error_cb = error_cb
  528. self.inbox = DEFAULT_INBOX_SUBJECT % new_guid()
  529. self.sid = sid
  530. self.ack_inbox = ack_inbox
  531. self.durable_name = durable_name
  532. self.manual_acks = manual_acks
  533. self._sc = stan
  534. self._nc = stan._nc
  535. self._msgs_queue = msgs_queue
  536. self._msgs_task = msgs_task
  537. @property
  538. def pending_queue_size(self):
  539. return self._msgs_queue.qsize()
  540. async def unsubscribe(self):
  541. """
  542. Remove subscription on a topic in this client.
  543. """
  544. await self._nc.unsubscribe(self.sid)
  545. try:
  546. # Stop the processing task for the subscription.
  547. sub = self._sc._sub_map[self.inbox]
  548. sub._msgs_task.cancel()
  549. del self._sc._sub_map[self.inbox]
  550. except KeyError:
  551. pass
  552. req = protocol.UnsubscribeRequest()
  553. req.clientID = self._sc._client_id
  554. req.subject = self.subject
  555. req.inbox = self.ack_inbox
  556. if self.durable_name is not None:
  557. req.durableName = self.durable_name
  558. msg = await self._nc.request(
  559. self._sc._unsub_req_subject,
  560. req.SerializeToString(),
  561. self._sc._connect_timeout,
  562. )
  563. resp = protocol.SubscriptionResponse()
  564. resp.ParseFromString(msg.data)
  565. if resp.error != "":
  566. raise StanError(resp.error)
  567. async def close(self):
  568. """
  569. Closes a NATS streaming subscription.
  570. """
  571. await self._nc.unsubscribe(self.sid)
  572. try:
  573. # Stop the processing task for the subscription.
  574. sub = self._sc._sub_map[self.inbox]
  575. sub._msgs_task.cancel()
  576. del self._sc._sub_map[self.inbox]
  577. except KeyError:
  578. pass
  579. req = protocol.UnsubscribeRequest()
  580. req.clientID = self._sc._client_id
  581. req.subject = self.subject
  582. req.inbox = self.ack_inbox
  583. if self.durable_name is not None:
  584. req.durableName = self.durable_name
  585. msg = await self._nc.request(
  586. self._sc._sub_close_req_subject,
  587. req.SerializeToString(),
  588. self._sc._connect_timeout,
  589. )
  590. resp = protocol.SubscriptionResponse()
  591. resp.ParseFromString(msg.data)
  592. if resp.error != "":
  593. raise StanError(resp.error)
  594. class Msg(object):
  595. def __init__(self):
  596. self.proto = None
  597. self.sub = None
  598. @property
  599. def data(self):
  600. return self.proto.data
  601. @property
  602. def sequence(self):
  603. return self.proto.sequence
  604. @property
  605. def seq(self):
  606. return self.proto.sequence
  607. @property
  608. def timestamp(self):
  609. return self.proto.timestamp
  610. def __repr__(self):
  611. return "<nats streaming msg sequence={}, time={}>".format(self.proto.sequence, self.proto.timestamp)
  612. def new_guid():
  613. return "%x" % random.SystemRandom().getrandbits(0x58)