interfaces.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435
  1. import logging
  2. import ckan.plugins as plugins
  3. from ckan.plugins.interfaces import Interface
  4. log = logging.getLogger(__name__)
  5. class IPipe(Interface):
  6. """
  7. Process data in a Data Pipeline.
  8. Inherit this to subscribe to events in the Data Pipeline and be able to
  9. broadcast the results for others to process next. In this way, a number of
  10. IPipes can be linked up in sequence to build up a data processing pipeline.
  11. When a resource is archived, it broadcasts its resource_id, perhaps
  12. triggering a process which transforms the data to another format, or loads
  13. it into a datastore, or checks it against a schema. These processes can in
  14. turn put the resulting data into the pipeline
  15. """
  16. def receive_data(self, operation, queue, **params):
  17. pass
  18. @classmethod
  19. def send_data(cls, operation, queue, **params):
  20. for observer in plugins.PluginImplementations(cls):
  21. try:
  22. observer.receive_data(operation, queue, **params)
  23. except Exception as ex:
  24. log.exception(ex)
  25. # We reraise all exceptions so they are obvious there
  26. # is something wrong
  27. raise