irc.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. # Copyright (c) 2017–2018 crocoite contributors
  2. #
  3. # Permission is hereby granted, free of charge, to any person obtaining a copy
  4. # of this software and associated documentation files (the "Software"), to deal
  5. # in the Software without restriction, including without limitation the rights
  6. # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. # copies of the Software, and to permit persons to whom the Software is
  8. # furnished to do so, subject to the following conditions:
  9. #
  10. # The above copyright notice and this permission notice shall be included in
  11. # all copies or substantial portions of the Software.
  12. #
  13. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. # THE SOFTWARE.
  20. """
  21. IRC bot “chromebot”
  22. """
  23. import asyncio, argparse, json, tempfile, time, random, os, shlex
  24. from datetime import datetime
  25. from urllib.parse import urlsplit
  26. from enum import IntEnum, unique
  27. from collections import defaultdict
  28. from abc import abstractmethod
  29. from functools import wraps
  30. import bottom
  31. import websockets
  32. from .util import StrJsonEncoder
  33. from .cli import cookie
  34. ### helper functions ###
  35. def prettyTimeDelta (seconds):
  36. """
  37. Pretty-print seconds to human readable string 1d 1h 1m 1s
  38. """
  39. seconds = int(seconds)
  40. days, seconds = divmod(seconds, 86400)
  41. hours, seconds = divmod(seconds, 3600)
  42. minutes, seconds = divmod(seconds, 60)
  43. s = [(days, 'd'), (hours, 'h'), (minutes, 'm'), (seconds, 's')]
  44. s = filter (lambda x: x[0] != 0, s)
  45. return ' '.join (map (lambda x: '{}{}'.format (*x), s))
  46. def prettyBytes (b):
  47. """
  48. Pretty-print bytes
  49. """
  50. prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB']
  51. while b >= 1024 and len (prefixes) > 1:
  52. b /= 1024
  53. prefixes.pop (0)
  54. return f'{b:.1f} {prefixes[0]}'
  55. def isValidUrl (s):
  56. url = urlsplit (s)
  57. if url.scheme and url.netloc and url.scheme in {'http', 'https'}:
  58. return s
  59. raise TypeError ()
  60. class NonExitingArgumentParser (argparse.ArgumentParser):
  61. """ Argument parser that does not call exit(), suitable for interactive use """
  62. def exit (self, status=0, message=None):
  63. # should never be called
  64. pass
  65. def error (self, message):
  66. # if we use subparsers it’s important to return self, so we can show
  67. # the correct help
  68. raise Exception (self, message)
  69. def format_usage (self):
  70. return super().format_usage ().replace ('\n', ' ')
  71. class Status(IntEnum):
  72. """ Job status """
  73. undefined = 0
  74. pending = 1
  75. running = 2
  76. aborted = 3
  77. finished = 4
