#!/usr/bin/env python3
"""Mirror GH Archive hourly event dumps into an S3 bucket.

Downloads each hourly ``.json.gz`` file from data.githubarchive.org,
decompresses it, and uploads the raw JSON to the bucket named by the
``S3_BUCKET`` environment variable.
"""
import boto3
import concurrent.futures
from datetime import datetime, timedelta
from gzip import GzipFile
from io import BytesIO
import os
import requests
from rx import Observable

# Worker-pool size for the executor driving the downloads/uploads.
# NOTE(review): the name suggests threads, but a ProcessPoolExecutor is
# used below — confirm which was intended.
THREADS = 30
# Module-level boto3 handles; created once at import time.
S3 = boto3.resource('s3')
# Destination bucket name is taken from the environment
# (raises KeyError at import if S3_BUCKET is unset).
S3_BUCKET = S3.Bucket(os.environ['S3_BUCKET'])
  13. def json_file_name_for_datetime(_datetime):
  14. return '{year}-{month:02}-{day:02}-{hour}.json'.format(year=_datetime.year, month=_datetime.month,
  15. day=_datetime.day, hour=_datetime.hour)
  16. def upload_hour_data_to_s3(_datetime, url):
  17. print("Requesting", url)
  18. response = requests.get(url)
  19. if response.status_code != 200:
  20. print(response)
  21. json_string = GzipFile(fileobj=BytesIO(response.content)).read()
  22. S3_BUCKET.put_object(Key=json_file_name_for_datetime(_datetime), Body=json_string)
# Drive the mirroring pipeline: enumerate every hour starting at
# 2016-01-01 00:00 and stop once the month reaches November, fanning the
# per-hour work out over a pool of worker processes.
#
# Stage-by-stage (comments cannot live inside the backslash-continued chain):
#   .just(...).repeat()  — re-emit the seed datetime indefinitely.
#   .scan(...)           — add one hour per emission; NOTE(review): with no
#                          seed argument, RxPY 1.x emits the first element
#                          unchanged, so the stream starts at midnight Jan 1
#                          — confirm against the installed rx version.
#   .take_while(...)     — stop before any hour in November or later.
#   .map(...)            — pair each hour with its GH Archive .gz URL.
#   .flat_map(...)       — submit to the executor; rx flattens the returned
#                          Future so .subscribe() drains completions.
# NOTE(review): the work is I/O-bound, so a ThreadPoolExecutor would likely
# suffice despite the ProcessPoolExecutor used here — confirm before changing.
with concurrent.futures.ProcessPoolExecutor(THREADS) as executor:
    # noinspection PyUnresolvedReferences
    Observable \
        .just(datetime(2016, 1, 1, hour=0)) \
        .repeat() \
        .scan(lambda last_datetime, _: last_datetime + timedelta(hours=1)) \
        .take_while(lambda _datetime: _datetime.month < 11) \
        .map(lambda _datetime: (_datetime, 'http://data.githubarchive.org/{}.gz'.format(json_file_name_for_datetime(
            _datetime)))) \
        .flat_map(lambda datetime_url_tuple: executor.submit(upload_hour_data_to_s3, *datetime_url_tuple)) \
        .subscribe()