Test.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. #!/usr/bin/env python3
  2. # Contest Management System - http://cms-dev.github.io/
  3. # Copyright © 2012 Bernard Blackham <bernard@largestprime.net>
  4. # Copyright © 2014-2017 Stefano Maggiolo <s.maggiolo@gmail.com>
  5. # Copyright © 2020-2021 Andrey Vihrov <andrey.vihrov@gmail.com>
  6. #
  7. # This program is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU Affero General Public License as
  9. # published by the Free Software Foundation, either version 3 of the
  10. # License, or (at your option) any later version.
  11. #
  12. # This program is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU Affero General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Affero General Public License
  18. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. import os
  20. import re
  21. from abc import ABC, abstractmethod
  22. from cms.grading.languagemanager import get_language
  23. from cmstestsuite.functionaltestframework import FunctionalTestFramework
  24. class TestFailure(Exception):
  25. pass
  26. class Check(ABC):
  27. @abstractmethod
  28. def check(self, *args, **kwargs):
  29. pass
  30. class CheckOverallScore(Check):
  31. # This check searches for a string such :
  32. # Scored (100.0 / 100.0)
  33. # in status and checks the score.
  34. score_re = re.compile(r'^Scored \(([0-9.]+) / ([0-9/.]+)\)')
  35. def __init__(self, expected_score, expected_total):
  36. self.expected_score = expected_score
  37. self.expected_total = expected_total
  38. def check(self, result_info):
  39. g = CheckOverallScore.score_re.match(result_info['status'])
  40. if not g:
  41. raise TestFailure(
  42. "Expected total score, got status: %s\n"
  43. "Compilation output:\n%s" %
  44. (result_info['status'], result_info['compile_output']))
  45. score, total = g.groups()
  46. try:
  47. score = float(score)
  48. total = float(total)
  49. except ValueError:
  50. raise TestFailure("Expected readable score, got: %s/%s" %
  51. (score, total))
  52. if score != self.expected_score or \
  53. total != self.expected_total:
  54. raise TestFailure("Expected score of %g/%g, but got %g/%g" %
  55. (self.expected_score, self.expected_total,
  56. score, total))
  57. class CheckCompilationFail(Check):
  58. def check(self, result_info):
  59. if 'Compilation failed' not in result_info['status']:
  60. raise TestFailure("Expected compilation to fail, got: %s" %
  61. result_info['status'])
  62. class CheckAbstractEvaluationFailure(Check):
  63. def __init__(self, short_adjective, failure_string):
  64. self.short_adjective = short_adjective
  65. self.failure_string = failure_string
  66. def check(self, result_info):
  67. if 'Scored' not in result_info['status']:
  68. raise TestFailure("Expected a successful evaluation, got: %s" %
  69. result_info['status'])
  70. if not result_info['evaluations']:
  71. raise TestFailure("No evaluations found.")
  72. for evaluation in result_info['evaluations']:
  73. score = float(evaluation['outcome'])
  74. text = evaluation['text']
  75. if score != 0.0:
  76. raise TestFailure("Should have %s. Scored %g." %
  77. (self.short_adjective, score))
  78. if self.failure_string not in text:
  79. raise TestFailure("Should have %s, got %s" %
  80. (self.short_adjective, text))
  81. class CheckTimeout(CheckAbstractEvaluationFailure):
  82. def __init__(self):
  83. CheckAbstractEvaluationFailure.__init__(
  84. self, "timed out", "Execution timed out")
  85. class CheckTimeoutWall(CheckAbstractEvaluationFailure):
  86. def __init__(self):
  87. CheckAbstractEvaluationFailure.__init__(
  88. self, "wall timed out",
  89. "Execution timed out (wall clock limit exceeded)")
  90. class CheckForbiddenSyscall(CheckAbstractEvaluationFailure):
  91. def __init__(self, syscall_name=''):
  92. CheckAbstractEvaluationFailure.__init__(
  93. self, "executed a forbidden syscall",
  94. "Execution killed because of forbidden syscall %s" % syscall_name)
  95. class CheckSignal(CheckAbstractEvaluationFailure):
  96. def __init__(self, signal_number):
  97. CheckAbstractEvaluationFailure.__init__(
  98. self, "died on a signal",
  99. "Execution killed with signal %s" % signal_number)
  100. class CheckNonzeroReturn(CheckAbstractEvaluationFailure):
  101. def __init__(self):
  102. CheckAbstractEvaluationFailure.__init__(
  103. self, "nonzero return",
  104. "Execution failed because the return code was nonzero")
  105. class CheckUserTest(ABC):
  106. @abstractmethod
  107. def check(self, *args, **kwargs):
  108. pass
  109. class CheckUserTestEvaluated(CheckUserTest):
  110. def check(self, result_info):
  111. if "Evaluated" not in result_info["status"]:
  112. raise TestFailure("Expected a successful evaluation, got: %s" %
  113. result_info["status"])
  114. class Test:
  115. def __init__(self, name, *, task, filenames, alt_filenames={}, languages,
  116. checks, user_tests=False, user_managers=[], user_checks=[]):
  117. self.framework = FunctionalTestFramework()
  118. self.name = name
  119. self.task_module = task
  120. self.filenames = filenames
  121. self.alt_filenames = alt_filenames
  122. self.languages = languages
  123. self.checks = checks
  124. submission_format = list(
  125. e.strip() for e in task.task_info["submission_format"].split(","))
  126. self.submission_format = submission_format
  127. self.user_tests = user_tests
  128. self.user_checks = user_checks
  129. # Some tasks may require additional user test managers.
  130. self.user_managers = user_managers
  131. user_manager_format = list(e.strip()
  132. for e in task.task_info.get("user_manager_format", "").split(","))
  133. self.user_manager_format = user_manager_format
  134. self.submission_id = {}
  135. self.user_test_id = {}
  136. def _filenames_for_language(self, language, filenames, alt_filenames={}):
  137. if language is not None:
  138. # First check if language is in alt_filenames. This allows to
  139. # submit different sources for languages that would otherwise
  140. # have matching source extensions.
  141. filenames = alt_filenames.get(language, filenames)
  142. ext = get_language(language).source_extension
  143. return [filename.replace(".%l", ext) for filename in filenames]
  144. else:
  145. return filenames
  146. def _sources_names(self, language):
  147. # Source files are stored under cmstestsuite/code/.
  148. path = os.path.join(os.path.dirname(__file__), 'code')
  149. filenames = self._filenames_for_language(language, self.filenames,
  150. self.alt_filenames)
  151. full_paths = [os.path.join(path, filename) for filename in filenames]
  152. return full_paths
  153. def _user_managers_names(self, language):
  154. # Currently we select the same managers that are used for submission
  155. # evaluation. These are located under <task>/code/.
  156. path = os.path.join(os.path.dirname(self.task_module.__file__), "code")
  157. filenames = self._filenames_for_language(language, self.user_managers)
  158. full_paths = [os.path.join(path, filename) for filename in filenames]
  159. return full_paths
  160. def submit(self, task_id, user_id, language):
  161. full_paths = self._sources_names(language)
  162. self.submission_id[language] = self.framework.cws_submit(
  163. task_id, user_id,
  164. self.submission_format, full_paths, language)
  165. def wait(self, contest_id, language):
  166. # This means we were not able to submit, hence the error
  167. # should have been already noted.
  168. if self.submission_id.get(language) is None:
  169. return
  170. # Wait for evaluation to complete.
  171. result_info = self.framework.get_evaluation_result(
  172. contest_id, self.submission_id[language])
  173. # Run checks.
  174. for check in self.checks:
  175. try:
  176. check.check(result_info)
  177. except TestFailure:
  178. # Our caller can deal with these.
  179. raise
  180. def submit_user_test(self, task_id, user_id, language):
  181. submission_format = self.submission_format + self.user_manager_format
  182. full_paths = self._sources_names(language) + \
  183. self._user_managers_names(language)
  184. self.user_test_id[language] = self.framework.cws_submit_user_test(
  185. task_id, user_id, submission_format, full_paths, language)
  186. def wait_user_test(self, contest_id, language):
  187. # This means we were not able to submit, hence the error
  188. # should have been already noted.
  189. if self.user_test_id.get(language) is None:
  190. return
  191. # Wait for evaluation to complete.
  192. result_info = self.framework.get_user_test_result(
  193. contest_id, self.user_test_id[language])
  194. # Run checks.
  195. for check in self.user_checks:
  196. check.check(result_info)