123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- import requests
- import config
- from azure import storage
- from PackageInformationWorker.PyPIPackageInformation import PyPIPackageInformation
- import json
- import azure.storage.queue as queue
- import traceback
- import urllib
- import logging
# Root logger (getLogger() is called without a name); every function in this module logs through it.
logger = logging.getLogger()

# Package summaries are stored as one row per partition (PartitionKey = package name),
# so every summary read/write uses this fixed RowKey.
STATIC_ROW_KEY = 'ROWKEY'

# Storage credentials, read straight from the project config module.
account_name = config.STORAGE_ACCOUNT_NAME
account_key = config.STORAGE_ACCOUNT_KEY

# One table client shared by the whole module. Both tables are created at import time,
# presumably so the worker functions can write without checking for them first.
_storage_account = storage.CloudStorageAccount(account_name, account_key)
table_service = _storage_account.create_table_service()
for _table_name in (config.PACKAGE_VERSION_DATA_TABLENAME, config.PACKAGE_SUMMARY_TABLENAME):
    table_service.create_table(_table_name)
del _table_name
def main():
    """Run the package-information worker loop until a queue-service failure occurs.

    Repeatedly pulls batches of messages from ``config.PACKAGE_QUEUE_NAME``. Each
    message body is JSON carrying ``"package"`` and ``"version"`` keys and is passed
    to ``_process_one_package``.

    Per-message failures no longer stop the worker:

    * a message that cannot be parsed can never succeed, so it is logged and
      deleted (the same "move on past bad data" policy ``_process_one_package``
      applies to empty fields);
    * a message whose processing raises is logged and left on the queue, so it
      becomes visible again after its visibility timeout and is retried.

    Errors talking to the queue service itself (fetching or deleting messages)
    are logged and end the loop, as before.
    """
    import time  # local import: only this entry point needs it

    messages_in_batch = 5
    # Pause between polls while the queue is empty, so an idle worker does not
    # call the storage service in a tight loop.
    idle_poll_seconds = 5
    try:
        qs = queue.QueueService(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)
        while True:
            # Allow one minute of processing per message in the batch before the
            # queue makes unfinished messages visible to consumers again.
            messages = list(qs.get_messages(config.PACKAGE_QUEUE_NAME,
                                            numofmessages=messages_in_batch,
                                            visibilitytimeout=messages_in_batch * 60))
            if not messages:
                time.sleep(idle_poll_seconds)
                continue

            for message in messages:
                try:
                    entity = json.loads(message.message_text)
                    package_name = entity["package"]
                    version = entity["version"]
                except (ValueError, KeyError, TypeError):
                    # Malformed payload: retrying cannot help, so drop it.
                    logger.exception("Worker: dropping malformed message %s", message.message_id)
                    qs.delete_message(config.PACKAGE_QUEUE_NAME, message.message_id, message.pop_receipt)
                    continue

                try:
                    _process_one_package(package_name, version)
                except Exception:
                    # Keep the message for a later retry and carry on with the batch.
                    logger.exception("Worker: failed to process message %s", message.message_id)
                    continue

                # Delete only after successful processing.
                qs.delete_message(config.PACKAGE_QUEUE_NAME, message.message_id, message.pop_receipt)
    except Exception:
        logger.exception("Worker: stopping after a queue service error")
-
def _process_one_package(package_name, version):
    """Record one PyPI release and fold it into the package's summary row.

    Looks up ``package_name``/``version`` via ``PyPIPackageInformation`` and derives
    Python 2/3 support from the release's trove classifiers. It then writes the
    per-version row and updates the package's summary row with the release's upload
    time.

    Empty inputs or a failed lookup are logged and skipped. Errors raised while
    writing the table rows propagate to the caller.
    """
    logger.info("Worker: Package:%s Version:%s", package_name, version)
    if not package_name or not version:
        logger.warning("Package_name or version was empty. Moving on as the queue had bad data")
        return

    package_info = PyPIPackageInformation.get_package_specific_version_info(package_name, version)
    if not package_info:
        logger.error("Worker: Package:%s Version:%s failed to get package info", package_name, version)
        return

    classifiers = package_info['classifiers']
    supports_python_2 = _supports_python(classifiers, 2)
    supports_python_3 = _supports_python(classifiers, 3)
    uploaded = package_info['uploaded']

    summary_entity = _get_or_create_summary_entity(package_name)
    if uploaded is not None:
        _update_summary_dates(summary_entity, uploaded, supports_python_2, supports_python_3)

    _insert_entity_to_package_version_table(package_name, version, supports_python_2, supports_python_3,
                                            package_info['downloads'], uploaded)
    table_service.insert_or_replace_entity(config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY,
                                           summary_entity)


