middleware.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. import logging
  2. import pathlib
  3. from django.core import signing
  4. from django.core.exceptions import PermissionDenied, SuspiciousFileOperation
  5. from django.utils.crypto import constant_time_compare
  6. from . import views
  7. from .storages import get_aws_location, local_dev, storage
  8. logger = logging.getLogger("s3file")
  9. class S3FileMiddleware:
  10. def __init__(self, get_response):
  11. self.get_response = get_response
  12. def __call__(self, request):
  13. file_fields = request.POST.getlist("s3file")
  14. for field_name in file_fields:
  15. paths = request.POST.getlist(field_name)
  16. if paths:
  17. try:
  18. signature = request.POST[f"{field_name}-s3f-signature"]
  19. except KeyError:
  20. raise PermissionDenied("No signature provided.")
  21. try:
  22. request.FILES.setlist(
  23. field_name, list(self.get_files_from_storage(paths, signature))
  24. )
  25. except SuspiciousFileOperation as e:
  26. raise PermissionDenied("Illegal file name!") from e
  27. if local_dev and request.path == "/__s3_mock__/":
  28. return views.S3MockView.as_view()(request)
  29. return self.get_response(request)
  30. @classmethod
  31. def get_files_from_storage(cls, paths, signature):
  32. """Return S3 file where the name does not include the path."""
  33. location = get_aws_location()
  34. for path in paths:
  35. path = pathlib.PurePosixPath(path)
  36. if not constant_time_compare(
  37. cls.sign_s3_key_prefix(path.parent), signature
  38. ):
  39. raise PermissionDenied("Illegal signature!")
  40. try:
  41. relative_path = str(path.relative_to(location))
  42. except ValueError as e:
  43. raise SuspiciousFileOperation(
  44. f"Path is outside the storage location: {path}"
  45. ) from e
  46. try:
  47. f = storage.open(relative_path)
  48. f.name = path.name
  49. yield f
  50. except (OSError, ValueError):
  51. logger.exception("File not found: %s", path)
  52. @classmethod
  53. def sign_s3_key_prefix(cls, path):
  54. """
  55. Signature to validate the S3 keys passed the middleware before fetching files.
  56. Return a base64-encoded HMAC-SHA256 of the upload folder aka the S3 key-prefix.
  57. """
  58. return signing.Signer(salt="s3file.middleware.S3FileMiddleware").signature(path)