12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- #!/usr/bin/env python3
- import boto3
- import concurrent.futures
- from datetime import datetime, timedelta
- from gzip import GzipFile
- from io import BytesIO
- import os
- import requests
- from rx import Observable
# Worker count handed to the executor that runs the uploads.
THREADS = 30

# S3 resource handle and the destination bucket. The bucket name is read from
# the S3_BUCKET environment variable; a KeyError is raised here if it is unset.
S3 = boto3.resource('s3')
S3_BUCKET = S3.Bucket(os.environ['S3_BUCKET'])
def json_file_name_for_datetime(_datetime):
    """Return the hourly archive file name for *_datetime*.

    The name has the form ``YYYY-MM-DD-H.json``: month and day are zero-padded
    to two digits, while year and hour are written without padding.
    """
    template = '{year}-{month:02}-{day:02}-{hour}.json'
    parts = {
        field: getattr(_datetime, field)
        for field in ('year', 'month', 'day', 'hour')
    }
    return template.format(**parts)
def upload_hour_data_to_s3(_datetime, url):
    """Download one gzip-compressed hourly archive and store it in S3.

    The response body is decompressed in memory and uploaded to ``S3_BUCKET``
    under the key produced by ``json_file_name_for_datetime(_datetime)``.

    Args:
        _datetime: The hour the archive covers; determines the S3 object key.
        url: Location of the gzip-compressed archive to download.

    Returns:
        None. When the server answers with a status other than 200 the
        response is printed and nothing is uploaded for that hour.

    Raises:
        requests.exceptions.RequestException: If the download fails or
            times out.
        OSError: If the response body is not valid gzip data.
    """
    print("Requesting", url)
    # requests waits indefinitely by default; bound the wait so a stalled
    # connection cannot block a worker forever.
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        print(response)
        # An error page is not a gzip archive: skip this hour instead of
        # trying to decompress and upload it.
        return
    with GzipFile(fileobj=BytesIO(response.content)) as archive:
        json_string = archive.read()
    S3_BUCKET.put_object(Key=json_file_name_for_datetime(_datetime), Body=json_string)
def main():
    """Upload every hourly archive from 2016-01-01 00:00 up to November to S3.

    Builds a stream of hourly datetimes starting at 2016-01-01 00:00, stops at
    the first value whose month is 11 or later, pairs each hour with its
    archive URL and submits one ``upload_hour_data_to_s3`` call per hour to a
    process pool of ``THREADS`` workers.
    """
    with concurrent.futures.ProcessPoolExecutor(THREADS) as executor:
        # noinspection PyUnresolvedReferences
        Observable \
            .just(datetime(2016, 1, 1, hour=0)) \
            .repeat() \
            .scan(lambda last_datetime, _: last_datetime + timedelta(hours=1)) \
            .take_while(lambda _datetime: _datetime.month < 11) \
            .map(lambda _datetime: (_datetime, 'http://data.githubarchive.org/{}.gz'.format(
                json_file_name_for_datetime(_datetime)))) \
            .flat_map(lambda datetime_url_tuple: executor.submit(upload_hour_data_to_s3, *datetime_url_tuple)) \
            .subscribe()


# Worker processes started with the "spawn" method import this module again;
# the guard keeps them from re-running the whole pipeline on import.
if __name__ == '__main__':
    main()
|