1234567891011121314151617181920212223242526272829303132333435 |
- import logging
- import ckan.plugins as plugins
- from ckan.plugins.interfaces import Interface
- log = logging.getLogger(__name__)
- class IPipe(Interface):
- """
- Process data in a Data Pipeline.
- Inherit this to subscribe to events in the Data Pipeline and be able to
- broadcast the results for others to process next. In this way, a number of
- IPipes can be linked up in sequence to build up a data processing pipeline.
- When a resource is archived, it broadcasts its resource_id, perhaps
- triggering a process which transforms the data to another format, or loads
- it into a datastore, or checks it against a schema. These processes can in
- turn put the resulting data into the pipeline
- """
- def receive_data(self, operation, queue, **params):
- pass
- @classmethod
- def send_data(cls, operation, queue, **params):
- for observer in plugins.PluginImplementations(cls):
- try:
- observer.receive_data(operation, queue, **params)
- except Exception as ex:
- log.exception(ex)
- # We reraise all exceptions so they are obvious there
- # is something wrong
- raise
|