ImportTeam.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. #!/usr/bin/env python3
  2. # Contest Management System - http://cms-dev.github.io/
  3. # Copyright © 2015 William Di Luigi <williamdiluigi@gmail.com>
  4. # Copyright © 2016-2018 Stefano Maggiolo <s.maggiolo@gmail.com>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU Affero General Public License as
  8. # published by the Free Software Foundation, either version 3 of the
  9. # License, or (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU Affero General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU Affero General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """This script imports a team from disk using one of the available
  19. loaders.
  20. The data parsed by the loader is used to create a new Team in the
  21. database.
  22. """
  23. # We enable monkey patching to make many libraries gevent-friendly
  24. # (for instance, urllib3, used by requests)
  25. import gevent.monkey
  26. gevent.monkey.patch_all() # noqa
  27. import argparse
  28. import logging
  29. import os
  30. import sys
  31. from cms import utf8_decoder
  32. from cms.db import SessionGen, Team
  33. from cms.db.filecacher import FileCacher
  34. from cmscontrib.importing import ImportDataError
  35. from cmscontrib.loaders import choose_loader, build_epilog
  36. logger = logging.getLogger(__name__)
  37. class TeamImporter:
  38. """Script to create a team in the database."""
  39. def __init__(self, path, loader_class):
  40. self.file_cacher = FileCacher()
  41. self.loader = loader_class(os.path.realpath(path), self.file_cacher)
  42. def do_import(self):
  43. """Get the team from the TeamLoader and store it."""
  44. # Get the team
  45. team = self.loader.get_team()
  46. if team is None:
  47. return False
  48. # Store
  49. logger.info("Creating team on the database.")
  50. with SessionGen() as session:
  51. try:
  52. team = self._team_to_db(session, team)
  53. except ImportDataError as e:
  54. logger.error(str(e))
  55. logger.info("Error while importing, no changes were made.")
  56. return False
  57. session.commit()
  58. team_id = team.id
  59. logger.info("Import finished (new team id: %s).", team_id)
  60. return True
  61. def do_import_all(self, base_path, get_loader):
  62. """Get the participation list from the ContestLoader and then
  63. try to import the needed teams.
  64. """
  65. added = set()
  66. _, _, participations = self.loader.get_contest()
  67. for p in participations:
  68. if "team" in p:
  69. team_path = os.path.join(base_path, p["team"])
  70. if team_path not in added:
  71. added.add(team_path)
  72. importer = TeamImporter(
  73. path=team_path,
  74. loader_class=get_loader(team_path)
  75. )
  76. importer.do_import()
  77. return True
  78. @staticmethod
  79. def _team_to_db(session, team):
  80. old_team = session.query(Team).filter(Team.code == team.code).first()
  81. if old_team is not None:
  82. raise ImportDataError("Team \"%s\" already exists." % team.code)
  83. session.add(team)
  84. return team
  85. def main():
  86. """Parse arguments and launch process.
  87. """
  88. parser = argparse.ArgumentParser(
  89. description="Import a team to the database.",
  90. epilog=build_epilog(),
  91. formatter_class=argparse.RawDescriptionHelpFormatter
  92. )
  93. parser.add_argument(
  94. "-L", "--loader",
  95. action="store", type=utf8_decoder,
  96. default=None,
  97. help="use the specified loader (default: autodetect)"
  98. )
  99. parser.add_argument(
  100. "target",
  101. action="store", type=utf8_decoder, nargs="?",
  102. default=os.getcwd(),
  103. help="target file/directory from where to import team(s)"
  104. )
  105. parser.add_argument(
  106. "-A", "--all",
  107. action="store_true",
  108. help="try to import the needed teams inside target (not "
  109. "necessarily all of them)"
  110. )
  111. args = parser.parse_args()
  112. def get_loader(path):
  113. return choose_loader(args.loader, path, parser.error)
  114. importer = TeamImporter(
  115. path=args.target,
  116. loader_class=get_loader(args.target)
  117. )
  118. if args.all:
  119. success = importer.do_import_all(args.target, get_loader)
  120. else:
  121. success = importer.do_import()
  122. return 0 if success is True else 1
  123. if __name__ == "__main__":
  124. sys.exit(main())