#!/usr/bin/env python3 # Contest Management System - http://cms-dev.github.io/ # Copyright © 2010-2013 Giovanni Mascellani # Copyright © 2010-2018 Stefano Maggiolo # Copyright © 2010-2012 Matteo Boscariol # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """Utility functions for importers""" import functools from cms.db import Contest, Dataset, Task __all__ = [ "contest_from_db", "task_from_db", "update_contest", "update_task" ] class ImportDataError(Exception): pass def contest_from_db(contest_id, session): """Return the contest object with the given id contest_id (int|None): the id of the contest, or None to return None. session (Session): SQLAlchemy session to use. return (Contest|None): None if contest_id is None, or the contest. raise (ImportDataError): if there is no contest with the given id. """ if contest_id is None: return None contest = Contest.get_from_id(contest_id, session) if contest is None: raise ImportDataError( "The specified contest (id %s) does not exist." % contest_id) return contest def task_from_db(task_name, session): """Return the task object with the given name task_name (string|None): the name of the task, or None to return None. session (Session): SQLAlchemy session to use. return (Task|None): None if task_name is None, or the task. raise (ImportDataError): if there is no task with the given name. """ if task_name is None: return None task = session.query(Task).filter(Task.name == task_name).first() if task is None: raise ImportDataError( "The specified task (name %s) does not exist." % task_name) return task def _update_columns(old_object, new_object, spec=None): """Update the scalar columns of the object Update all non-relationship columns of old_object with the values in new_object, unless spec[attribute] is False. """ assert type(old_object) == type(new_object) spec = spec if spec is not None else {} for prp in old_object._col_props: if spec.get(prp.class_attribute, True) is False: continue if hasattr(new_object, prp.key): setattr(old_object, prp.key, getattr(new_object, prp.key)) def _update_object(old_object, new_object, spec=None, parent=None): """Update old_object with the values in new_object Update all columns with this strategy: - for non-relationship columns, use _update_columns (in particular, all columns are updated by default, unless spec[attribute] is false); - for relationship columns: - if the name is equal to parent, then it is ignored; - otherwise, it needs to be defined in spec; if spec is False, the column is ignored; if it is True, it is updated with the default strategy (see _update_list and _update_dict); otherwise if spec is a function, that function is used to update. old_object (Base): object to update. new_object (Base): object whose values will be used. spec ( {sqlalchemy.orm.attributes.InstrumentedAttribute: boolean|function} |None): a dictionary mapping attributes to a boolean (if not updating or using the default strategy) or to an updating function, with signature fn(old_value, new_value, parent=None). parent (string|None): the name of the relationship in the parent object, which is ignored. """ assert type(old_object) == type(new_object) spec = spec if spec is not None else {} # Update all scalar columns by default, unless spec says otherwise. _update_columns(old_object, new_object, spec) for prp in old_object._rel_props: # Don't update the parent relationship (works both for backref and # back_populates relationships). if parent is not None: if (prp.backref is not None and prp.backref[0] == parent) \ or prp.back_populates == parent: continue # To avoid bugs when new relationships are introduced, we force the # caller to describe how to update all other relationships. assert prp.class_attribute in spec, ( "Programming error: update specification not complete, " "missing relationship for %s.%s" % (prp.parent.class_, prp.class_attribute)) # Spec is false, it means we should not update this relationship. if spec[prp.class_attribute] is False: continue old_value = getattr(old_object, prp.key) new_value = getattr(new_object, prp.key) if spec[prp.class_attribute] is True: # Spec is true, it means we update the relationship with the # default update method (for lists or dicts). Note that the # values cannot have other relationships than the parent's, # otherwise _update_object will complain it doesn't have the # spec for them. update_fn = functools.partial(_update_object, parent=prp.key) if isinstance(old_value, dict): _update_dict(old_value, new_value, update_fn) elif isinstance(old_value, list): _update_list(old_value, new_value, update_fn) else: raise AssertionError( "Programming error: unknown type of relationship for " "%s.%s." % (prp.parent.class_, prp.class_attribute)) else: # Spec is not true, then it must be an update function, which # we duly apply. spec[prp.class_attribute](old_value, new_value, parent=prp.key) def _update_list(old_list, new_list, update_value_fn=None): """Update a SQLAlchemy relationship with type list Make old_list look like new_list, by: - up to the minimum length, calling update_value_fn on each element, to overwrite the values; - deleting additional entries in old_list, if they exist; - moving additional entries in new_list, if they exist. """ if update_value_fn is None: update_value_fn = _update_object old_len = len(old_list) new_len = len(new_list) # Update common elements. for old_value, new_value in zip(old_list, new_list): update_value_fn(old_value, new_value) # Delete additional elements of old_list. del old_list[new_len:] # Move additional elements from new_list to old_list. for _ in range(old_len, new_len): # For some funny behavior of SQLAlchemy-instrumented collections when # copying values, that resulted in new objects being added to the # session. temp = new_list[old_len] del new_list[old_len] old_list.append(temp) def _update_dict(old_dict, new_dict, update_value_fn=None): """Update a SQLAlchemy relationship with type dict Make old_dict look like new_dict, by: - calling update_value_fn to overwrite the values of old_dict with a corresponding value in new_dict; - deleting all entries in old_dict whose key is not in new_dict; - moving all entries in new_dict whose key is not in old_dict. """ if update_value_fn is None: update_value_fn = _update_object for key in set(old_dict.keys()) | set(new_dict.keys()): if key in new_dict: if key not in old_dict: # Move the object from new_dict to old_dict. For some funny # behavior of SQLAlchemy-instrumented collections when # copying values, that resulted in new objects being added # to the session. temp = new_dict[key] del new_dict[key] old_dict[key] = temp else: # Update the old value with the new value. update_value_fn(old_dict[key], new_dict[key]) else: # Delete the old value if no new value for that key. del old_dict[key] def _update_list_with_key(old_list, new_list, key, preserve_old=False, update_value_fn=None): """Update a SQLAlchemy list-relationship, using key for identity Make old_list look like new_list, in a similar way to _update_dict, as if the list was a dictionary with key computed using the key function. If preserve_old is true, elements in old_list with a key not present in new_list will be preserved. """ if update_value_fn is None: update_value_fn = _update_object old_dict = dict((key(v), v) for v in old_list) new_dict = dict((key(v), v) for v in new_list) for k in set(old_dict.keys()) | set(new_dict.keys()): if k in new_dict: if k not in old_dict: # Add new value to the old dictionary. temp = new_dict[k] new_list.remove(temp) old_list.append(temp) else: # Update the value in old_dict with the new value. update_value_fn(old_dict[k], new_dict[k]) elif not preserve_old: # Remove the old value not anymore present. old_list.remove(old_dict[k]) def update_dataset(old_dataset, new_dataset, parent=None): """Update old_dataset with information from new_dataset""" _update_object(old_dataset, new_dataset, { # Since we know it, hardcode to ignore the parent relationship. Dataset.task: False, # Relationships to update (all others). Dataset.managers: True, Dataset.testcases: True, }, parent=parent) def update_task(old_task, new_task, parent=None, get_statements=True): """Update old_task with information from new_task""" def update_datasets_fn(o, n, parent=None): _update_list_with_key( o, n, key= d.description, preserve_old=True, update_value_fn=functools.partial(update_dataset, parent=parent)) _update_object(old_task, new_task, { # Since we know it, hardcode to ignore the parent relationship. Task.contest: False, # Relationships not to update because not provided by the loader. Task.active_dataset: False, Task.submissions: False, Task.user_tests: False, # Relationships to update. Task.statements: get_statements, Task.datasets: update_datasets_fn, Task.attachments: True, # Scalar columns exceptions. Task.num: False, Task.primary_statements: get_statements, }, parent=parent) def update_contest(old_contest, new_contest, parent=None): """Update old_contest with information from new_contest""" _update_object(old_contest, new_contest, { # Announcements are not provided by the loader, we should keep # those we have. Contest.announcements: False, # Tasks and participations are top level objects for the loader, so # must be handled differently. Contest.tasks: False, Contest.participations: False, }, parent=parent)