table-service.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  import json
  import logging
  import time
  import traceback
  import urllib
  import urllib.parse

  import azure.storage.queue as queue
  import requests
  from azure import storage

  import config
  from PackageInformationWorker.PyPIPackageInformation import PyPIPackageInformation
  # ---------------------------------------------------------------------------
  # Module-level setup. Everything below runs at import time, including the
  # two create_table calls against the storage account.
  # ---------------------------------------------------------------------------

  # Root logger (no name is passed to getLogger).
  logger = logging.getLogger()

  # Every row in the package summary table uses this one fixed row key; the
  # package name is the partition key.
  STATIC_ROW_KEY = 'ROWKEY'

  # Storage credentials come from the project's config module.
  account_name = config.STORAGE_ACCOUNT_NAME
  account_key = config.STORAGE_ACCOUNT_KEY

  # Shared table client used by the functions in this module.
  _storage_account = storage.CloudStorageAccount(account_name, account_key)
  table_service = _storage_account.create_table_service()

  # Create both tables up front.
  # NOTE(review): presumably create_table is harmless when the table already
  # exists -- confirm against the SDK version in use.
  table_service.create_table(config.PACKAGE_VERSION_DATA_TABLENAME)
  table_service.create_table(config.PACKAGE_SUMMARY_TABLENAME)
  def main():
      """Poll the package queue and process each (package, version) message.

      Messages are fetched in small batches. Each message body is JSON with
      "package" and "version" keys; it is handed to _process_one_package and
      deleted from the queue only after that call returns.

      Any exception is logged with its traceback and ends this call (the
      function returns None). The message that was being handled is not
      deleted, so it is expected to be redelivered once its visibility
      timeout expires.

      When a poll yields no messages the loop sleeps briefly before polling
      again, instead of immediately re-issuing the request.
      """
      # package, version = ('azure', '1.0.0')
      # get a package to look at
      # check that package and version.
      # version data just gets filled in
      # summary trickier.
      # summary -> name,
      #   first_published (might be different than python2_start if
      #       not using trove classifier)
      #   python2_start (change if we find earlier),
      #   python2_end (change if we find earlier, remove if package
      #       after this come in and has python2),
      #   python3_start (change if we find earlier)
      messages_in_batch = 5
      # Pause between polls when the queue had nothing for us.
      idle_sleep_seconds = 1

      try:
          qs = queue.QueueService(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)

          while True:
              # Hide the batch from other consumers for one minute per message.
              messages = qs.get_messages(
                  config.PACKAGE_QUEUE_NAME,
                  numofmessages=messages_in_batch,
                  visibilitytimeout=messages_in_batch * 60,
              )

              # Count by iterating rather than relying on the truthiness or
              # len() of the SDK's result object.
              handled = 0
              for message in messages:
                  entity = json.loads(message.message_text)
                  _process_one_package(entity["package"], entity["version"])
                  # once completed delete the message
                  qs.delete_message(config.PACKAGE_QUEUE_NAME, message.message_id, message.pop_receipt)
                  handled += 1

              if not handled:
                  time.sleep(idle_sleep_seconds)
      except Exception:
          # swallow exception here. we will just reprocess and delete the message.
          # known failures:
          # - connection aborted by get_messages sometimes. this happens with a
          #   connectionreseterror (10054)
          # - Random json errors. Could add retry.
          logger.error(traceback.format_exc())
  def _process_one_package(package_name, version):
      """Record one package release in the version table and update its summary.

      Looks up the release through PyPIPackageInformation, derives Python 2 /
      Python 3 support from its trove classifiers, folds the release's upload
      time into the package's summary row (First_Published, Python2_Start,
      Python2_End, Python3_Start), then writes the per-version row and the
      summary row.

      Args:
          package_name: package name taken from the queue message.
          version: version string taken from the queue message.

      Returns:
          None. Returns early, writing nothing, when either argument is empty
          or when no package info could be retrieved.
      """
      logger.info("Worker: Package:{} Version:{}".format(package_name, version))
      if not package_name or not version:
          logger.warning("Package_name or version was empty. Moving on as the queue had bad data")
          return

      # .6684 seconds to run. 74577 total packages
      package_info = PyPIPackageInformation.get_package_specific_version_info(package_name, version)
      if not package_info:
          logger.error("Worker: Package:{} Version:{} failed to get package info".format(package_name, version))
          return

      classifiers = package_info['classifiers']
      supports_python_2 = any(c.startswith('Programming Language :: Python :: 2') for c in classifiers)
      supports_python_3 = any(c.startswith('Programming Language :: Python :: 3') for c in classifiers)
      uploaded = package_info['uploaded']

      try:
          summary_entity = table_service.get_entity(config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY)
      except Exception:
          # we don't have a summary for this entry.
          # NOTE(review): any failure of get_entity lands here, not only
          # "entity not found"; narrow this to the SDK's missing-resource
          # exception once the SDK version is confirmed, so a transient error
          # cannot replace an existing summary with a blank one.
          summary_entity = {
              'PartitionKey': package_name, 'RowKey': STATIC_ROW_KEY, 'First_Published': None,
              'Python2_Start': None, 'Python2_End': None, 'Python3_Start': None
          }
          table_service.insert_or_replace_entity(config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY, summary_entity)
          # Re-read so that summary_entity is the object returned by the table
          # service (the code below uses attribute access) rather than a dict.
          summary_entity = table_service.get_entity(config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY)

      # set fields using upload. Upload is none if the version has never been uploaded
      # Basically just filter out packages that never have content from our records.
      if uploaded is not None:
          # A missing attribute and an attribute set to None both mean "not
          # recorded yet", so read each field with a None default.
          first_published = getattr(summary_entity, 'First_Published', None)
          if first_published is None or first_published > uploaded:
              # the published date is empty or later than the release we are
              # viewing
              summary_entity.First_Published = uploaded

          python2_start = getattr(summary_entity, 'Python2_Start', None)
          if supports_python_2 and (python2_start is None or python2_start > uploaded):
              # the python 2 start date is empty or later than this python 2
              # release
              summary_entity.Python2_Start = uploaded

          python2_end = getattr(summary_entity, 'Python2_End', None)
          if supports_python_2:
              if python2_end is not None and python2_end < uploaded:
                  # we support python2 but it is after the date we thought
                  # python 2 support ended, so it must not have really ended
                  summary_entity.Python2_End = None
          elif python2_start is not None and python2_start < uploaded and \
                  (python2_end is None or python2_end > uploaded):
              # This release does not support python2, python2 support started
              # before it, and no earlier end date is recorded: this release
              # marks the end. Previously an end date could only be replaced,
              # never set for the first time, and a python2 release could also
              # take this branch.
              summary_entity.Python2_End = uploaded

          python3_start = getattr(summary_entity, 'Python3_Start', None)
          if supports_python_3 and (python3_start is None or python3_start > uploaded):
              # the python 3 start date is empty or later than this python 3
              # release
              summary_entity.Python3_Start = uploaded

      # NOTE(review): the original indentation was lost, so it is not certain
      # whether these two writes belonged inside the "uploaded is not None"
      # guard. They are kept unconditional here -- confirm against the
      # original file.
      # NOTE(review): the version table percent-encodes its keys while the
      # summary table uses package_name as-is.
      _insert_entity_to_package_version_table(
          package_name, version, supports_python_2, supports_python_3,
          package_info['downloads'], uploaded)
      table_service.insert_or_replace_entity(config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY, summary_entity)
  def _insert_entity_to_package_version_table(package, version, python2, python3, downloads, upload_time):
      """Insert or replace the row for one package release in the version table.

      The partition key is the package name and the row key is the version,
      both passed through urllib.parse.quote_plus first.

      Args:
          package: package name.
          version: version string.
          python2: whether the release declares Python 2 support.
          python3: whether the release declares Python 3 support.
          downloads: download figure stored in the 'Downloads' column.
          upload_time: upload time stored in the 'UploadTime' column.

      Returns:
          Whatever table_service.insert_or_replace_entity returns.

      Raises:
          Exception: any error from the table service is logged together with
              the row's values and then re-raised unchanged.
      """
      # TODO: issue with python azure storage. Version can't have '~' in it. https://github.com/Azure/azure-storage-python/issues/76
      package_sanitized = urllib.parse.quote_plus(package)
      version_sanitized = urllib.parse.quote_plus(version)

      row = {
          'PartitionKey': package_sanitized,
          'RowKey': version_sanitized,
          'Python2': python2,
          'Python3': python3,
          'Downloads': downloads,
          'UploadTime': upload_time,
      }

      try:
          return table_service.insert_or_replace_entity(
              config.PACKAGE_VERSION_DATA_TABLENAME, package_sanitized, version_sanitized, row)
      except Exception:
          logger.error("Failed to insert Package:{} Version:{} Python2:{} Python3:{} Downloads:{} UploadTime:{} Exception:{}".format(
              package, version, python2, python3, downloads, upload_time, traceback.format_exc()))
          # Bare raise re-raises the active exception with its original traceback.
          raise