programstarter.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. #!/usr/bin/env python3
  2. # Contest Management System - http://cms-dev.github.io/
  3. # Copyright © 2012 Bernard Blackham <bernard@largestprime.net>
  4. # Copyright © 2013-2018 Stefano Maggiolo <s.maggiolo@gmail.com>
  5. # Copyright © 2013-2014 Luca Wehrstedt <luca.wehrstedt@gmail.com>
  6. # Copyright © 2014 Luca Versari <veluca93@gmail.com>
  7. # Copyright © 2014 William Di Luigi <williamdiluigi@gmail.com>
  8. #
  9. # This program is free software: you can redistribute it and/or modify
  10. # it under the terms of the GNU Affero General Public License as
  11. # published by the Free Software Foundation, either version 3 of the
  12. # License, or (at your option) any later version.
  13. #
  14. # This program is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. # GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. import atexit
  22. import json
  23. import logging
  24. import os
  25. import re
  26. import signal
  27. import socket
  28. import subprocess
  29. import threading
  30. import time
  31. from urllib.parse import urlsplit
  32. import psutil
  33. from cmstestsuite import CONFIG, TestException
  34. from cmstestsuite.coverage import coverage_cmdline
  35. from cmstestsuite.functionaltestframework import FunctionalTestFramework
  36. from cmstestsuite.profiling import profiling_cmdline
  # Logger shared by every class defined in this module.
  logger = logging.getLogger(name=__name__)

  # Upper bound on how many health checks are attempted (with growing
  # sleeps in between) before a service is declared unable to start.
  _MAX_ATTEMPTS = 20
  class RemoteService:
      """Class which implements the RPC protocol used by CMS.

      This is deliberately a re-implementation in order to catch or
      trigger bugs in the CMS services.

      Each call opens a fresh TCP connection, sends one JSON request
      terminated by CRLF, reads one CRLF-terminated JSON reply and closes
      the connection.

      """

      def __init__(self, cms_config, service_name, shard):
          """Look up the address of one shard of a service.

          cms_config (dict): the CMS configuration; the entry
              cms_config["core_services"][service_name][shard] must be an
              (address, port) pair.
          service_name (str): name of the service to talk to.
          shard (int): shard index of the service.

          """
          address, port = cms_config["core_services"][service_name][shard]
          self.service_name = service_name
          self.shard = shard
          self.address = address
          self.port = port

      def call(self, function_name, data):
          """Perform a synchronous RPC call.

          function_name (str): name of the remote method to invoke.
          data (object): JSON-serializable arguments for the method.

          return (object): the whole decoded JSON reply envelope (callers
              read e.g. reply["__data"]).

          raise (OSError): on connection errors, including the peer closing
              the connection before a complete reply was received.

          """
          request = json.dumps({
              "__id": "foo",
              "__method": function_name,
              "__data": data,
          }).encode('utf-8') + b"\r\n"

          # The context manager closes the socket even if connect/send/recv
          # raises, so failed health checks do not leak descriptors.
          with socket.socket() as sock:
              sock.connect((self.address, self.port))
              # send() may transmit only part of the buffer; sendall() keeps
              # going until the whole request has been written.
              sock.sendall(request)

              # The reply ends at the first CRLF.
              buf = bytearray()
              while True:
                  end = buf.find(b"\r\n")
                  if end != -1:
                      break
                  chunk = sock.recv(4096)
                  if not chunk:
                      # recv() returns b"" forever once the peer has closed
                      # the connection; bail out instead of spinning.
                      raise ConnectionResetError(
                          "Connection to %s/%s closed before a complete "
                          "reply was received." % (self.service_name,
                                                   self.shard))
                  buf += chunk

          # Decode reply.
          return json.loads(bytes(buf[:end]).decode('utf-8'))
  class Program:
      """An instance of a program, which might be running or not."""

      def __init__(self, cms_config, service_name, shard=0, contest=None,
                   cpu_limit=None):
          """Record how to launch a service; nothing is started yet.

          cms_config (dict): the CMS configuration, used to find the
              addresses to health-check.
          service_name (str): service name; the executable is "cms" followed
              by this name.
          shard (int|None): shard passed as a positional argument, or None
              to omit it.
          contest (object|None): value passed with "-c", or None to omit it.
          cpu_limit (int|None): CPU percentage enforced via cputool, or None
              for no limit.

          """
          self.cms_config = cms_config
          self.service_name = service_name
          self.shard = shard
          self.contest = contest
          self.cpu_limit = cpu_limit
          # Popen handle of the spawned process; None until start().
          self.instance = None
          # Outcome of the most recent health check.
          self.healthy = False

      def start(self):
          """Start a CMS service.

          Spawns the process, registers atexit cleanup handlers, and starts
          a daemon thread that polls the service until it reports healthy.

          """
          logger.info("Starting %s.", self.service_name)
          if CONFIG["TEST_DIR"] is None:
              # Rely on the executable being found via PATH.
              executable = "cms%s" % self.service_name
          else:
              executable = os.path.join(
                  ".", "scripts", "cms%s" % (self.service_name))
          args = [executable]
          if self.shard is not None:
              args.append("%s" % self.shard)
          if self.contest is not None:
              args += ["-c", "%s" % self.contest]
          self.instance = self._spawn(args)

          # In case the test ends prematurely due to errors and stop() is not
          # called, this child process would continue running, so we register
          # exit handlers to clean it up. atexit handlers run in LIFO order:
          # wait_or_kill() runs first and tries to terminate gracefully, then
          # kill() kills immediately if the process is still alive. This is
          # useful in case the user hits Ctrl-C twice, to avoid zombies.
          atexit.register(self.kill)
          atexit.register(self.wait_or_kill)

          t = threading.Thread(target=self._check_with_backoff)
          t.daemon = True
          t.start()

      @property
      def coord(self):
          """Return a "service/shard" label used in log messages."""
          return "%s/%s" % (self.service_name, self.shard)

      @property
      def running(self):
          """Return whether the program is live.

          Performs a synchronous health check first.

          raise (TestException): if the health check finds a weird state.

          """
          self._check()
          return self.healthy

      def log_cpu_times(self):
          """Log usr and sys CPU, and busy percentage over total running time."""
          try:
              p = psutil.Process(self.instance.pid)
              times = p.cpu_times()
              total_time_ratio = \
                  (times.user + times.system) / (time.time() - p.create_time())
              logger.info(
                  "Total CPU times for %s: %.2lf (user), %.2lf (sys) = %.2lf%%",
                  self.coord, times.user, times.system, 100 * total_time_ratio)
          except psutil.NoSuchProcess:
              logger.info("Cannot compute CPU times for %s", self.coord)

      def stop(self):
          """Ask the program to quit via RPC.

          Callers should call wait_or_kill() after to make sure the program
          really terminates.

          raise (OSError): if the quit RPC cannot be delivered.

          """
          logger.info("Asking %s to terminate...", self.coord)
          if self.service_name != "RankingWebServer":
              # Try to terminate gracefully (RWS does not have a way to do it).
              rs = RemoteService(self.cms_config, self.service_name, self.shard)
              rs.call("quit", {"reason": "from test harness"})
          else:
              # For RWS, we use Ctrl-C.
              self.instance.send_signal(signal.SIGINT)

      def wait_or_kill(self):
          """Wait for the program to terminate, or kill it after 5s."""
          if self.instance.poll() is None:
              # We try one more time to kill gracefully using Ctrl-C.
              logger.info("Interrupting %s and waiting...", self.coord)
              self.instance.send_signal(signal.SIGINT)
              try:
                  self.instance.wait(timeout=5)
                  logger.info("Terminated %s.", self.coord)
              except subprocess.TimeoutExpired:
                  self.kill()

      def kill(self):
          """Kill the program if it is still running."""
          if self.instance.poll() is None:
              logger.info("Killing %s.", self.coord)
              self.instance.kill()

      def _check_with_backoff(self):
          """Check and wait that the service is healthy.

          Polls up to _MAX_ATTEMPTS times with exponentially growing sleeps.
          This runs in a daemon thread (see start()), so the final
          TestException only ends that thread; callers observe the outcome
          through self.healthy.

          """
          self.healthy = False
          for attempts in range(_MAX_ATTEMPTS):
              self._check()
              if self.healthy:
                  return
              time.sleep(0.2 * (1.2 ** attempts))
          # Service did not start.
          raise TestException("Failed to bring up service %s" % self.coord)

      def _check(self):
          """Check that the program is healthy and set the healthy bit.

          A refused connection just marks the program unhealthy (presumably
          it is not listening yet).

          raise (TestException): when the state is weird, critical; the
              underlying OSError is chained as the cause.

          """
          try:
              if self.service_name == "RankingWebServer":
                  self._check_ranking_web_server()
              else:
                  self._check_service()
          except ConnectionRefusedError:
              self.healthy = False
          except OSError as error:
              self.healthy = False
              raise TestException("Weird connection state.") from error
          else:
              self.healthy = True

      def _check_service(self):
          """Health checker for services and servers.

          Sends an "echo" RPC; for AdminWebServer and ContestWebServer also
          checks that their HTTP port accepts connections.

          raise (TestException): if the echo reply is not the one expected.
          raise (OSError): on connection errors.

          """
          rs = RemoteService(self.cms_config, self.service_name, self.shard)
          reply = rs.call("echo", {"string": "hello"})
          if reply["__data"] != "hello":
              raise TestException("Strange response from service.")
          # In case it is a server, we also check HTTP is serving.
          if self.service_name == "AdminWebServer":
              port = self.cms_config["admin_listen_port"]
          elif self.service_name == "ContestWebServer":
              port = self.cms_config["contest_listen_port"][self.shard]
          else:
              return
          # The context manager closes the socket even when connect() fails,
          # so repeated failed checks do not leak descriptors.
          with socket.socket() as sock:
              sock.connect(("127.0.0.1", port))

      def _check_ranking_web_server(self):
          """Health checker for RWS: its first rankings URL accepts TCP.

          raise (OSError): on connection errors.

          """
          url = urlsplit(self.cms_config["rankings"][0])
          with socket.socket() as sock:
              sock.connect((url.hostname, url.port))

      def _spawn(self, cmdline):
          """Execute a python application.

          The command line is wrapped by the coverage and profiling helpers;
          output is discarded unless VERBOSITY >= 3.

          return (subprocess.Popen): the spawned process.

          """
          cmdline = coverage_cmdline(cmdline)
          cmdline = profiling_cmdline(
              cmdline, "%s-%d" % (self.service_name, self.shard or 0))
          if CONFIG["VERBOSITY"] >= 1:
              logger.info("$ %s", " ".join(cmdline))
          if CONFIG["VERBOSITY"] >= 3:
              stdout = None
              stderr = None
          else:
              stdout = subprocess.DEVNULL
              stderr = subprocess.STDOUT
          instance = subprocess.Popen(cmdline, stdout=stdout, stderr=stderr)
          if self.cpu_limit is not None:
              logger.info("Limiting %s to %d%% CPU time",
                          self.coord, self.cpu_limit)
              # cputool terminates on its own when the main program terminates.
              subprocess.Popen(["cputool", "-c", str(self.cpu_limit),
                                "-p", str(instance.pid)])
          return instance
  class ProgramStarter:
      """Utility to keep track of all programs started."""

      def __init__(self, cpu_limits=None):
          """Create the starter and load the CMS configuration.

          cpu_limits ([(str, int)]|None): (regex, limit) pairs; a service
              whose name matches a regex (re.match, anchored at the start
              of the name) is limited to the smallest matching limit.

          """
          self.cpu_limits = cpu_limits if cpu_limits is not None else []
          self.framework = FunctionalTestFramework()
          self.cms_config = self.framework.get_cms_config()
          # Map of (service_name, shard, contest) to Program instances.
          self._programs = {}
          # Map Program: check_function (not read by this class's methods).
          self._check_to_perform = {}

      def _cpu_limit_for_service(self, service_name):
          """Return the tightest CPU limit whose regex matches service_name.

          return (int|None): the minimum limit among matching entries, or
              None when no entry matches.

          """
          matching = [limit for regex, limit in self.cpu_limits
                      if re.match(regex, service_name)]
          return min(matching) if matching else None

      def start(self, service_name, shard=0, contest=None):
          """Start a CMS service and keep track of it."""
          cpu_limit = self._cpu_limit_for_service(service_name)
          p = Program(self.cms_config, service_name, shard, contest,
                      cpu_limit=cpu_limit)
          p.start()
          self._programs[(service_name, shard, contest)] = p

      def count_unhealthy(self):
          """Return how many tracked programs are not (yet) healthy."""
          return sum(1 for p in self._programs.values() if not p.healthy)

      def wait(self):
          """Block until every tracked program is healthy.

          raise (TestException): if some program is still unhealthy after
              _MAX_ATTEMPTS polls.

          """
          for attempts in range(_MAX_ATTEMPTS):
              unhealthy = self.count_unhealthy()
              if unhealthy == 0:
                  logger.info("All healthy! Continuing.")
                  return
              logger.info("Still %s unhealthy.", unhealthy)
              time.sleep(0.2 * (1.2 ** attempts))
          raise TestException(
              "Failed to bring up services: %s" % ", ".join(
                  p.coord for p in self._programs.values() if not p.healthy))

      def stop_all(self):
          """Stop every tracked program, killing those that do not exit.

          Every program is asked to quit and then waited for (or killed),
          even if asking one of them fails, e.g. because its RPC endpoint is
          already gone. The first such failure is re-raised after all
          programs have been dealt with, so it is not hidden.

          """
          programs = list(self._programs.values())
          for p in programs:
              p.log_cpu_times()
          first_error = None
          for p in programs:
              try:
                  p.stop()
              except Exception as error:
                  # Cleanup boundary: keep going, re-raise once done.
                  logger.warning("Failed to ask %s to terminate: %s",
                                 p.coord, error)
                  if first_error is None:
                      first_error = error
          for p in programs:
              p.wait_or_kill()
          if first_error is not None:
              raise first_error