# tasks.py — Celery task: download a Run's file and upload it to the S3 bucket.
  1. import os
  2. import sys
  3. from django.forms import model_to_dict
  4. import requests
  5. import boto3
  6. from botocore.exceptions import ClientError
  7. import logging
  8. import datetime
  9. from .amazon_access import CARTIER_BUCKET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
  10. from .models import Run
  11. from celery import Task, shared_task
  12. """
  13. upload file to s3 bucket
  14. """
  15. class UploadToBucket:
  16. def __init__(self, run_id):
  17. self.run = Run.objects.filter(run = run_id).first()
  18. print("run in class", self.run)
  19. run_object = model_to_dict(self.run)
  20. self.run_id = run_id
  21. self.filename = run_object["name"]
  22. self.run_url = run_object["url"]
  23. self.total = 0
  24. self.uploaded = 0
  25. self.bucket = CARTIER_BUCKET
  26. self.session = boto3.Session(
  27. aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
  28. aws_access_key_id=AWS_ACCESS_KEY_ID
  29. )
  30. self.download_run()
  31. def update_status(self, attri, status):
  32. if attri == "status":
  33. self.run.status = status
  34. elif attri == "started_at":
  35. self.run.started_at = status
  36. elif attri == "done_at":
  37. self.run.done_at = status
  38. else:
  39. self.run.upload_status = status
  40. self.run.save()
  41. def download_run(self, dest_folder="runs"):
  42. """
  43. this function downloads the file from url into a destination folder called runs
  44. """
  45. if not os.path.exists(dest_folder):
  46. os.makedirs(dest_folder)
  47. file_path = os.path.join(dest_folder, self.filename)
  48. response = requests.get(self.run_url, stream=True)
  49. print("response", response)
  50. total_length = int(response.headers.get('content-length'))
  51. if response.ok:
  52. self.update_status("status", "Downloading")
  53. print("file is downloading...", os.path.abspath(file_path))
  54. with open(file_path, 'wb') as f:
  55. dl = 0
  56. for chunk in response.iter_content(chunk_size=1024 * 8):
  57. if chunk:
  58. dl += len(chunk)
  59. f.write(chunk)
  60. done = int(50 * dl / total_length)
  61. sys.stdout.write('\r[{}{}]'.format(
  62. '█' * done, '.' * (50-done)))
  63. sys.stdout.flush()
  64. f.flush()
  65. os.fsync(f.fileno())
  66. print(f"file {self.filename} is downloaded")
  67. self.upload_to_s3_bucket(file_path)
  68. else:
  69. print(
  70. f"Download failed: status code {response.status_code}\n{response.text}")
  71. def upload_progress(self, size):
  72. self.update_status("status", "Uploading")
  73. if self.total == 0:
  74. return
  75. self.uploaded += size
  76. upload_status = int(self.uploaded / self.total * 100)
  77. self.update_status('upload_status', upload_status)
  78. sys.stdout.write("uploading..... {} %".format(upload_status))
  79. sys.stdout.flush()
  80. def upload_to_s3_bucket(self, file_path):
  81. self.total = os.stat(file_path).st_size
  82. print("Uploading to s3 bucket....")
  83. s3 = self.session.resource('s3')
  84. try:
  85. self.update_status("started_at", datetime.datetime.now())
  86. s3.meta.client.upload_file(
  87. file_path, self.bucket, self.filename, Callback=self.upload_progress)
  88. self.update_status("status", "Done")
  89. self.update_status("done_at", datetime.datetime.now())
  90. print("Uploading to s3 bucket done")
  91. except ClientError as e:
  92. logging.error(e)
  93. self.update_status("Failed")
  94. self.delete_downloaded_file(file_path)
  95. def delete_downloaded_file(self, file_path):
  96. print(f"deleting {self.filename}....")
  97. os.remove(file_path)
  98. print(f"{self.filename} deleted..")
  99. # if __name__ == "__main__":
  100. # instance = UploadToBucket()
  101. @shared_task
  102. def celery_task(run_id):
  103. UploadToBucket(run_id)