control.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import os
  2. import logging
  3. import time
  4. from queue import Empty as QueueEmpty
  5. from multiprocessing import Process, Queue
  6. from threading import Thread
  7. from itertools import count
  8. from .message import ShowroomMessage
  9. from .settings import ShowroomSettings
  10. from .index import ShowroomIndex
  11. from .core import WatchManager
  12. from .exceptions import ShowroomStopRequest
  13. control_logger = logging.getLogger("showroom.control")
  14. class BaseShowroomLiveController(object):
  15. def __init__(self, index: ShowroomIndex=None, settings: ShowroomSettings=None, record_all=False):
  16. # TODO: proper docstring
  17. super(BaseShowroomLiveController, self).__init__()
  18. self.command_queue = Queue()
  19. self.message_queue = Queue()
  20. if not settings:
  21. self.settings = ShowroomSettings.from_file()
  22. else:
  23. self.settings = settings
  24. # TODO: where is the best place for this? I put it here because
  25. # controller could potentially change output dir and so needs to be
  26. # the one to handle recreating the folders.
  27. # it could also be done in ShowroomSettings if I give that object more power
  28. # os.makedirs(self.settings.directory.temp, exist_ok=True)
  29. if not index:
  30. self.index = ShowroomIndex(self.settings.directory.index, record_all=record_all)
  31. else:
  32. self.index = index
  33. self._instance = None
  34. self.manager = WatchManager(self.index, self.settings)
  35. self.counter = count()
  36. # aliases
  37. self.send = self.send_command
  38. self.get = self.get_messages
  39. # TODO: make maintenance more intelligent
  40. # instance (thread or process) wrapping methods
  41. def start(self):
  42. # TODO: are there any other conditions required to restart the loop?
  43. raise NotImplementedError
  44. def is_alive(self):
  45. return self._instance.is_alive()
  46. def join(self, timeout=0):
  47. return self._instance.join(timeout=timeout)
  48. def run(self):
  49. control_logger.debug("Running ShowroomLiveController")
  50. # start index update tasks (runs in separate thread)
  51. self.index.start()
  52. while True:
  53. # TODO: check if time for maintenance, if so do maintenance then schedule next
  54. # if self.resume_time > self.time.time() > self.end_time:
  55. # sleep_seconds = (datetime.datetime.combine(self.time, self.resume_time)
  56. # - self.time).total_seconds() + 1.0
  57. # print('Time is {}, sleeping for {} seconds, until {}'.format(strftime(self.time, '%H:%M'),
  58. # sleep_seconds,
  59. # strftime(self.resume_time, '%H:%M')))
  60. # self.scheduler.reset_ticks()
  61. # time.sleep(sleep_seconds)
  62. self.manager.tick()
  63. while not self.command_queue.empty():
  64. control_logger.debug('Reading command queue')
  65. try:
  66. ident, command, args, kwargs = self.command_queue.get(block=False)
  67. except QueueEmpty:
  68. break
  69. else:
  70. # TODO: check that command is valid and allowed
  71. if command[0] == '_':
  72. control_logger.warn('Forbidden command: {}'.format(command))
  73. continue
  74. # TODO: lookup command in a dictionary instead of this
  75. msg = ShowroomMessage(ident, command)
  76. cmd, *args2 = command.replace('/', '_').split('_')
  77. try:
  78. msg = getattr(self, '_' + cmd)(*(list(args) + args2), msg=msg, **kwargs)
  79. except ShowroomStopRequest:
  80. self.index.stop()
  81. return
  82. except AttributeError as e:
  83. # invalid command
  84. control_logger.debug('Unknown command: {}, {}'.format(command, e))
  85. except TypeError as e:
  86. # trying to call something besides a method
  87. control_logger.debug('{} is not a command -- {}'.format(command, e))
  88. else:
  89. if msg is not None:
  90. self.message_queue.put(msg)
  91. time.sleep(0.2)
  92. def send_command(self, command, *args, **kwargs):
  93. ident = next(self.counter)
  94. self.command_queue.put((ident, command, args, kwargs))
  95. return ident
  96. def get_messages(self):
  97. messages = []
  98. while not self.message_queue.empty():
  99. try:
  100. msg = self.message_queue.get(block=False)
  101. except QueueEmpty:
  102. break
  103. else:
  104. if msg:
  105. messages.append(msg)
  106. return messages
  107. def stop(self):
  108. self.command_queue.put((next(self.counter), "stop", [], {}))
  109. def _stop(self, *args, msg=None, **kwargs):
  110. # TODO: log stopping
  111. self.manager.stop()
  112. self.manager.write_completed()
  113. raise ShowroomStopRequest
  114. # commands
  115. # all commands either return None or a message: either a dict or a showroom Message
  116. # index commands
  117. # do these need to be made thread-safe? they mutate rooms... but nothing else should.
  118. def _index(self, *args, msg=None, **kwargs):
  119. if not args or args[0] == 'list':
  120. if msg:
  121. # return index list in message
  122. pass
  123. elif args[0] == 'filter':
  124. self._index_filter(*args[1:], **kwargs)
  125. elif args[0] == 'update':
  126. self._index_update(*args[1:], **kwargs)
  127. def _index_filter(self, *args, msg=None, **kwargs):
  128. b_updated = False
  129. if not args or args[0] == 'list':
  130. if msg:
  131. """Returns a dict of all wanted and unwanted rooms, by name.
  132. {"index_filters": {"wanted": [...], "unwanted": [...]} }
  133. """
  134. return msg.set_content(self.index.filter_get_list())
  135. else:
  136. # raise message needed exception
  137. pass
  138. if "all" in args:
  139. """Turns downloading on for all rooms."""
  140. self.index.filter_all()
  141. # TODO: check if this actually changed anything
  142. # have index.filter return number of modified rooms?
  143. b_updated = True
  144. elif "none" in args:
  145. """Turns downloading off for all rooms."""
  146. self.index.filter_none()
  147. b_updated = True
  148. if "add" in kwargs:
  149. """Sets downloading on for specific rooms."""
  150. self.index.filter_add(kwargs["add"])
  151. b_updated = True
  152. if "remove" in kwargs:
  153. """Sets downloading off for specific rooms."""
  154. self.index.filter_remove(kwargs["remove"])
  155. b_updated = True
  156. if b_updated:
  157. self.manager.update_flag.set()
  158. def _index_update(self, *args, **kwargs):
  159. """Updates the index from either the local filesystem or a web source"""
  160. if 'web' in args:
  161. if 'src_url' in kwargs:
  162. self.index.update_from_web(kwargs['src_url'])
  163. else:
  164. self.index.update_from_web()
  165. self.index.update()
  166. def _index_update_from_web(self, src_url=None):
  167. """Updates the index from a remote source.
  168. Source must be a json like
  169. https://wlerin.github.io/showroom-index/list.json
  170. that points to a set of jdex files to update from.
  171. """
  172. self.index.update_from_web(src_url)
  173. # TODO: Messages require a unique identifier given them by the caller
  174. # room list commands
  175. def _get_rooms_by_mode(self, mode):
  176. rooms = []
  177. for watch in self.manager.watchers.get_by_mode(mode):
  178. rooms.append(watch.get_info())
  179. return sorted(rooms, key=lambda x: (x['start_time'], x['name']))
  180. # TODO: get these working again
  181. # "endpoints"
  182. # take arbitrary args and kwargs and parse through them for meaningful instructions
  183. def _schedule(self, *args, msg=None, **kwargs):
  184. if msg is not None:
  185. if args:
  186. pass
  187. if kwargs:
  188. # TODO: take other options
  189. pass
  190. msg.set_content(self._get_rooms_by_mode("working"))
  191. return msg
  192. def _lives(self, *args, msg=None, **kwargs):
  193. if msg is not None:
  194. msg.set_content(self._get_rooms_by_mode("live"))
  195. return msg
  196. def _downloads(self, *args, msg=None, **kwargs):
  197. if msg is not None:
  198. # the caller can then filter these to get links if desired...?
  199. msg.set_content(self._get_rooms_by_mode("download"))
  200. return msg
  201. class ShowroomLiveControllerThread(BaseShowroomLiveController):
  202. def start(self):
  203. # TODO: are there any other conditions required to restart the loop?
  204. if not self._instance or not self._instance.is_alive():
  205. self._instance = Thread(target=self.run, name="ShowroomLiveController")
  206. self._instance.start()
  207. class ShowroomLiveControllerProcess(BaseShowroomLiveController):
  208. def start(self):
  209. # TODO: are there any other conditions required to restart the loop?
  210. if not self._instance or not self._instance.is_alive():
  211. self._instance = Process(target=self.run, name="ShowroomLiveController")
  212. self._instance.start()
  213. Controller = ShowroomLiveControllerThread