command_celery.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. from __future__ import print_function
  2. from future import standard_library
  3. import sys
  4. import os
  5. from pkg_resources import iter_entry_points, VersionConflict
  6. import configparser
  7. from celery import Celery
  8. from ckan.lib.cli import CkanCommand
  9. standard_library.install_aliases() # noqa
  10. class CeleryCmd(CkanCommand):
  11. '''
  12. Manages the Celery daemons. This is an improved version of CKAN core's
  13. 'celeryd' command.
  14. Usage:
  15. paster celeryd2 run [all|bulk|priority]
  16. - Runs a celery daemon to run tasks on the bulk or priority queue
  17. '''
  18. summary = __doc__.split('\n')[0]
  19. usage = __doc__
  20. min_args = 0
  21. max_args = 2
  22. def __init__(self, name):
  23. super(CeleryCmd, self).__init__(name)
  24. self.parser.add_option('--loglevel',
  25. action='store',
  26. dest='loglevel',
  27. default='INFO',
  28. help='Celery logging - choose between DEBUG, INFO, WARNING, ERROR, CRITICAL or FATAL')
  29. self.parser.add_option('--concurrency',
  30. action='store',
  31. dest='concurrency',
  32. default='1',
  33. help='Number of concurrent processes to run')
  34. self.parser.add_option('-n', '--hostname',
  35. action='store',
  36. dest='hostname',
  37. help="Set custom hostname")
  38. def command(self):
  39. """
  40. Parse command line arguments and call appropriate method.
  41. """
  42. if not self.args or self.args[0] in ['--help', '-h', 'help']:
  43. print(self.usage)
  44. sys.exit(1)
  45. cmd = self.args[0]
  46. # Don't need to load the config as the db is generally not needed
  47. # self._load_config()
  48. # But we do want to get the filename of the ini
  49. try:
  50. self._get_config()
  51. except AttributeError:
  52. from ckan.lib.cli import _get_config
  53. _get_config(self.options.config)
  54. # Initialise logger after the config is loaded, so it is not disabled.
  55. # self.log = logging.getLogger(__name__)
  56. if cmd == 'run':
  57. queue = self.args[1]
  58. if queue == 'all':
  59. queue = 'priority,bulk'
  60. self.run_(loglevel=self.options.loglevel,
  61. queue=queue,
  62. concurrency=int(self.options.concurrency),
  63. hostname=self.options.hostname)
  64. else:
  65. print('Command %s not recognized' % cmd)
  66. sys.exit(1)
  67. def run_(self, loglevel='INFO', queue=None, concurrency=None,
  68. hostname=None):
  69. default_ini = os.path.join(os.getcwd(), 'development.ini')
  70. if self.options.config:
  71. os.environ['CKAN_CONFIG'] = os.path.abspath(self.options.config)
  72. elif os.path.isfile(default_ini):
  73. os.environ['CKAN_CONFIG'] = default_ini
  74. else:
  75. print('No .ini specified and none was found in current directory')
  76. sys.exit(1)
  77. # from ckan.lib.celery_app import celery
  78. celery_args = []
  79. if concurrency:
  80. celery_args.append('--concurrency=%d' % concurrency)
  81. if queue:
  82. celery_args.append('--queues=%s' % queue)
  83. if self.options.hostname:
  84. celery_args.append('--hostname=%s' % hostname)
  85. celery_args.append('--loglevel=%s' % loglevel)
  86. argv = ['celeryd'] + celery_args
  87. print('Running: %s' % ' '.join(argv))
  88. celery_app = self._celery_app()
  89. celery_app.worker_main(argv=argv)
  90. def _celery_app(self):
  91. # reread the ckan ini using ConfigParser so that we can get at the
  92. # non-pylons sections
  93. config = configparser.ConfigParser()
  94. config.read(self.options.config)
  95. celery_config = dict(
  96. CELERY_RESULT_SERIALIZER='json',
  97. CELERY_TASK_SERIALIZER='json',
  98. CELERY_IMPORTS=[],
  99. )
  100. for entry_point in iter_entry_points(group='ckan.celery_task'):
  101. try:
  102. celery_config['CELERY_IMPORTS'].extend(
  103. entry_point.load()()
  104. )
  105. except VersionConflict as e:
  106. error = 'ERROR in entry point load: %s %s' % (entry_point, e)
  107. print(error)
  108. pass
  109. LIST_PARAMS = 'CELERY_IMPORTS ADMINS ROUTES'.split()
  110. try:
  111. for key, value in config.items('app:celery'):
  112. celery_config[key.upper()] = value.split() \
  113. if key in LIST_PARAMS else value
  114. except configparser.NoSectionError:
  115. error = 'Could not find celery config in your ckan ini file (a section headed "[app:celery]".'
  116. print(error)
  117. sys.exit(1)
  118. celery_app = Celery()
  119. # Thes update of configuration means it is only possible to set each
  120. # key once so this is done once all of the options have been decided.
  121. celery_app.conf.update(celery_config)
  122. celery_app.loader.conf.update(celery_config)
  123. return celery_app