# see https://arxiv.org/html/0901.4016 on how to build proquints (human
# pronounceable unique ids)
# 16 consonants encode 4 bits each, 4 vowels encode 2 bits each; the bit
# value is the index into the string
toConsonant = 'bdfghjklmnprstvz'
toVowel = 'aiou'
  82. def u16ToQuint (v):
  83. """ Transform a 16 bit unsigned integer into a single quint """
  84. assert 0 <= v < 2**16
  85. # quints are “big-endian”
  86. return ''.join ([
  87. toConsonant[(v>>(4+2+4+2))&0xf],
  88. toVowel[(v>>(4+2+4))&0x3],
  89. toConsonant[(v>>(4+2))&0xf],
  90. toVowel[(v>>4)&0x3],
  91. toConsonant[(v>>0)&0xf],
  92. ])
  93. def uintToQuint (v, length=2):
  94. """ Turn any integer into a proquint with fixed length """
  95. assert 0 <= v < 2**(length*16)
  96. return '-'.join (reversed ([u16ToQuint ((v>>(x*16))&0xffff) for x in range (length)]))
  97. def makeJobId ():
  98. """ Create job id from time and randomness source """
  99. # allocate 48 bits for the time (in milliseconds) and add 16 random bits
  100. # at the end (just to be sure) for a total of 64 bits. Should be enough to
  101. # avoid collisions.
  102. randbits = 16
  103. stamp = (int (time.time ()*1000) << randbits) | random.randint (0, 2**randbits-1)
  104. return uintToQuint (stamp, 4)
  105. class Job:
  106. """ Archival job """
  107. __slots__ = ('id', 'stats', 'rstats', 'started', 'finished', 'nick', 'status', 'process', 'url')
  108. def __init__ (self, url, nick):
  109. self.id = makeJobId ()
  110. self.stats = {}
  111. self.rstats = {}
  112. self.started = datetime.utcnow ()
  113. self.finished = None
  114. self.url = url
  115. # user who scheduled this job
  116. self.nick = nick
  117. self.status = Status.pending
  118. self.process = None
  119. def formatStatus (self):
  120. stats = self.stats
  121. rstats = self.rstats
  122. return (f"{self.url} ({self.id}) {self.status.name}. "
  123. f"{rstats.get ('have', 0)} pages finished, "
  124. f"{rstats.get ('pending', 0)} pending; "
  125. f"{stats.get ('crashed', 0)} crashed, "
  126. f"{stats.get ('requests', 0)} requests, "
  127. f"{stats.get ('failed', 0)} failed, "
  128. f"{prettyBytes (stats.get ('bytesRcv', 0))} received.")
  129. @unique
  130. class NickMode(IntEnum):
  131. # the actual numbers don’t matter, but their order must be strictly
  132. # increasing (with priviledge level)
  133. operator = 100
  134. voice = 10
  135. @classmethod
  136. def fromMode (cls, mode):
  137. return {'v': cls.voice, 'o': cls.operator}[mode]
  138. @classmethod
  139. def fromNickPrefix (cls, mode):
  140. return {'@': cls.operator, '+': cls.voice}[mode]
  141. @property
  142. def human (self):
  143. return {self.operator: 'operator', self.voice: 'voice'}[self]
  144. class User:
  145. """ IRC user """
  146. __slots__ = ('name', 'modes')
  147. def __init__ (self, name, modes=None):
  148. self.name = name
  149. self.modes = modes or set ()
  150. def __eq__ (self, b):
  151. return self.name == b.name
  152. def __hash__ (self):
  153. return hash (self.name)
  154. def __repr__ (self):
  155. return f'<User {self.name} {self.modes}>'
  156. def hasPriv (self, p):
  157. if p is None:
  158. return True
  159. else:
  160. return self.modes and max (self.modes) >= p
  161. @classmethod
  162. def fromName (cls, name):
  163. """ Get mode and name from NAMES command """
  164. try:
  165. modes = {NickMode.fromNickPrefix (name[0])}
  166. name = name[1:]
  167. except KeyError:
  168. modes = set ()
  169. return cls (name, modes)
  170. class ReplyContext:
  171. __slots__ = ('client', 'target', 'user')
  172. def __init__ (self, client, target, user):
  173. self.client = client
  174. self.target = target
  175. self.user = user
  176. def __call__ (self, message):
  177. self.client.send ('PRIVMSG', target=self.target,
  178. message=f'{self.user.name}: {message}')
  179. class RefCountEvent:
  180. """
  181. Ref-counted event that triggers if a) armed and b) refcount drops to zero.
  182. Must be used as a context manager.
  183. """
  184. __slots__ = ('count', 'event', 'armed')
  185. def __init__ (self):
  186. self.armed = False
  187. self.count = 0
  188. self.event = asyncio.Event ()
  189. def __enter__ (self):
  190. self.count += 1
  191. self.event.clear ()
  192. def __exit__ (self, exc_type, exc_val, exc_tb):
  193. self.count -= 1
  194. if self.armed and self.count == 0:
  195. self.event.set ()
  196. async def wait (self):
  197. await self.event.wait ()
  198. def arm (self):
  199. self.armed = True
  200. if self.count == 0:
  201. self.event.set ()
  202. class ArgparseBot (bottom.Client):
  203. """
  204. Simple IRC bot using argparse
  205. Tracks user’s modes, reconnects on disconnect
  206. """
  207. __slots__ = ('channels', 'nick', 'parser', 'users', '_quit')
  208. def __init__ (self, host, port, ssl, nick, logger, channels=None, loop=None):
  209. super().__init__ (host=host, port=port, ssl=ssl, loop=loop)
  210. self.channels = channels or []
  211. self.nick = nick
  212. # map channel -> nick -> user
  213. self.users = defaultdict (dict)
  214. self.logger = logger.bind (context=type (self).__name__)
  215. self.parser = self.getParser ()
  216. # bot does not accept new queries in shutdown mode, unless explicitly
  217. # permitted by the parser
  218. self._quit = RefCountEvent ()
  219. # register bottom event handler
  220. self.on('CLIENT_CONNECT', self.onConnect)
  221. self.on('PING', self.onKeepalive)
  222. self.on('PRIVMSG', self.onMessage)
  223. self.on('CLIENT_DISCONNECT', self.onDisconnect)
  224. self.on('RPL_NAMREPLY', self.onNameReply)
  225. self.on('CHANNELMODE', self.onMode)
  226. self.on('PART', self.onPart)
  227. self.on('JOIN', self.onJoin)
  228. # XXX: we would like to handle KICK, but bottom does not support that at the moment
  229. @abstractmethod
  230. def getParser (self):
  231. pass
  232. def cancel (self):
  233. self.logger.info ('cancel', uuid='1eb34aea-a854-4fec-90b2-7f8a3812a9cd')
  234. self._quit.arm ()
  235. async def run (self):
  236. await self.connect ()
  237. await self._quit.wait ()
  238. self.send ('QUIT', message='Bye.')
  239. await self.disconnect ()
  240. async def onConnect (self, **kwargs):
  241. self.logger.info ('connect', nick=self.nick, uuid='01f7b138-ea53-4609-88e9-61f3eca3e7e7')
  242. self.send('NICK', nick=self.nick)
  243. self.send('USER', user=self.nick, realname='https://github.com/PromyLOPh/crocoite')
  244. # Don't try to join channels until the server has
  245. # sent the MOTD, or signaled that there's no MOTD.
  246. done, pending = await asyncio.wait(
  247. [self.wait('RPL_ENDOFMOTD'), self.wait('ERR_NOMOTD')],
  248. loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
  249. # Cancel whichever waiter's event didn't come in.
  250. for future in pending:
  251. future.cancel()
  252. for c in self.channels:
  253. self.logger.info ('join', channel=c, uuid='367063a5-9069-4025-907c-65ba88af8593')
  254. self.send ('JOIN', channel=c)
  255. # no need for NAMES here, server sends this automatically
  256. async def onNameReply (self, channel, users, **kwargs):
  257. # channels may be too big for a single message
  258. addusers = dict (map (lambda x: (x.name, x), map (User.fromName, users)))
  259. if channel not in self.users:
  260. self.users[channel] = addusers
  261. else:
  262. self.users[channel].update (addusers)
  263. @staticmethod
  264. def parseMode (mode):
  265. """ Parse mode strings like +a, -b, +a-b, -b+a, … """
  266. action = '+'
  267. ret = []
  268. for c in mode:
  269. if c in {'+', '-'}:
  270. action = c
  271. else:
  272. ret.append ((action, c))
  273. return ret
  274. async def onMode (self, channel, modes, params, **kwargs):
  275. if channel not in self.channels:
  276. return
  277. for (action, mode), nick in zip (self.parseMode (modes), params):
  278. try:
  279. m = NickMode.fromMode (mode)
  280. u = self.users[channel].get (nick, User (nick))
  281. if action == '+':
  282. u.modes.add (m)
  283. elif action == '-':
  284. u.modes.remove (m)
  285. except KeyError:
  286. # unknown mode, ignore
  287. pass
  288. async def onPart (self, nick, channel, **kwargs):
  289. if channel not in self.channels:
  290. return
  291. try:
  292. self.users[channel].pop (nick)
  293. except KeyError:
  294. # gone already
  295. pass
  296. async def onJoin (self, nick, channel, **kwargs):
  297. if channel not in self.channels:
  298. return
  299. self.users[channel][nick] = User (nick)
  300. async def onKeepalive (self, message, **kwargs):
  301. """ Ping received """
  302. self.send('PONG', message=message)
  303. async def onMessage (self, nick, target, message, **kwargs):
  304. """ Message received """
  305. msgPrefix = self.nick + ':'
  306. if target in self.channels and message.startswith (msgPrefix):
  307. user = self.users[target].get (nick, User (nick))
  308. reply = ReplyContext (client=self, target=target, user=user)
  309. # shlex.split supports quoting arguments, which str.split() does not
  310. command = shlex.split (message[len (msgPrefix):])
  311. try:
  312. args = self.parser.parse_args (command)
  313. except Exception as e:
  314. reply (f'{e.args[1]} -- {e.args[0].format_usage ()}')
  315. return
  316. if not args or not hasattr (args, 'func'):
  317. reply (f'Sorry, I don’t understand {command}')
  318. return
  319. minPriv = getattr (args, 'minPriv', None)
  320. if self._quit.armed and not getattr (args, 'allowOnShutdown', False):
  321. reply ('Sorry, I’m shutting down and cannot accept your request right now.')
  322. elif not user.hasPriv (minPriv):
  323. reply (f'Sorry, you need the privilege {minPriv.human} to use this command.')
  324. else:
  325. with self._quit:
  326. await args.func (user=user, args=args, reply=reply)
  327. async def onDisconnect (self, **kwargs):
  328. """ Auto-reconnect """
  329. self.logger.info ('disconnect', uuid='4c74b2c8-2403-4921-879d-2279ad85db72')
  330. while True:
  331. if not self._quit.armed:
  332. await asyncio.sleep (10, loop=self.loop)
  333. self.logger.info ('reconnect', uuid='c53555cb-e1a4-4b69-b1c9-3320269c19d7')
  334. try:
  335. await self.connect ()
  336. finally:
  337. break
  338. def jobExists (func):
  339. """ Chromebot job exists """
  340. @wraps (func)
  341. async def inner (self, **kwargs):
  342. # XXX: not sure why it works with **kwargs, but not (user, args, reply)
  343. args = kwargs.get ('args')
  344. reply = kwargs.get ('reply')
  345. j = self.jobs.get (args.id, None)
  346. if not j:
  347. reply (f'Job {args.id} is unknown')
  348. else:
  349. ret = await func (self, job=j, **kwargs)
  350. return ret
  351. return inner
  352. class Chromebot (ArgparseBot):
  353. __slots__ = ('jobs', 'tempdir', 'destdir', 'processLimit', 'blacklist', 'needVoice')
  354. def __init__ (self, host, port, ssl, nick, logger, channels=None,
  355. tempdir=None, destdir='.', processLimit=1,
  356. blacklist={}, needVoice=False, loop=None):
  357. self.needVoice = needVoice
  358. super().__init__ (host=host, port=port, ssl=ssl, nick=nick,
  359. logger=logger, channels=channels, loop=loop)
  360. self.jobs = {}
  361. self.tempdir = tempdir or tempfile.gettempdir()
  362. self.destdir = destdir
  363. self.processLimit = asyncio.Semaphore (processLimit)
  364. self.blacklist = blacklist
  365. def getParser (self):
  366. parser = NonExitingArgumentParser (prog=self.nick + ': ', add_help=False)
  367. subparsers = parser.add_subparsers(help='Sub-commands')
  368. archiveparser = subparsers.add_parser('a', help='Archive a site', add_help=False)
  369. archiveparser.add_argument('--concurrency', '-j', default=1, type=int, help='Parallel workers for this job', choices=range (1, 5))
  370. archiveparser.add_argument('--recursive', '-r', help='Enable recursion', choices=['0', '1', 'prefix'], default='0')
  371. archiveparser.add_argument('--insecure', '-k',
  372. help='Disable certificate checking', action='store_true')
  373. # parsing the cookie here, so we can give an early feedback without
  374. # waiting for the job to crash on invalid arguments.
  375. archiveparser.add_argument('--cookie', '-b', type=cookie,
  376. help='Add a cookie', action='append', default=[])
  377. archiveparser.add_argument('url', help='Website URL', type=isValidUrl, metavar='URL')
  378. archiveparser.set_defaults (func=self.handleArchive,
  379. minPriv=NickMode.voice if self.needVoice else None)
  380. statusparser = subparsers.add_parser ('s', help='Get job status', add_help=False)
  381. statusparser.add_argument('id', help='Job id', metavar='UUID')
  382. statusparser.set_defaults (func=self.handleStatus, allowOnShutdown=True)
  383. abortparser = subparsers.add_parser ('r', help='Revoke/abort job', add_help=False)
  384. abortparser.add_argument('id', help='Job id', metavar='UUID')
  385. abortparser.set_defaults (func=self.handleAbort, allowOnShutdown=True,
  386. minPriv=NickMode.voice if self.needVoice else None)
  387. return parser
  388. def isBlacklisted (self, url):
  389. for k, v in self.blacklist.items():
  390. if k.match (url):
  391. return v
  392. return False
  393. async def handleArchive (self, user, args, reply):
  394. """ Handle the archive command """
  395. msg = self.isBlacklisted (args.url)
  396. if msg:
  397. reply (f'{args.url} cannot be queued: {msg}')
  398. return
  399. # make sure the job id is unique. Since ids are time-based we can just
  400. # wait.
  401. while True:
  402. j = Job (args.url, user.name)
  403. if j.id not in self.jobs:
  404. break
  405. time.sleep (0.01)
  406. self.jobs[j.id] = j
  407. logger = self.logger.bind (job=j.id)
  408. showargs = {
  409. 'recursive': args.recursive,
  410. 'concurrency': args.concurrency,
  411. }
  412. if args.insecure:
  413. showargs['insecure'] = args.insecure
  414. warcinfo = {'chromebot': {
  415. 'jobid': j.id,
  416. 'user': user.name,
  417. 'queued': j.started,
  418. 'url': args.url,
  419. 'recursive': args.recursive,
  420. 'concurrency': args.concurrency,
  421. }}
  422. grabCmd = ['crocoite-single']
  423. # prefix warcinfo with !, so it won’t get expanded
  424. grabCmd.extend (['--warcinfo',
  425. '!' + json.dumps (warcinfo, cls=StrJsonEncoder)])
  426. for v in args.cookie:
  427. grabCmd.extend (['--cookie', v.OutputString ()])
  428. if args.insecure:
  429. grabCmd.append ('--insecure')
  430. grabCmd.extend (['{url}', '{dest}'])
  431. cmdline = ['crocoite',
  432. '--tempdir', self.tempdir,
  433. '--recursion', args.recursive,
  434. '--concurrency', str (args.concurrency),
  435. args.url,
  436. os.path.join (self.destdir,
  437. j.id + '-{host}-{date}-{seqnum}.warc.gz'),
  438. '--'] + grabCmd
  439. strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
  440. reply (f'{args.url} has been queued as {j.id} with {strargs}')
  441. logger.info ('queue', user=user.name, url=args.url, cmdline=cmdline,
  442. uuid='36cc34a6-061b-4cc5-84a9-4ab6552c8d75')
  443. async with self.processLimit:
  444. if j.status == Status.pending:
  445. # job was not aborted
  446. j.process = await asyncio.create_subprocess_exec (*cmdline,
  447. stdout=asyncio.subprocess.PIPE,
  448. stderr=asyncio.subprocess.DEVNULL,
  449. stdin=asyncio.subprocess.DEVNULL,
  450. start_new_session=True, limit=100*1024*1024)
  451. while True:
  452. data = await j.process.stdout.readline ()
  453. if not data:
  454. break
  455. # job is marked running after the first message is received from it
  456. if j.status == Status.pending:
  457. logger.info ('start', uuid='46e62d60-f498-4ab0-90e1-d08a073b10fb')
  458. j.status = Status.running
  459. data = json.loads (data)
  460. msgid = data.get ('uuid')
  461. if msgid == '24d92d16-770e-4088-b769-4020e127a7ff':
  462. j.stats = data
  463. elif msgid == '5b8498e4-868d-413c-a67e-004516b8452c':
  464. j.rstats = data
  465. # forward message, so the dashboard can use it
  466. logger.info ('message',
  467. uuid='5c0f9a11-dcd8-4182-a60f-54f4d3ab3687',
  468. data=data)
  469. code = await j.process.wait ()
  470. if j.status == Status.running:
  471. logger.info ('finish', uuid='7b40ffbb-faab-4224-90ed-cd4febd8f7ec')
  472. j.status = Status.finished
  473. j.finished = datetime.utcnow ()
  474. stats = j.stats
  475. rstats = j.rstats
  476. reply (j.formatStatus ())
  477. @jobExists
  478. async def handleStatus (self, user, args, reply, job):
  479. """ Handle status command """
  480. rstats = job.rstats
  481. reply (job.formatStatus ())
  482. @jobExists
  483. async def handleAbort (self, user, args, reply, job):
  484. """ Handle abort command """
  485. if job.status not in {Status.pending, Status.running}:
  486. reply ('This job is not running.')
  487. return
  488. job.status = Status.aborted
  489. self.logger.info ('abort', job=job.id, user=user.name,
  490. uuid='865b3b3e-a54a-4a56-a545-f38a37bac295')
  491. if job.process and job.process.returncode is None:
  492. job.process.terminate ()
  493. class Dashboard:
  494. __slots__ = ('fd', 'clients', 'loop', 'log', 'maxLog', 'pingInterval', 'pingTimeout')
  495. # these messages will not be forwarded to the browser
  496. ignoreMsgid = {
  497. # connect
  498. '01f7b138-ea53-4609-88e9-61f3eca3e7e7',
  499. # join
  500. '367063a5-9069-4025-907c-65ba88af8593',
  501. # disconnect
  502. '4c74b2c8-2403-4921-879d-2279ad85db72',
  503. # reconnect
  504. 'c53555cb-e1a4-4b69-b1c9-3320269c19d7',
  505. }
  506. def __init__ (self, fd, loop, maxLog=1000, pingInterval=30, pingTimeout=10):
  507. self.fd = fd
  508. self.clients = set ()
  509. self.loop = loop
  510. # log buffer
  511. self.log = []
  512. self.maxLog = maxLog
  513. self.pingInterval = pingInterval
  514. self.pingTimeout = pingTimeout
  515. async def client (self, websocket, path):
  516. self.clients.add (websocket)
  517. try:
  518. for l in self.log:
  519. buf = json.dumps (l)
  520. await websocket.send (buf)
  521. while True:
  522. try:
  523. msg = await asyncio.wait_for (websocket.recv(), timeout=self.pingInterval)
  524. except asyncio.TimeoutError:
  525. try:
  526. pong = await websocket.ping()
  527. await asyncio.wait_for (pong, timeout=self.pingTimeout)
  528. except asyncio.TimeoutError:
  529. break
  530. except websockets.exceptions.ConnectionClosed:
  531. break
  532. finally:
  533. self.clients.remove (websocket)
  534. def handleStdin (self):
  535. buf = self.fd.readline ()
  536. if not buf:
  537. return
  538. try:
  539. data = json.loads (buf)
  540. except json.decoder.JSONDecodeError:
  541. # ignore invalid
  542. return
  543. msgid = data['uuid']
  544. if msgid in self.ignoreMsgid:
  545. return
  546. # a few messages may contain sensitive information that we want to hide
  547. if msgid == '36cc34a6-061b-4cc5-84a9-4ab6552c8d75':
  548. # queue
  549. del data['cmdline']
  550. elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
  551. nesteddata = data['data']
  552. nestedmsgid = nesteddata['uuid']
  553. if nestedmsgid == 'd1288fbe-8bae-42c8-af8c-f2fa8b41794f':
  554. del nesteddata['command']
  555. buf = json.dumps (data)
  556. for c in self.clients:
  557. # XXX can’t await here
  558. asyncio.ensure_future (c.send (buf))
  559. self.log.append (data)
  560. while len (self.log) > self.maxLog:
  561. self.log.pop (0)
  562. def run (self, host='localhost', port=6789):
  563. self.loop.add_reader (self.fd, self.handleStdin)
  564. return websockets.serve(self.client, host, port)