1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- from datetime import datetime, timedelta
- from azure.storage.blob import BlobSasPermissions, generate_blob_sas
- from azurebatchload.checks import Checks
- class Base(Checks):
- def __init__(
- self,
- destination,
- folder,
- extension=None,
- modified_since=None,
- method="batch",
- list_files=None,
- expiry_download_links=7,
- ):
- super().__init__(directory=folder)
- self.destination = destination
- self.folder = folder
- self.extension = extension
- self.modified_since = modified_since
- if not self._check_azure_cli_installed():
- self.method = "single"
- else:
- self.method = method
- self.list_files = list_files
- credentials = self._check_connection_credentials()
- self.connection_string = credentials[0]
- self.account_name = credentials[1]
- self.account_key = credentials[2]
- self.expiry_download_links = expiry_download_links
- def checks(self):
- allowed_methods = ("batch", "single")
- if self.method not in allowed_methods:
- raise ValueError(f"Method {self.method} is not a valid method. Choose from {' or '.join(allowed_methods)}.")
- if self.list_files and self.method == "batch":
- raise ValueError("list_files is only allowed with method='single'.")
- if self.list_files and not isinstance(self.list_files, list):
- raise ValueError(f"Argument list_files was set, but is not of type list, but type {type(self.list_files)}")
- def create_blob_link(self, blob_folder, blob_name) -> str:
- if blob_folder:
- full_path_blob = f"{blob_folder}/{blob_name}"
- else:
- full_path_blob = blob_name
- url = f"https://{self.account_name}.blob.core.windows.net/{self.destination}/{full_path_blob}"
- sas_token = generate_blob_sas(
- account_name=self.account_name,
- account_key=self.account_key,
- container_name=self.destination,
- blob_name=full_path_blob,
- permission=BlobSasPermissions(read=True, delete_previous_version=False),
- expiry=datetime.utcnow() + timedelta(days=self.expiry_download_links),
- )
- url_with_sas = f"{url}?{sas_token}"
- return url_with_sas
- @staticmethod
- def create_not_case_sensitive_extension(extension):
- """
- We create in-case sensitive fnmatch
- .pdf -> .[Pp][Dd][Ff]
- .csv -> .[Cc][Ss][Vv]
- """
- new_extension = ""
- for letter in extension:
- if not letter.isalpha():
- new_extension += letter
- else:
- new_extension += f"[{letter.upper()}{letter}]"
- if not new_extension.startswith("*"):
- new_extension = "*" + new_extension
- return new_extension
- def define_pattern(self):
- self.extension = self.create_not_case_sensitive_extension(self.extension)
- if self.folder and not self.extension:
- if self.folder.endswith("/"):
- pattern = self.folder + "*"
- else:
- pattern = self.folder + "/*"
- elif self.folder and self.extension:
- pattern = self.folder.rstrip("/") + "/" + "*" + self.extension
- elif not self.folder and self.extension:
- pattern = "*" + self.extension
- else:
- pattern = None
- return pattern
|