def _supports_python(classifiers, major):
    """Return True if any trove classifier declares support for Python ``major`` (e.g. 2 or 3)."""
    # Prefix match, so 'Programming Language :: Python :: 2.7' counts for major 2.
    prefix = 'Programming Language :: Python :: {}'.format(major)
    return any(classifier.startswith(prefix) for classifier in classifiers)


def _get_or_create_summary_entity(package_name):
    """Return the summary row for ``package_name``, seeding an empty one if it cannot be read."""
    try:
        return table_service.get_entity(config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY)
    except Exception:
        # Presumably the row does not exist yet: create it with every milestone unset.
        # It is then read back rather than reusing the dict, because the summary update
        # code assigns attributes, which a plain dict does not support.
        table_service.insert_or_replace_entity(
            config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY,
            {
                'PartitionKey': package_name,
                'RowKey': STATIC_ROW_KEY,
                'First_Published': None,
                'Python2_Start': None,
                'Python2_End': None,
                'Python3_Start': None,
            })
        return table_service.get_entity(config.PACKAGE_SUMMARY_TABLENAME, package_name, STATIC_ROW_KEY)


def _update_summary_dates(summary_entity, uploaded, supports_python_2, supports_python_3):
    """Fold one release's upload time into the summary milestones, mutating ``summary_entity``.

    - ``First_Published``: earliest upload seen for any release.
    - ``Python2_Start`` / ``Python3_Start``: earliest upload of a release whose
      classifiers declare Python 2 / Python 3 support.
    - ``Python2_End``: moved earlier to this release when the release does NOT
      support Python 2 and falls strictly between the recorded start and end.
      Cleared when a Python 2 release appears after the recorded end, because
      support evidently had not ended there.

    Missing or ``None`` attributes are treated as "unknown". ``uploaded`` is
    compared with ``<``/``>`` against the stored values.
    NOTE(review): presumably a datetime; confirm against PyPIPackageInformation.
    """
    first_published = getattr(summary_entity, 'First_Published', None)
    if first_published is None or first_published > uploaded:
        summary_entity.First_Published = uploaded

    python2_start = getattr(summary_entity, 'Python2_Start', None)
    if supports_python_2 and (python2_start is None or python2_start > uploaded):
        summary_entity.Python2_Start = uploaded
        python2_start = uploaded

    python2_end = getattr(summary_entity, 'Python2_End', None)
    if supports_python_2:
        if python2_end is not None and python2_end < uploaded:
            summary_entity.Python2_End = None
    elif python2_start is not None and python2_end is not None and python2_start < uploaded < python2_end:
        # Only a release that dropped Python 2 can mark the end of Python 2 support.
        summary_entity.Python2_End = uploaded

    python3_start = getattr(summary_entity, 'Python3_Start', None)
    if supports_python_3 and (python3_start is None or python3_start > uploaded):
        summary_entity.Python3_Start = uploaded
def _insert_entity_to_package_version_table(package, version, python2, python3, downloads, upload_time):
    """Insert or replace the per-version row for ``package``/``version`` in the version-data table.

    Both keys are percent-encoded with ``urllib.parse.quote_plus`` before use. This
    is presumably to keep characters that Azure Table keys disallow (such as '/',
    '#' or '?') out of PartitionKey/RowKey.

    Returns whatever ``table_service.insert_or_replace_entity`` returns. Any exception
    from the table service is logged with the row's contents and re-raised unchanged.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not guarantee that
    # ``urllib.parse`` is loaded. It only works if some other module imported it first.
    import urllib.parse

    package_sanitized = urllib.parse.quote_plus(package)
    version_sanitized = urllib.parse.quote_plus(version)
    entity = {
        'PartitionKey': package_sanitized,
        'RowKey': version_sanitized,
        'Python2': python2,
        'Python3': python3,
        'Downloads': downloads,
        'UploadTime': upload_time,
    }
    try:
        return table_service.insert_or_replace_entity(config.PACKAGE_VERSION_DATA_TABLENAME,
                                                      package_sanitized, version_sanitized, entity)
    except Exception:
        logger.error("Failed to insert Package:{} Version:{} Python2:{} Python3:{} Downloads:{} UploadTime:{} Exception:{}".format(
            package, version, python2, python3, downloads, upload_time, traceback.format_exc()))
        # Bare raise keeps the original traceback intact.
        raise
|