controller.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. # Copyright (c) 2017–2018 crocoite contributors
  2. #
  3. # Permission is hereby granted, free of charge, to any person obtaining a copy
  4. # of this software and associated documentation files (the "Software"), to deal
  5. # in the Software without restriction, including without limitation the rights
  6. # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  7. # copies of the Software, and to permit persons to whom the Software is
  8. # furnished to do so, subject to the following conditions:
  9. #
  10. # The above copyright notice and this permission notice shall be included in
  11. # all copies or substantial portions of the Software.
  12. #
  13. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  18. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  19. # THE SOFTWARE.
  20. """
  21. Controller classes, handling actions required for archival
  22. """
  23. import time, tempfile, asyncio, json, os, shutil, signal
  24. from itertools import islice
  25. from datetime import datetime
  26. from operator import attrgetter
  27. from abc import ABC, abstractmethod
  28. from yarl import URL
  29. from . import behavior as cbehavior
  30. from .browser import SiteLoader, RequestResponsePair, PageIdle, FrameNavigated
  31. from .util import getFormattedViewportMetrics, getSoftwareInfo
  32. from .behavior import ExtractLinksEvent
  33. from .devtools import toCookieParam
  34. class ControllerSettings:
  35. __slots__ = ('idleTimeout', 'timeout', 'insecure', 'cookies')
  36. def __init__ (self, idleTimeout=2, timeout=10, insecure=False, cookies=None):
  37. self.idleTimeout = idleTimeout
  38. self.timeout = timeout
  39. self.insecure = insecure
  40. self.cookies = cookies or []
  41. def __repr__ (self):
  42. return f'<ControllerSetting idleTimeout={self.idleTimeout!r}, timeout={self.timeout!r}, insecure={self.insecure!r}, cookies={self.cookies!r}>'
  43. defaultSettings = ControllerSettings ()
  44. class EventHandler (ABC):
  45. """ Abstract base class for event handler """
  46. __slots__ = ()
  47. @abstractmethod
  48. async def push (self, item):
  49. raise NotImplementedError ()
  50. class StatsHandler (EventHandler):
  51. __slots__ = ('stats', )
  52. def __init__ (self):
  53. self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0}
  54. async def push (self, item):
  55. if isinstance (item, RequestResponsePair):
  56. self.stats['requests'] += 1
  57. if not item.response:
  58. self.stats['failed'] += 1
  59. else:
  60. self.stats['finished'] += 1
  61. self.stats['bytesRcv'] += item.response.bytesReceived
  62. class LogHandler (EventHandler):
  63. """ Handle items by logging information about them """
  64. __slots__ = ('logger', )
  65. def __init__ (self, logger):
  66. self.logger = logger.bind (context=type (self).__name__)
  67. async def push (self, item):
  68. if isinstance (item, ExtractLinksEvent):
  69. # limit number of links per message, so json blob won’t get too big
  70. it = iter (item.links)
  71. limit = 100
  72. while True:
  73. limitlinks = list (islice (it, 0, limit))
  74. if not limitlinks:
  75. break
  76. self.logger.info ('extracted links', context=type (item).__name__,
  77. uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=limitlinks)
  78. class ControllerStart:
  79. __slots__ = ('payload', )
  80. def __init__ (self, payload):
  81. self.payload = payload
  82. class IdleStateTracker (EventHandler):
  83. """ Track SiteLoader’s idle state by listening to PageIdle events """
  84. __slots__ = ('_idle', '_loop', '_idleSince')
  85. def __init__ (self, loop):
  86. self._idle = True
  87. self._loop = loop
  88. self._idleSince = self._loop.time ()
  89. async def push (self, item):
  90. if isinstance (item, PageIdle):
  91. self._idle = bool (item)
  92. if self._idle:
  93. self._idleSince = self._loop.time ()
  94. async def wait (self, timeout):
  95. """ Wait until page has been idle for at least timeout seconds. If the
  96. page has been idle before calling this function it may return
  97. immediately. """
  98. assert timeout > 0
  99. while True:
  100. if self._idle:
  101. now = self._loop.time ()
  102. sleep = timeout-(now-self._idleSince)
  103. if sleep <= 0:
  104. break
  105. else:
  106. # not idle, check again after timeout expires
  107. sleep = timeout
  108. await asyncio.sleep (sleep)
  109. class InjectBehaviorOnload (EventHandler):
  110. """ Control behavior script injection based on frame navigation messages.
  111. When a page is reloaded (for whatever reason), the scripts need to be
  112. reinjected. """
  113. __slots__ = ('controller', '_loaded')
  114. def __init__ (self, controller):
  115. self.controller = controller
  116. self._loaded = False
  117. async def push (self, item):
  118. if isinstance (item, FrameNavigated):
  119. await self._runon ('load')
  120. self._loaded = True
  121. async def stop (self):
  122. if self._loaded:
  123. await self._runon ('stop')
  124. async def finish (self):
  125. if self._loaded:
  126. await self._runon ('finish')
  127. async def _runon (self, method):
  128. controller = self.controller
  129. for b in controller._enabledBehavior:
  130. f = getattr (b, 'on' + method)
  131. async for item in f ():
  132. await controller.processItem (item)
