importers.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. #!/usr/bin/env python3
  2. # Contest Management System - http://cms-dev.github.io/
  3. # Copyright © 2016 Peyman Jabbarzade Ganje <peyman.jabarzade@gmail.com>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU Affero General Public License as
  7. # published by the Free Software Foundation, either version 3 of the
  8. # License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU Affero General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Affero General Public License
  16. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. import logging
  18. import zipfile
  19. from cms.db import Testcase
  20. logger = logging.getLogger(__name__)
  21. def import_testcases_from_zipfile(
  22. session, file_cacher, dataset,
  23. archive, input_re, output_re, overwrite, public):
  24. """Import testcases from a zipped archive
  25. session (Session): session to use to add the testcases.
  26. file_cacher (FileCacher): interface to access the files in the DB.
  27. dataset (Dataset): dataset where to add the testcases.
  28. archive (File): file-like object representing a zip file.
  29. input_re (_sre.SRE_Pattern): regular expression matching the input
  30. filenames (e.g., re.compile(r"input_(.*).txt)).
  31. output_re (_sre.SRE_Pattern): regular expression matching the output
  32. filenames (e.g., re.compile(r"output_(.*).txt)).
  33. overwrite (bool): whether to overwrite existing testcases.
  34. public (bool): whether to mark the new testcases as public.
  35. return ((unicode, unicode)): subject and text of a message describing
  36. the outcome of the operation.
  37. """
  38. task_name = dataset.task.name
  39. try:
  40. with zipfile.ZipFile(archive, "r") as archive_zfp:
  41. tests = dict()
  42. # Match input/output file names to testcases' codenames.
  43. for filename in archive_zfp.namelist():
  44. match = input_re.match(filename)
  45. if match:
  46. codename = match.group(1)
  47. if codename not in tests:
  48. tests[codename] = [None, None]
  49. tests[codename][0] = filename
  50. else:
  51. match = output_re.match(filename)
  52. if match:
  53. codename = match.group(1)
  54. if codename not in tests:
  55. tests[codename] = [None, None]
  56. tests[codename][1] = filename
  57. skipped_tc = []
  58. overwritten_tc = []
  59. added_tc = []
  60. for codename, testdata in tests.items():
  61. # If input or output file isn't found, skip it.
  62. if not testdata[0] or not testdata[1]:
  63. continue
  64. # Check, whether current testcase already exists.
  65. if codename in dataset.testcases:
  66. # If we are allowed, remove existing testcase.
  67. # If not - skip this testcase.
  68. if overwrite:
  69. testcase = dataset.testcases[codename]
  70. session.delete(testcase)
  71. try:
  72. session.commit()
  73. except Exception:
  74. skipped_tc.append(codename)
  75. continue
  76. overwritten_tc.append(codename)
  77. else:
  78. skipped_tc.append(codename)
  79. continue
  80. # Add current testcase.
  81. try:
  82. input_ = archive_zfp.read(testdata[0])
  83. output = archive_zfp.read(testdata[1])
  84. except Exception:
  85. raise Exception("Reading testcase %s failed" % codename)
  86. try:
  87. input_digest = file_cacher.put_file_content(
  88. input_, "Testcase input for task %s" % task_name)
  89. output_digest = file_cacher.put_file_content(
  90. output, "Testcase output for task %s" % task_name)
  91. except Exception:
  92. raise Exception("Testcase storage failed")
  93. testcase = Testcase(codename, public, input_digest,
  94. output_digest, dataset=dataset)
  95. session.add(testcase)
  96. try:
  97. session.commit()
  98. except Exception:
  99. raise Exception("Couldn't add test %s" % codename)
  100. if codename not in overwritten_tc:
  101. added_tc.append(codename)
  102. except zipfile.BadZipfile:
  103. raise Exception(
  104. "The selected file is not a zip file. "
  105. "Please select a valid zip file.")
  106. return (
  107. "Successfully added %d and overwritten %d testcase(s)" %
  108. (len(added_tc), len(overwritten_tc)),
  109. "Added: %s; overwritten: %s; skipped: %s" %
  110. (", ".join(added_tc) if added_tc else "none",
  111. ", ".join(overwritten_tc) if overwritten_tc else "none",
  112. ", ".join(skipped_tc) if skipped_tc else "none"))