123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831 |
- #!/usr/bin/env python
- from azure.storage.blob import BlobSasPermissions, BlobServiceClient, generate_blob_sas, RetentionPolicy
- from argparse import ArgumentParser
- from io import StringIO
- import pandas as pd
- import numpy as np
- import coloredlogs
- import datetime
- import logging
- import getpass
- import keyring
- import pathlib
- import pandas
- import azure
- import time
- import os
- import re
- def create_parent_parser(parser, container=True):
- """
- Create a parent parser with arguments common to multiple scripts
- :param parser: type ArgumentParser object
- :param container: type bool: Boolean on whether the container argument should be added to the parser
- :return: subparsers: type ArgumentParser.add_subparsers
- :return: parent_parser: type ArgumentParser: Populated ArgumentParser object
- """
- subparsers = parser.add_subparsers(title='Available functionality')
- # Create a parental parser that can be inherited by subparsers
- parent_parser = ArgumentParser(add_help=False)
- if container:
- parent_parser.add_argument('-c', '--container_name',
- required=True,
- type=str,
- default=str(),
- help='Name of the Azure storage container. Note that container names must be '
- 'lowercase, between 3 and 63 characters, start with a letter or number, and '
- 'can contain only letters, numbers, and the dash (-) character. Consecutive '
- 'dashes are not permitted.')
- parent_parser.add_argument('-a', '--account_name',
- required=True,
- type=str,
- help='Name of the Azure storage account')
- parent_parser.add_argument('-p', '--passphrase',
- default='AzureStorage',
- type=str,
- help='The passphrase to use when encrypting the azure storage-specific connection '
- 'string to the system keyring. Default is "AzureStorage".')
- parent_parser.add_argument('-v', '--verbosity',
- choices=['debug', 'info', 'warning', 'error', 'critical'],
- metavar='VERBOSITY',
- default='info',
- help='Set the logging level. Options are debug, info, warning, error, and critical. '
- 'Default is info.')
- return subparsers, parent_parser
- def setup_logging(arguments):
- """
- Set the custom colour scheme and message format to used by coloredlogs
- :param arguments: type parsed ArgumentParser object
- """
- # Set up logging
- coloredlogs.DEFAULT_LEVEL_STYLES = {'debug': {'bold': True, 'color': 'green'},
- 'info': {'bold': True, 'color': 'blue'},
- 'warning': {'bold': True, 'color': 'yellow'},
- 'error': {'bold': True, 'color': 'red'},
- 'critical': {'bold': True, 'background': 'red'}
- }
- coloredlogs.DEFAULT_LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
- coloredlogs.install(level=arguments.verbosity.upper())
- def setup_arguments(parser):
- """
- Finalise setting up the ArgumentParser arguments into an object, and running subparser functions, or displaying the
- help message
- :param parser: type: ArgumentParser object
- :return: parsed ArgumentParser object
- """
- # Get the arguments into an object
- arguments = parser.parse_args()
- # Run the appropriate function for each sub-parser.
- if hasattr(arguments, 'func'):
- # Set up logging
- setup_logging(arguments=arguments)
- arguments.func(arguments)
- # If the 'func' attribute doesn't exist, display the basic help for the appropriate subparser (if any)
- else:
- try:
- # Determine which subparser was called by extracting it from the arguments. Note that this requires the
- # use of the desc keyword when creating subparsers
- command = list(vars(arguments).keys())[0]
- # If the extracted command exists, use the command-specific subparser help
- if command:
- parser.parse_args([command, '-h'])
- # Otherwise, use the basic help
- else:
- parser.parse_args(['-h'])
- # If the were no subparsers specified (the list of keys in the arguments is empty), use the basic help
- except IndexError:
- parser.parse_args(['-h'])
- return arguments
- def set_account_name(passphrase, account_name=None):
- """
- Store the account name in the system keyring
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :param account_name: type str: Name of the Azure storage account
- """
- # Only prompt the user for the account name if it was not provided
- if account_name is None:
- # Prompt the user for the account name
- account_name = input('Please enter your account name\n').encode('utf-8').decode()
- # Set the account name into the keyring. Treat it as a password, and use the passphrase as both the service ID,
- # and the username
- keyring.set_password(passphrase, passphrase, account_name)
- return account_name
- def set_connection_string(passphrase, account_name):
- """
- Prompt the user for the connection string, and store it the system keyring
- Uses logic from https://stackoverflow.com/a/31882203
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :param account_name: type str: Name of the Azure storage account
- :return: connect_str: String of the connection string
- """
- # Prompt the user for the connection string. Use decode to convert from bytes. Use getpass, so the plain text
- # password isn't printed to the screen
- connect_str = getpass.getpass(prompt='Please enter the connection string for your Azure storage account:\n')\
- .encode('utf-8').decode()
- # Ensure that the account name provided and the account name specified in the connection string match
- confirm_account_match(account_name=account_name,
- connect_str=connect_str)
- # Set the password in the keyring. Use the passphrase as the service ID, the account name as the username,
- # and the connection string as the password
- keyring.set_password(passphrase, account_name, connect_str)
- logging.info('Successfully entered credentials into keyring')
- return connect_str
- def confirm_account_match(account_name, connect_str):
- """
- Ensure that the account name provided matches the account name stored in the connection string
- :param connect_str: type str: Connection string for the Azure storage account
- :param account_name: type str: Name of the Azure storage account
- """
- # Attempt to extract the account name from the connection string
- try:
- connect_str_account_name = connect_str.split(';')[1].split('AccountName=')[-1]
- # Ensure that the account name provided matches the account name found in the connection string
- if account_name != connect_str_account_name:
- logging.error(f'The supplied account name {account_name} does not match the account name stored in the '
- f'connection string ({connect_str_account_name}). Please ensure that you are providing the '
- f'appropriate connection string for your account.')
- raise SystemExit
- # If splitting on 'AccountName=' fails, the connection string is either malformed or invalid
- except IndexError:
- logging.error('Could not parse the account key from the connection string in the keyring. Please ensure that '
- 'it has been entered, and the it conforms to the proper format: '
- 'DefaultEndpointsProtocol=https;AccountName=[REDACTED];AccountKey=[REDACTED];'
- 'EndpointSuffix=core.windows.net')
- raise SystemExit
- return True
- def extract_connection_string(passphrase, account_name):
- """
- Extract the connection string from the keyring using the account name and passphrase
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :param account_name: type str: Name of the Azure storage account
- :return: connect_str: String of the connection string
- """
- # Use the passphrase and the account name to extract the connection string from the keyring
- connect_str = keyring.get_password(passphrase,
- account_name)
- # If the connection string can't be found in the keyring using the supplied passphrase, prompt the user for
- # the passphrase, and store it
- if not connect_str:
- logging.warning(f'Connection string linked to the provided passphrase {passphrase} and account name '
- f'{account_name} was not found in the system keyring. You will now be prompted to enter it.')
- connect_str = set_connection_string(passphrase=passphrase,
- account_name=account_name)
- # Confirm that the account name provided matches the one found in the connection string
- confirm_account_match(account_name=account_name,
- connect_str=connect_str)
- return connect_str
- def extract_account_name(passphrase):
- """
- Extract the account name from the system keyring
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :return: account_name: Name of the Azure storage account
- """
- # Use the passphrase to extract the account name
- account_name = keyring.get_password(passphrase,
- passphrase)
- # If the account name hasn't been entered into the keyring, prompt the user to enter it
- if not account_name:
- logging.warning(f'Account name linked to the provided passphrase {passphrase} was not found in the system '
- f'keyring. You will now be prompted to enter it.')
- account_name = set_account_name(passphrase=passphrase)
- return account_name
- def extract_account_key(connect_str):
- """
- Extract the account key from the connection string. This is necessary for the method that creates the blob SAS,
- as it doesn't accept connection strings
- :param connect_str: type str: Connection string for the Azure storage account
- :return account_key: String of the account key extracted from the connection string
- """
- # Split the connection string on ';', use the entry corresponding to the account key, and strip off the
- # 'AccountKey='
- # DefaultEndpointsProtocol=https;AccountName=[REDACTED];AccountKey=[REDACTED];EndpointSuffix=core.windows.net
- try:
- account_key = connect_str.split(';')[2].split('AccountKey=')[-1]
- except IndexError:
- logging.error('Could not parse the account key from the connection string in the keyring. Please ensure that '
- 'it has been entered, and the it conforms to the proper format: '
- 'DefaultEndpointsProtocol=https;AccountName=[REDACTED];AccountKey=[REDACTED];'
- 'EndpointSuffix=core.windows.net')
- raise SystemExit
- return account_key
- def delete_keyring_credentials(passphrase, account_name=None):
- """
- Delete the password associated with the passphrase and account name from the system keyring
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :param account_name: type str: Name of the Azure storage account
- :return: account_name: Name of the Azure storage account
- """
- if not account_name:
- # Prompt the user for the account name
- account_name = input('Please enter your account name\n').encode('utf-8').decode()
- try:
- # Delete the password from the system keyring
- keyring.delete_password(passphrase, account_name)
- except keyring.errors.PasswordDeleteError:
- logging.error(f'Connection string associated with passphrase {passphrase} and account name {account_name} '
- f'not found in system keyring. Please ensure that you supplied the correct arguments.')
- raise SystemExit
- return account_name
- def validate_container_name(container_name, object_type='container'):
- """
- Use a regex to check if the supplied name follows the guidelines for Azure nomenclature. If it doesn't, attempt to
- rename the container/object
- :param container_name: type str: Name of the container/object of interest
- :param object_type: type str: Name of the object being validated. Default is container, but target container, and
- target path are other options
- :return: container_name: String of sanitised container name
- """
- if not re.match('^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$', container_name):
- logging.warning(f'{object_type.capitalize()} name, {container_name} is invalid. {object_type.capitalize()} '
- f'names must be between 3 and 63 characters, start with a letter or number, and can contain '
- f'only letters, numbers, and the dash (-) character. Every dash (-) character must be '
- f'immediately preceded and followed by a letter or number; consecutive dashes are not '
- f'permitted in {object_type} names. All letters in a {object_type} name must be lowercase.')
- logging.info(f'Attempting to fix the {object_type} name')
- # Swap out dashes for underscores, as they will be removed in the following regex
- container_name = container_name.replace('-', '_')
- # Use re to remove all non-word characters (including dashes)
- container_name = re.sub(r'[^\w]', '', container_name)
- # Replace multiple underscores with a single one. Uses logic from: https://stackoverflow.com/a/46701355
- # Also ensure that the container name is in lowercase
- container_name = re.sub(r'[^\w\s]|(_)(?=\1)', '', container_name).lower()
- # Swap out underscores for dashes
- container_name = container_name.replace('_', '-')
- # Ensure that the container name doesn't start or end with a dash
- container_name = re.sub(r'^-+', '', container_name)
- container_name = re.sub(r'-+$', '', container_name)
- # Ensure that the container name isn't length zero, or the while loop below will be infinite
- if len(container_name) == 0:
- logging.error(f'Attempting to fix the {object_type} name left zero valid characters! '
- 'Please enter a new name.')
- raise SystemExit
- # If the container name is too long, slice it to be 63 characters
- if len(container_name) >= 63:
- logging.warning(f'{object_type.capitalize()} name {container_name} was too long. Using {container_name[:62]} '
- f'instead')
- container_name = container_name[:62]
- # If the container name is too short, keep adding the container name to itself to bump up the length
- while len(container_name) < 3:
- logging.warning(f'{object_type.capitalize()} name {container_name} was too short (only {len(container_name)} '
- f'characters). Using {container_name + container_name} instead')
- container_name = container_name + container_name
- # Use the validated container name
- logging.info(f'Using {container_name} as the {object_type} name')
- return container_name
- def create_blob_service_client(connect_str):
- """
- Create a blob service client using the connection string
- :param connect_str: type str: Connection string for Azure storage
- :return: blob_service_client: type azure.storage.blob.BlobServiceClient
- """
- try:
- blob_service_client = BlobServiceClient.from_connection_string(connect_str)
- return blob_service_client
- except ValueError:
- logging.error('Your connection string was rejected. Please ensure that you entered it properly, and that it '
- 'is valid')
- raise SystemExit
- def create_container(blob_service_client, container_name):
- """
- Create a new container and container-specific client from the blob service client
- :param blob_service_client: type: type azure.storage.blob.BlobServiceClient
- :param container_name: type str: Name of the container of interest
- :return: container_client: type azure.storage.blob.BlobServiceClient.ContainerClient
- """
- # Hide the INFO-level messages sent to the logger from Azure by increasing the logging level to WARNING
- logging.getLogger().setLevel(logging.WARNING)
- try:
- container_client = blob_service_client.create_container(container_name)
- except azure.core.exceptions.ResourceExistsError as e:
- if 'The specified container already exists.' in str(e):
- container_client = create_container_client(blob_service_client=blob_service_client,
- container_name=container_name)
- elif 'The specified container is being deleted. Try operation later.' in str(e):
- logging.error(f'Could not create the requested container {container_name}. As it has recently been '
- f'deleted, please try again in a few moments')
- raise SystemExit
- else:
- logging.error(f'Could not create container {container_name}')
- raise SystemExit
- return container_client
- def create_container_client(blob_service_client, container_name, create=True):
- """
- Create a container-specific client from the blob service client
- :param blob_service_client: type: azure.storage.blob.BlobServiceClient
- :param container_name: type str: Name of the container of interest
- :param create: type bool: Boolean whether to create a container if it doesn't exist
- :return: container_client: type azure.storage.blob.BlobServiceClient.ContainerClient
- """
- # Create the container client from the blob service client with the get container client method
- # and the container name
- container_client = blob_service_client.get_container_client(container_name)
- # Create the container if it does not exist
- if not container_client.exists() and create:
- container_client = create_container(blob_service_client=blob_service_client,
- container_name=container_name)
- return container_client
- def create_blob_client(blob_service_client, container_name, blob_file):
- """
- Create a blob-specific client
- :param blob_service_client: type: azure.storage.blob.BlobServiceClient
- :param container_name: type str: Name of the container of interest
- :param blob_file: type iterable from azure.storage.blob.BlobServiceClient.ContainerClient.list_blobs
- :return: blob_client: type azure.storage.blob.BlobServiceClient.BlobClient
- """
- # Create a blob client for the current blob
- blob_client = blob_service_client.get_blob_client(container=container_name,
- blob=blob_file)
- return blob_client
- def create_blob_sas(blob_file, account_name, container_name, account_key, expiry, sas_urls):
- """
- Create SAS URL for blob
- :param blob_file: type container_client.list_blobs() object
- :param account_name: type str: Name of Azure storage account
- :param container_name: type str: Name of container in Azure storage in which the file is located
- :param account_key: type str: Account key of Azure storage account
- :param expiry: type int: Number of days that the SAS URL will be valid
- :param sas_urls: type dict: Dictionary of file name: SAS URL (empty)
- :return: populated sas_urls
- """
- # Set the name of file by removing any path information
- file_name = os.path.basename(blob_file.name)
- # Create the blob SAS. Use a start time 15 minutes in the past, and the requested expiry
- sas_token = generate_blob_sas(
- account_name=account_name,
- container_name=container_name,
- blob_name=blob_file.name,
- account_key=account_key,
- permission=BlobSasPermissions(read=True),
- start=datetime.datetime.utcnow() - datetime.timedelta(minutes=15),
- expiry=datetime.datetime.utcnow() + datetime.timedelta(days=expiry))
- # Create the SAS URL, and add it to the dictionary with the file_name as the key
- sas_urls[file_name] = create_sas_url(account_name=account_name,
- container_name=container_name,
- blob_name=blob_file.name,
- sas_token=sas_token)
- return sas_urls
- def client_prep(container_name, passphrase, account_name, create=True):
- """
- Validate the container name, and prepare the necessary clients
- :param container_name: type str: Name of the container of interest
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :param account_name: type str: Name of the Azure storage account
- :param create: type bool: Boolean whether to create a container if it doesn't exist
- :return: container_name: Validated container name
- :return: connect_str: String of the connection string
- :return: blob_service_client: azure.storage.blob.BlobServiceClient
- :return: container_client: azure.storage.blob.BlobServiceClient.ContainerClient
- """
- # Validate the container name
- container_name = validate_container_name(container_name=container_name)
- # Extract the connection string from the system keyring
- connect_str = extract_connection_string(passphrase=passphrase,
- account_name=account_name)
- # Create the blob service client using the connection string
- blob_service_client = create_blob_service_client(connect_str=connect_str)
- # Create the container client for the desired container with the blob service client
- container_client = create_container_client(blob_service_client=blob_service_client,
- container_name=container_name,
- create=create)
- return container_name, connect_str, blob_service_client, container_client
- def sas_prep(container_name, passphrase, account_name, create=True):
- """
- Validate container names, extract connection strings, and account keys, and create necessary clients for
- SAS URL creation
- :param container_name: type str: Name of the container of interest
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :param account_name: type str: Name of the Azure storage account
- :param create: type bool: Boolean whether to create a container if it doesn't exist
- :return: container_name: Validated container name
- :return: connect_str: Connection string for Azure storage
- :return: account_key: Account key for Azure storage
- :return: blob_service_client: azure.storage.blob.BlobServiceClient
- :return: container_client: azure.storage.blob.BlobServiceClient.ContainerClient
- """
- # Validate the container name
- container_name = validate_container_name(container_name=container_name)
- # Retrieve the connection string from the system keyring
- connect_str = extract_connection_string(passphrase=passphrase,
- account_name=account_name)
- # Extract the account key from the connection string
- account_key = extract_account_key(connect_str=connect_str)
- # Create the blob service client
- blob_service_client = create_blob_service_client(connect_str=connect_str)
- # Create the container client from the blob service client
- container_client = create_container_client(blob_service_client=blob_service_client,
- container_name=container_name,
- create=create)
- return container_name, connect_str, account_key, blob_service_client, container_client
- def create_sas_url(account_name, container_name, blob_name, sas_token):
- """
- Create the SAS URL from the required components
- :param account_name: type str: Name of Azure storage account
- :param container_name: type str: Name of the container of interest
- :param blob_name: type str: Name and path of the file of interest
- :param sas_token: type azure.storage.blob.generate_blob_sas
- :return: sas_url: String of the SAS URL
- """
- # Generate the SAS URL using the account name, the domain, the container name, the blob name, and the
- # SAS token in the following format:
- # 'https://' + account_name + '.blob.core.windows.net/' + container_name + '/' + blob_name + '?' + blob
- sas_url = f'https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}'
- return sas_url
- def write_sas(output_file, sas_urls):
- """
- Write the SAS URLs to the output file
- :param output_file: type str: Name and path of the file in which the SAS URLs are to be written
- :param sas_urls: type dict: Dictionary of file name: SAS URL
- """
- # Create the output file
- with open(output_file, 'w') as output:
- for file_name, sas_url in sas_urls.items():
- # Write the SAS URL to the output file
- output.write(f'{sas_url}\n')
- # Print the file name and SAS URL to the terminal
- logging.info(f'{file_name}\t{sas_url}')
- def set_blob_retention_policy(blob_service_client, days=8):
- """
- Set the retention policy for a blob
- :param blob_service_client: type: azure.storage.blob.BlobServiceClient
- :param days: type int: Number of days to retain deleted blobs. Default is 8
- :return: blob_service_client: Client with the retention policy implemented
- """
- # Create a retention policy to retain deleted blobs
- delete_retention_policy = RetentionPolicy(enabled=True, days=days)
- # Set the retention policy on the service
- blob_service_client.set_service_properties(delete_retention_policy=delete_retention_policy)
- return blob_service_client
- def move_prep(passphrase, account_name, container_name, target_container):
- """
- Prepare all the necessary clients for moving container/files/folders in Azure storage
- :param passphrase: type str: Simple passphrase to use to store the connection string in the system keyring
- :param account_name: type str: Name of Azure storage account
- :param container_name: type str: Name of the container of interest
- :param target_container: type str: Name of the new container into which the container/file/folder is to be copied
- :return: blob_service_client: type azure.storage.blob.BlobServiceClient
- :return: source_container_client: type azure.storage.blob.BlobServiceClient.ContainerClient for source container
- :return: target_container_client: type azure.storage.blob.BlobServiceClient.ContainerClient for target container
- """
- # Validate the container names
- container_name = validate_container_name(container_name=container_name)
- target_container = validate_container_name(container_name=target_container,
- object_type='target container')
- # Extract the connection string from the system keyring
- connect_str = extract_connection_string(passphrase=passphrase,
- account_name=account_name)
- blob_service_client = create_blob_service_client(connect_str=connect_str)
- source_container_client = create_container_client(blob_service_client=blob_service_client,
- container_name=container_name)
- # Hide the INFO-level messages sent to the logger from Azure by increasing the logging level to WARNING
- logging.getLogger().setLevel(logging.WARNING)
- target_container_client = create_container(blob_service_client=blob_service_client,
- container_name=target_container)
- return container_name, target_container, blob_service_client, source_container_client, target_container_client
- def copy_blob(blob_file, blob_service_client, container_name, target_container, path, storage_tier,
- object_name=None, category=None, common_path=None):
- """
- Copy a blob from one container to another
- :param blob_file: type iterable from azure.storage.blob.BlobServiceClient.ContainerClient.list_blobs
- :param container_name: type str: Name of the container in which the file is located
- :param blob_service_client: type: azure.storage.blob.BlobServiceClient
- :param target_container: type str: Name of the new container into which the file is to be copied
- :param path: type str: Path of folders in which the files are to be placed
- :param storage_tier: type str: Storage tier to use for the copied file/folder
- :param object_name: type str: Name and path of file/folder to download from Azure storage
- :param category: type str: Category of object to be copied. Limited to file or folder
- :param common_path: type str: Calculated common path between the specified file/folder and the blob_file.name
- """
- # Create the blob client
- blob_client = create_blob_client(blob_service_client=blob_service_client,
- container_name=container_name,
- blob_file=blob_file)
- # Extract the folder structure of the blob e.g. 220202-m05722/InterOp
- folder_structure = list(os.path.split(os.path.dirname(blob_file.name)))
- # Add the nested folder to the path as requested
- target_path = os.path.join(path, os.path.join(*folder_structure))
- # Set the name of file by removing any path information
- file_name = os.path.basename(blob_file.name)
- # Finally, set the name and the path of the output file
- if category is None:
- target_file = os.path.join(path, file_name)
- # If a folder is being moved, join the path, the common path between the blob file and the supplied folder name
- # with the file name
- else:
- if object_name is not None:
- target_file = os.path.join(path, common_path, os.path.basename(blob_file.name))
- # If a container is being moved, join the target path and the name of the directory of the blob_file to the
- # file name
- else:
- # Create a pathlib.Path object from the blob file
- file_path = pathlib.Path(blob_file.name)
- # Determine the parental path of the file. If the file is in the root, it will be a dot. This won't work
- # with the joining logic, so change it to ''
- nested_path = file_path.parent if file_path.parent == '.' else ''
- # Join the target path, nested path, and file name
- target_file = os.path.join(target_path, nested_path, file_name)
- # Create a blob client for the target blob
- target_blob_client = blob_service_client.get_blob_client(target_container, target_file)
- # Copy the source file to the target file - allow up to 1000 seconds total
- target_blob_client.start_copy_from_url(blob_client.url)
- # Set the storage tier
- target_blob_client.set_standard_blob_tier(standard_blob_tier=storage_tier)
- # Ensure that the copy is complete before proceeding
- for i in range(100):
- # Extract the properties of the target blob client
- target_blob_properties = target_blob_client.get_blob_properties()
- # Break when the status is set to 'success'. The copy is successful
- if target_blob_properties.copy.status == 'success':
- # Copy finished
- break
- # Sleep for 10 seconds
- time.sleep(10)
- def delete_container(blob_service_client, container_name, account_name):
- """
- Delete a container in Azure storage
- :param blob_service_client: type: azure.storage.blob.BlobServiceClient
- :param container_name: type str: Name of the container of interest
- :param account_name: type str: Name of the Azure storage account
- """
- # Delete container if it exists
- try:
- blob_service_client.delete_container(container_name)
- except azure.core.exceptions.ResourceNotFoundError:
- logging.error(f'Could not locate {container_name} in {account_name}. Perhaps it has already been deleted?')
- raise SystemExit
- def extract_common_path(object_name, blob_file):
- """
- Extract the common path (if any) between a file in Azure storage, and a user-supplied folder name
- :param object_name: type str: Name and path of file/folder to download from Azure storage
- :param blob_file: type iterable from azure.storage.blob.BlobServiceClient.ContainerClient.list_blobs
- :return: common_path: The calculated common path between the folder and the file in blob storage (can be None)
- """
- # Create the pathlib.Path objects for both the folder and the blob file
- object_path = pathlib.Path(os.path.normpath(object_name))
- blob_path = pathlib.Path(blob_file.name).parent
- # If there is a common path between the folder and the blob file path, then there is a match
- try:
- common_path = blob_path.relative_to(object_path)
- # Change the dot returned by an exact match to the directory with ''
- common_path = common_path if str(common_path) != '.' else ''
- except ValueError:
- common_path = None
- return common_path
- def delete_file(container_client, object_name, blob_service_client, container_name):
- """
- Delete a file from Azure storage
- :param container_client: type azure.storage.blob.BlobServiceClient.ContainerClient
- :param object_name: type str: Name and path of file/folder to download from Azure storage
- :param blob_service_client: type: azure.storage.blob.BlobServiceClient
- :param container_name: type str: Name of the container of interest
- """
- # Create a generator containing all the blobs in the container
- generator = container_client.list_blobs()
- # Create a boolean to determine if the blob has been located
- present = False
- for blob_file in generator:
- # Filter for the blob name
- if os.path.join(blob_file.name) == object_name:
- # Update the blob presence variable
- present = True
- # Create the blob client
- blob_client = create_blob_client(blob_service_client=blob_service_client,
- container_name=container_name,
- blob_file=blob_file)
- # Soft delete the blob
- blob_client.delete_blob()
- # Send a warning to the user that the blob could not be found
- if not present:
- logging.error(f'Could not locate the desired file {object_name}')
- raise SystemExit
- def delete_folder(container_client, object_name, blob_service_client, container_name, account_name):
- """
- Delete a folder from Azure storage
- :param container_client: type azure.storage.blob.BlobServiceClient.ContainerClient
- :param object_name: type str: Name and path of file/folder to download from Azure storage
- :param blob_service_client: type: azure.storage.blob.BlobServiceClient
- :param container_name: type str: Name of the container of interest
- :param account_name: type str: Name of the Azure storage account
- """
- # Create a generator containing all the blobs in the container
- generator = container_client.list_blobs()
- # Create a boolean to determine if the blob has been located
- present = False
- for blob_file in generator:
- common_path = extract_common_path(object_name=object_name,
- blob_file=blob_file)
- # Only copy the file if there is a common path between the object path and the blob path (they match)
- if common_path is not None:
- # Update the folder presence boolean
- present = True
- # Create the blob client
- blob_client = create_blob_client(blob_service_client=blob_service_client,
- container_name=container_name,
- blob_file=blob_file)
- # Soft delete the blob
- blob_client.delete_blob()
- # Log an error that the folder could not be found
- if not present:
- logging.error(
- f'There was an error deleting folder {object_name} in container {container_name}, '
- f'in Azure storage account {account_name}. Please ensure that all arguments have been '
- f'entered correctly')
- raise SystemExit
- def arg_dict_cleanup(arg_dict):
- """
- Clean up the argument dictionary to be consistent with the format required for the AzureStorage classes
- :param arg_dict: type dict: Dictionary of argument name: value e.g. storage tier: nan
- :return: arg_dict: Cleaned argument dictionary
- """
- try:
- # Double single quotes are not automatically changed into an empty string
- arg_dict['reset_path'] = arg_dict['reset_path'] if arg_dict['reset_path'] != "''" else str()
- except KeyError:
- pass
- # For optional argument, the nan value supplied for empty values will not work with downstream code; find and
- # change them to the appropriate empty/default value
- try:
- arg_dict['reset_path'] = arg_dict['reset_path'] if str(arg_dict['reset_path']) != str(np.nan) else None
- except KeyError:
- pass
- try:
- arg_dict['storage_tier'] = arg_dict['storage_tier'] if str(arg_dict['storage_tier']) != str(np.nan) else 'Hot'
- except KeyError:
- pass
- try:
- arg_dict['output_file'] = arg_dict['output_file'] if str(arg_dict['output_file']) != str(np.nan) \
- else os.path.join(os.getcwd(), 'sas_urls.txt')
- except KeyError:
- pass
- try:
- arg_dict['output_path'] = arg_dict['output_path'] if str(arg_dict['output_path']) != str(np.nan) \
- else os.getcwd()
- except KeyError:
- pass
- try:
- arg_dict['expiry'] = arg_dict['expiry'] if str(arg_dict['expiry']) != str(np.nan) else 10
- except KeyError:
- pass
- try:
- arg_dict['retention_time'] = arg_dict['retention_time'] if str(arg_dict['retention_time']) != str(np.nan) else 8
- except KeyError:
- pass
- # Reading in numerical container names e.g. 220202 returns an integer, so typecast it to string
- arg_dict['container'] = str(arg_dict['container'])
- try:
- arg_dict['target'] = str(arg_dict['target'])
- except KeyError:
- pass
- return arg_dict
- def create_batch_dict(batch_file, headers):
- """
- Read in the supplied file of arguments with pandas. Create a dictionary of the arguments from a transposed dataframe
- :param batch_file: type str: Name and path of file containing requested operations
- :param headers: type list: Names of all the headers present in the file
- :return: Pandas dataframe.transpose().to_dict() of header: value extracted from the desired operation
- """
- # Ensure that the batch file exists
- try:
- assert os.path.isfile(batch_file)
- except AssertionError:
- logging.error(f'Could not locate the supplied batch file {batch_file}. Please ensure the you entered '
- f'the name and path correctly')
- raise SystemExit
- # Read in the batch file using pandas.read_csv. Use tabs as the separator, and provide the header names.
- # Transpose the data, and convert the dataframe to a dictionary
- batch_dict = pd.read_csv(
- batch_file,
- sep='\t',
- names=headers
- ).transpose().to_dict()
- return batch_dict
- def parse_batch_file(line):
- """
- Extract the requested command and subcommand from a line from an AzureAutomate batch file. Create a dictionary with
- the appropriate header:value for that command and subcommand combination
- :param line: type str: Individual line of text from batch file detailing requested operations. Format is:
- command;subcommand;operation-specific arguments
- :return: command: type str: Desired command to run e.g. upload, sas, move, download, tier, delete
- :return: subcommand: Subcommand for operation e.g. container, file, folder
- :return: batch_dict: Pandas dataframe.transpose().to_dict() of header: value extracted from the desired operation
- """
- # Create a dictionary of the appropriate headers for each command and subcommand combination
- header_dict = {
- 'upload': {
- 'file': ['command', 'subcommand', 'container', 'file', 'reset_path', 'storage_tier'],
- 'folder': ['command', 'subcommand', 'container', 'folder', 'reset_path', 'storage_tier']
- },
- 'sas': {
- 'container': ['command', 'subcommand', 'container', 'expiry', 'output_file'],
- 'file': ['command', 'subcommand', 'container', 'file', 'expiry', 'output_file'],
- 'folder': ['command', 'subcommand', 'container', 'folder', 'expiry', 'output_file']
- },
- 'move': {
- 'container': ['command', 'subcommand', 'container', 'target', 'reset_path', 'storage_tier'],
- 'file': ['command', 'subcommand', 'container', 'target', 'file', 'reset_path', 'storage_tier'],
- 'folder': ['command', 'subcommand', 'container', 'target', 'folder', 'reset_path', 'storage_tier']
- },
- 'download': {
- 'container': ['command', 'subcommand', 'container', 'output_path'],
- 'file': ['command', 'subcommand', 'container', 'file', 'output_path'],
- 'folder': ['command', 'subcommand', 'container', 'folder', 'output_path']
- },
- 'tier': {
- 'container': ['command', 'subcommand', 'container', 'storage_tier'],
- 'file': ['command', 'subcommand', 'container', 'file', 'storage_tier'],
- 'folder': ['command', 'subcommand', 'container', 'folder', 'storage_tier']
- },
- 'delete': {
- 'container': ['command', 'subcommand', 'container'],
- 'file': ['command', 'subcommand', 'container', 'file', 'retention_time'],
- 'folder': ['command', 'subcommand', 'container', 'folder', 'retention_time']
- }
- }
- # Extract the command and subcommand from the line. They will be the first two entries
- try:
- command = line.split('\t')[0]
- subcommand = line.split('\t')[1]
- except IndexError:
- logging.error(f'Could not extract the desired command and subcommand from your file. Please review the '
- f'following line {line}')
- raise SystemExit
- # Use the extracted command and subcommand to determine the appropriate headers
- try:
- headers = header_dict[command][subcommand]
- except KeyError:
- logging.error(f'Could not find the requested command {command} and subcommand {subcommand} in the list of '
- f'commands. Please ensure that you created your batch file correctly')
- raise SystemExit
- # Use StringIO to convert the string into a format that can be read by pandas.read_csv
- input_string = StringIO(line.rstrip())
- # Read in the line using pandas.read_csv. Use tabs as the separator, and provide the header names.
- # Transpose the data, and convert the dataframe to a dictionary
- try:
- batch_dict = pd.read_csv(
- input_string,
- sep='\t',
- names=headers
- ).transpose().to_dict()
- except pandas.errors.ParserError as e:
- logging.error(f'Pandas error parsing data: {e}')
- raise SystemExit
- # Return the command, subcommand, and parsed dictionary
- return command, subcommand, batch_dict
|