class SinglePageController:
    """
    Archive a single page url.

    Dispatches between producer (site loader and behavior scripts) and consumer
    (stats, warc writer): every item produced is pushed to each handler in
    self.handler.
    """

    __slots__ = ('url', 'service', 'behavior', 'settings', 'logger', 'handler',
            'warcinfo', '_enabledBehavior')

    def __init__ (self, url, logger, \
            service, behavior=cbehavior.available, \
            settings=defaultSettings, handler=None, \
            warcinfo=None):
        """
        :param url: Page to archive, passed to the site loader’s navigate()
        :param logger: Logger, bound to this class’ name and the url
        :param service: Async context manager yielding the browser
        :param behavior: Iterable of behavior classes, instantiated in run()
        :param settings: ControllerSettings instance
        :param handler: List of event handlers, defaults to an empty list
        :param warcinfo: Optional extra data, stored under the start payload’s
            'extra' key
        """
        self.url = url
        self.service = service
        self.behavior = behavior
        self.settings = settings
        self.logger = logger.bind (context=type (self).__name__, url=url)
        # NOTE: a list passed in here is not copied; run() appends to it
        self.handler = handler or []
        self.warcinfo = warcinfo

    async def processItem (self, item):
        """ Push item to every registered handler, one after another """
        for h in self.handler:
            await h.push (item)

    async def run (self):
        """
        Load the page, run behavior scripts and feed everything to the
        handlers until the page is idle or the global timeout is hit.

        Exceptions raised while processing the item queue are re-raised.
        """
        logger = self.logger
        async def processQueue ():
            # l is bound by the async with statement below, before this
            # coroutine is started
            async for item in l:
                await self.processItem (item)

        # both helpers are regular handlers, so they see every item as well
        idle = IdleStateTracker (asyncio.get_event_loop ())
        self.handler.append (idle)
        behavior = InjectBehaviorOnload (self)
        self.handler.append (behavior)

        async with self.service as browser, SiteLoader (browser, logger=logger) as l:
            # start consuming items and the global timeout right away
            handle = asyncio.ensure_future (processQueue ())
            timeoutProc = asyncio.ensure_future (asyncio.sleep (self.settings.timeout))

            # configure browser
            tab = l.tab
            await tab.Security.setIgnoreCertificateErrors (ignore=self.settings.insecure)
            await tab.Network.setCookies (cookies=list (map (toCookieParam, self.settings.cookies)))

            # not all behavior scripts are allowed for every URL, filter them
            # (each class is instantiated first, then asked via `url in x`)
            self._enabledBehavior = list (filter (lambda x: self.url in x,
                    map (lambda x: x (l, logger), self.behavior)))

            # describe this run, handlers receive it as ControllerStart item
            version = await tab.Browser.getVersion ()
            payload = {
                    'software': getSoftwareInfo (),
                    'browser': {
                        'product': version['product'],
                        'useragent': version['userAgent'],
                        'viewport': await getFormattedViewportMetrics (tab),
                        },
                    'tool': 'crocoite-single', # not the name of the cli utility
                    'parameters': {
                        'url': self.url,
                        'idleTimeout': self.settings.idleTimeout,
                        'timeout': self.settings.timeout,
                        'behavior': list (map (attrgetter('name'), self._enabledBehavior)),
                        'insecure': self.settings.insecure,
                        'cookies': list (map (lambda x: x.OutputString(), self.settings.cookies)),
                        },
                    }
            if self.warcinfo:
                payload['extra'] = self.warcinfo
            await self.processItem (ControllerStart (payload))

            await l.navigate (self.url)

            # wait for whatever happens first: idle timeout, global timeout or
            # the end of the queue processor
            idleProc = asyncio.ensure_future (idle.wait (self.settings.idleTimeout))
            while True:
                try:
                    finished, pending = await asyncio.wait([idleProc, timeoutProc, handle],
                            return_when=asyncio.FIRST_COMPLETED)
                except asyncio.CancelledError:
                    # cancelled from the outside: stop both timers, but still
                    # go through the shutdown sequence below
                    idleProc.cancel ()
                    timeoutProc.cancel ()
                    break

                if handle in finished:
                    # something went wrong while processing the data
                    logger.error ('fetch failed',
                        uuid='43a0686a-a3a9-4214-9acd-43f6976f8ff3')
                    idleProc.cancel ()
                    timeoutProc.cancel ()
                    handle.result ()
                    assert False # previous line should always raise Exception
                elif timeoutProc in finished:
                    # global timeout
                    logger.debug ('global timeout',
                            uuid='2f858adc-9448-4ace-94b4-7cd1484c0728')
                    idleProc.cancel ()
                    timeoutProc.result ()
                    break
                elif idleProc in finished:
                    # idle timeout
                    logger.debug ('idle timeout',
                            uuid='90702590-94c4-44ef-9b37-02a16de444c3')
                    idleProc.result ()
                    timeoutProc.cancel ()
                    break

            # shutdown: run the stop hooks, halt loading, then the finish hooks
            await behavior.stop ()
            await tab.Page.stopLoading ()
            await asyncio.sleep (1)
            await behavior.finish ()

            # wait until loads from behavior scripts are done and browser is
            # idle for at least 1 second
            try:
                await asyncio.wait_for (idle.wait (1), timeout=1)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                pass

            # surface an exception from the queue processor, otherwise stop it
            if handle.done ():
                handle.result ()
            else:
                handle.cancel ()
  241. class SetEntry:
  242. """ A object, to be used with sets, that compares equality only on its
  243. primary property. """
  244. def __init__ (self, value, **props):
  245. self.value = value
  246. for k, v in props.items ():
  247. setattr (self, k, v)
  248. def __eq__ (self, b):
  249. assert isinstance (b, SetEntry)
  250. return self.value == b.value
  251. def __hash__ (self):
  252. return hash (self.value)
  253. def __repr__ (self):
  254. return f'<SetEntry {self.value!r}>'
  255. class RecursionPolicy:
  256. """ Abstract recursion policy """
  257. __slots__ = ()
  258. def __call__ (self, urls):
  259. raise NotImplementedError
  260. class DepthLimit (RecursionPolicy):
  261. """
  262. Limit recursion by depth.
  263. depth==0 means no recursion, depth==1 is the page and outgoing links
  264. """
  265. __slots__ = ('maxdepth', )
  266. def __init__ (self, maxdepth=0):
  267. self.maxdepth = maxdepth
  268. def __call__ (self, urls):
  269. newurls = set ()
  270. for u in urls:
  271. if u.depth <= self.maxdepth:
  272. newurls.add (u)
  273. return newurls
  274. def __repr__ (self):
  275. return f'<DepthLimit {self.maxdepth}>'
  276. class PrefixLimit (RecursionPolicy):
  277. """
  278. Limit recursion by prefix
  279. i.e. prefix=http://example.com/foo
  280. ignored: http://example.com/bar http://offsite.example/foo
  281. accepted: http://example.com/foobar http://example.com/foo/bar
  282. """
  283. __slots__ = ('prefix', )
  284. def __init__ (self, prefix):
  285. self.prefix = prefix
  286. def __call__ (self, urls):
  287. return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls))
  288. def hasTemplate (s):
  289. """ Return True if string s has string templates """
  290. return '{' in s and '}' in s
  291. class RecursiveController:
  292. """
  293. Simple recursive controller
  294. Visits links acording to policy
  295. """
  296. __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
  297. 'pending', 'stats', 'tempdir', 'running', 'concurrency',
  298. 'copyLock')
  299. SCHEME_WHITELIST = {'http', 'https'}
  300. def __init__ (self, url, output, command, logger,
  301. tempdir=None, policy=DepthLimit (0), concurrency=1):
  302. self.url = url
  303. self.output = output
  304. self.command = command
  305. self.logger = logger.bind (context=type(self).__name__, seedurl=url)
  306. self.policy = policy
  307. self.tempdir = tempdir
  308. # A lock if only a single output file (no template) is requested
  309. self.copyLock = None if hasTemplate (output) else asyncio.Lock ()
  310. # some sanity checks. XXX move to argparse?
  311. if self.copyLock and os.path.exists (self.output):
  312. raise ValueError ('Output file exists')
  313. # tasks currently running
  314. self.running = set ()
  315. # max number of tasks running
  316. self.concurrency = concurrency
  317. # keep in sync with StatsHandler
  318. self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}
  319. async def fetch (self, entry, seqnum):
  320. """
  321. Fetch a single URL using an external command
  322. command is usually crocoite-single
  323. """
  324. assert isinstance (entry, SetEntry)
  325. url = entry.value
  326. depth = entry.depth
  327. logger = self.logger.bind (url=url)
  328. def formatCommand (e):
  329. # provide means to disable variable expansion
  330. if e.startswith ('!'):
  331. return e[1:]
  332. else:
  333. return e.format (url=url, dest=dest.name)
  334. def formatOutput (p):
  335. return p.format (host=url.host,
  336. date=datetime.utcnow ().isoformat (), seqnum=seqnum)
  337. def logStats ():
  338. logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
  339. if url.scheme not in self.SCHEME_WHITELIST:
  340. self.stats['ignored'] += 1
  341. logStats ()
  342. self.logger.warning ('scheme not whitelisted', url=url,
  343. uuid='57e838de-4494-4316-ae98-cd3a2ebf541b')
  344. return
  345. dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
  346. prefix=os.path.basename (self.output) + '-', suffix='.warc.gz',
  347. delete=False)
  348. command = list (map (formatCommand, self.command))
  349. logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f',
  350. command=command)
  351. try:
  352. process = await asyncio.create_subprocess_exec (*command,
  353. stdout=asyncio.subprocess.PIPE,
  354. stderr=asyncio.subprocess.DEVNULL,
  355. stdin=asyncio.subprocess.DEVNULL,
  356. start_new_session=True, limit=100*1024*1024)
  357. while True:
  358. data = await process.stdout.readline ()
  359. if not data:
  360. break
  361. data = json.loads (data)
  362. uuid = data.get ('uuid')
  363. if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
  364. links = set (self.policy (map (lambda x: SetEntry (URL(x).with_fragment(None), depth=depth+1), data.get ('links', []))))
  365. links.difference_update (self.have)
  366. self.pending.update (links)
  367. elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
  368. for k in self.stats.keys ():
  369. self.stats[k] += data.get (k, 0)
  370. logStats ()
  371. except asyncio.CancelledError:
  372. # graceful cancellation
  373. process.send_signal (signal.SIGINT)
  374. except Exception as e:
  375. process.kill ()
  376. raise e
  377. finally:
  378. code = await process.wait()
  379. if code == 0:
  380. if self.copyLock is None:
  381. # atomically move once finished
  382. lastDestpath = None
  383. while True:
  384. # XXX: must generate a new name every time, otherwise
  385. # this loop never terminates
  386. destpath = formatOutput (self.output)
  387. assert destpath != lastDestpath
  388. lastDestpath = destpath
  389. # python does not have rename(…, …, RENAME_NOREPLACE),
  390. # but this is safe nontheless, since we’re
  391. # single-threaded
  392. if not os.path.exists (destpath):
  393. # create the directory, so templates like
  394. # /{host}/{date}/… are possible
  395. os.makedirs (os.path.dirname (destpath), exist_ok=True)
  396. os.rename (dest.name, destpath)
  397. break
  398. else:
  399. # atomically (in the context of this process) append to
  400. # existing file
  401. async with self.copyLock:
  402. with open (dest.name, 'rb') as infd, \
  403. open (self.output, 'ab') as outfd:
  404. shutil.copyfileobj (infd, outfd)
  405. os.unlink (dest.name)
  406. else:
  407. self.stats['crashed'] += 1
  408. logStats ()
  409. async def run (self):
  410. def log ():
  411. # self.have includes running jobs
  412. self.logger.info ('recursing',
  413. uuid='5b8498e4-868d-413c-a67e-004516b8452c',
  414. pending=len (self.pending),
  415. have=len (self.have)-len(self.running),
  416. running=len (self.running))
  417. seqnum = 1
  418. try:
  419. self.have = set ()
  420. self.pending = set ([SetEntry (self.url, depth=0)])
  421. while self.pending:
  422. # since pending is a set this picks a random item, which is fine
  423. u = self.pending.pop ()
  424. self.have.add (u)
  425. t = asyncio.ensure_future (self.fetch (u, seqnum))
  426. self.running.add (t)
  427. seqnum += 1
  428. log ()
  429. if len (self.running) >= self.concurrency or not self.pending:
  430. done, pending = await asyncio.wait (self.running,
  431. return_when=asyncio.FIRST_COMPLETED)
  432. self.running.difference_update (done)
  433. # propagate exceptions
  434. for r in done:
  435. r.result ()
  436. except asyncio.CancelledError:
  437. self.logger.info ('cancel',
  438. uuid='d58154c8-ec27-40f2-ab9e-e25c1b21cd88',
  439. pending=len (self.pending),
  440. have=len (self.have)-len (self.running),
  441. running=len (self.running))
  442. finally:
  443. done = await asyncio.gather (*self.running,
  444. return_exceptions=True)
  445. # propagate exceptions
  446. for r in done:
  447. if isinstance (r, Exception):
  448. raise r
  449. self.running = set ()
  450. log ()