- # python quickstart client Code Sample
- #
- # Copyright (c) Microsoft Corporation
- #
- # All rights reserved.
- #
- # MIT License
- #
- # Permission is hereby granted, free of charge, to any person obtaining a
- # copy of this software and associated documentation files (the "Software"),
- # to deal in the Software without restriction, including without limitation
- # the rights to use, copy, modify, merge, publish, distribute, sublicense,
- # and/or sell copies of the Software, and to permit persons to whom the
- # Software is furnished to do so, subject to the following conditions:
- #
- # The above copyright notice and this permission notice shall be included in
- # all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- # DEALINGS IN THE SOFTWARE.
- """
- Create a pool of nodes to output text files from azure blob storage.
- """
- import datetime
- import io
- import os
- import sys
- import time
- from azure.storage.blob import (
- BlobServiceClient,
- BlobSasPermissions,
- generate_blob_sas
- )
- from azure.batch import BatchServiceClient
- from azure.batch.batch_auth import SharedKeyCredentials
- import azure.batch.models as batchmodels
- from azure.core.exceptions import ResourceExistsError
- import config
- DEFAULT_ENCODING = "utf-8"
- # Update the Batch and Storage account credential strings in config.py with values
- # unique to your accounts. These are used when constructing connection strings
- # for the Batch and Storage client objects.
def query_yes_no(question: str, default: str = "yes") -> str:
    """
    Ask the user a yes/no question on the console and return the answer.

    :param str question: The text of the prompt for input.
    :param str default: Answer assumed when the user just presses <ENTER>.
        Acceptable values are 'yes', 'no', and None.
    :return: 'yes' or 'no'
    :raises ValueError: If *default* is not 'yes', 'no', or None.
    """
    answers = {'y': 'yes', 'n': 'no'}
    prompts = {None: ' [y/n] ', 'yes': ' [Y/n] ', 'no': ' [y/N] '}
    if default not in prompts:
        raise ValueError(f"Invalid default answer: '{default}'")
    prompt = prompts[default]

    result = default
    while True:
        reply = input(question + prompt).lower()
        if not reply:
            # Bare <ENTER>: fall back to the default answer.
            break
        first = reply[0]
        if first in answers:
            result = answers[first]
            break
        print("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
    return result
def print_batch_exception(batch_exception: batchmodels.BatchErrorException):
    """
    Print the contents of the specified Batch exception to the console.

    :param batch_exception: The BatchErrorException whose details are printed.
    """
    separator = '-------------------------------------------'
    print(separator)
    print('Exception encountered:')
    error = batch_exception.error
    if error and error.message and error.message.value:
        print(error.message.value)
        # Some errors carry additional key/value detail pairs.
        if error.values:
            print()
            for detail in error.values:
                print(f'{detail.key}:\t{detail.value}')
    print(separator)
def upload_file_to_container(blob_storage_service_client: BlobServiceClient,
                             container_name: str, file_path: str) -> batchmodels.ResourceFile:
    """
    Uploads a local file to an Azure Blob storage container.

    :param blob_storage_service_client: A blob service client.
    :param str container_name: The name of the Azure Blob storage container.
    :param str file_path: The local path to the file.
    :return: A ResourceFile initialized with a SAS URL appropriate for Batch
        tasks.
    """
    blob_name = os.path.basename(file_path)
    blob_client = blob_storage_service_client.get_blob_client(container_name, blob_name)

    print(f'Uploading file {file_path} to container [{container_name}]...')

    with open(file_path, "rb") as data:
        blob_client.upload_blob(data, overwrite=True)

    # Grant read-only access for two hours so Batch nodes can download the
    # blob. Use an aware UTC datetime: datetime.utcnow() is deprecated
    # (Python 3.12) and returns a naive value.
    sas_token = generate_blob_sas(
        config.STORAGE_ACCOUNT_NAME,
        container_name,
        blob_name,
        account_key=config.STORAGE_ACCOUNT_KEY,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=2)
    )

    sas_url = generate_sas_url(
        config.STORAGE_ACCOUNT_NAME,
        config.STORAGE_ACCOUNT_DOMAIN,
        container_name,
        blob_name,
        sas_token
    )

    return batchmodels.ResourceFile(
        http_url=sas_url,
        file_path=blob_name
    )
def generate_sas_url(
    account_name: str,
    account_domain: str,
    container_name: str,
    blob_name: str,
    sas_token: str
) -> str:
    """
    Build and return the full SAS URL for accessing a blob in storage.

    :param str account_name: The storage account name.
    :param str account_domain: The storage endpoint domain.
    :param str container_name: The blob container name.
    :param str blob_name: The blob name.
    :param str sas_token: The SAS token granting access.
    :return: The complete https URL with the SAS token as query string.
    """
    endpoint = f"https://{account_name}.{account_domain}"
    return f"{endpoint}/{container_name}/{blob_name}?{sas_token}"
def create_pool(batch_service_client: BatchServiceClient, pool_id: str):
    """
    Creates a pool of compute nodes with the specified OS settings.

    The marketplace image (Canonical Ubuntu 20.04) and the VM size / node
    count (from config) are fixed by this sample; the original docstring
    documented publisher/offer/sku parameters that do not exist.

    :param batch_service_client: A Batch service client.
    :param str pool_id: An ID for the new pool.
    """
    print(f'Creating pool [{pool_id}]...')

    # Create a new pool of Linux compute nodes using an Azure Virtual Machines
    # Marketplace image. For more information about creating pools of Linux
    # nodes, see:
    # https://azure.microsoft.com/documentation/articles/batch-linux-nodes/
    new_pool = batchmodels.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=batchmodels.ImageReference(
                publisher="canonical",
                offer="0001-com-ubuntu-server-focal",
                sku="20_04-lts",
                version="latest"
            ),
            node_agent_sku_id="batch.node.ubuntu 20.04"),
        vm_size=config.POOL_VM_SIZE,
        target_dedicated_nodes=config.POOL_NODE_COUNT
    )
    batch_service_client.pool.add(new_pool)
def create_job(batch_service_client: BatchServiceClient, job_id: str, pool_id: str):
    """
    Creates a job with the specified ID, associated with the specified pool.

    :param batch_service_client: A Batch service client.
    :param str job_id: The ID for the job.
    :param str pool_id: The ID for the pool.
    """
    print(f'Creating job [{job_id}]...')

    pool_info = batchmodels.PoolInformation(pool_id=pool_id)
    batch_service_client.job.add(
        batchmodels.JobAddParameter(id=job_id, pool_info=pool_info))
def add_tasks(batch_service_client: BatchServiceClient, job_id: str, resource_input_files: list):
    """
    Adds a task for each input file in the collection to the specified job.

    :param batch_service_client: A Batch service client.
    :param str job_id: The ID of the job to which to add the tasks.
    :param list resource_input_files: A collection of input files. One task will be
        created for each input file.
    """
    # Bug fix: the original printed the repr of the ResourceFile list here
    # instead of the number of tasks being added.
    print(f'Adding {len(resource_input_files)} tasks to job [{job_id}]...')

    tasks = [
        batchmodels.TaskAddParameter(
            id=f'Task{idx}',
            # Each task simply cats its input file to stdout on the node.
            command_line=f"/bin/bash -c \"cat {input_file.file_path}\"",
            resource_files=[input_file],
        )
        for idx, input_file in enumerate(resource_input_files)
    ]

    batch_service_client.task.add_collection(job_id, tasks)
def wait_for_tasks_to_complete(batch_service_client: BatchServiceClient, job_id: str,
                               timeout: datetime.timedelta):
    """
    Returns when all tasks in the specified job reach the Completed state.

    :param batch_service_client: A Batch service client.
    :param job_id: The id of the job whose tasks should be monitored.
    :param timeout: The duration to wait for task completion. If all
        tasks in the specified job do not reach Completed state within this
        time period, an exception will be raised.
    """
    deadline = datetime.datetime.now() + timeout

    print(f"Monitoring all tasks for 'Completed' state, timeout in {timeout}...", end='')

    while datetime.datetime.now() < deadline:
        # Progress dot per polling pass; flush so it appears immediately.
        print('.', end='')
        sys.stdout.flush()

        job_tasks = batch_service_client.task.list(job_id)
        if all(task.state == batchmodels.TaskState.completed for task in job_tasks):
            print()
            return True

        time.sleep(1)

    print()
    raise RuntimeError("ERROR: Tasks did not reach 'Completed' state within "
                       "timeout period of " + str(timeout))
def print_task_output(batch_service_client: BatchServiceClient, job_id: str,
                      text_encoding: str = None):
    """
    Prints the stdout.txt file for each task in the job.

    :param batch_service_client: The batch client to use.
    :param str job_id: The id of the job with task output files to print.
    :param str text_encoding: Encoding used to decode task output and to
        re-wrap the console streams. Defaults to utf-8.
    """
    print('Printing task output...')

    if text_encoding is None:
        text_encoding = DEFAULT_ENCODING

    # Bug fix: the original detached and re-wrapped sys.stdout/sys.stderr
    # inside the per-task loop, stacking a fresh TextIOWrapper on every
    # iteration. Wrap the streams exactly once, before printing any output.
    sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding=text_encoding)
    sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding=text_encoding)

    tasks = batch_service_client.task.list(job_id)

    for task in tasks:
        node_id = batch_service_client.task.get(
            job_id, task.id).node_info.node_id

        print(f"Task: {task.id}")
        print(f"Node: {node_id}")

        stream = batch_service_client.file.get_from_task(
            job_id, task.id, config.STANDARD_OUT_FILE_NAME)

        file_text = _read_stream_as_string(stream, text_encoding)
        print("Standard output:")
        print(file_text)
- def _read_stream_as_string(stream, encoding) -> str:
- """
- Read stream as string
- :param stream: input stream generator
- :param str encoding: The encoding of the file. The default is utf-8.
- :return: The file content.
- """
- output = io.BytesIO()
- try:
- for data in stream:
- output.write(data)
- if encoding is None:
- encoding = DEFAULT_ENCODING
- return output.getvalue().decode(encoding)
- finally:
- output.close()
if __name__ == '__main__':
    sample_start = datetime.datetime.now().replace(microsecond=0)
    print(f'Sample start: {sample_start}')
    print()

    # Blob service client, used to create containers and upload task input.
    blob_service_client = BlobServiceClient(
        account_url=f"https://{config.STORAGE_ACCOUNT_NAME}.{config.STORAGE_ACCOUNT_DOMAIN}/",
        credential=config.STORAGE_ACCOUNT_KEY
    )

    # Create the input container in Azure Storage if it doesn't yet exist.
    input_container_name = 'input'  # pylint: disable=invalid-name
    try:
        blob_service_client.create_container(input_container_name)
    except ResourceExistsError:
        pass

    # Local data files that the tasks will process: taskdata0..2.txt next to
    # this script.
    input_file_paths = [os.path.join(sys.path[0], f'taskdata{i}.txt')
                        for i in range(3)]

    # Upload the data files and collect ResourceFiles for the tasks.
    input_files = [
        upload_file_to_container(blob_service_client, input_container_name, path)
        for path in input_file_paths]

    # Batch service client for pool/job/task operations.
    credentials = SharedKeyCredentials(config.BATCH_ACCOUNT_NAME,
                                       config.BATCH_ACCOUNT_KEY)
    batch_client = BatchServiceClient(
        credentials,
        batch_url=config.BATCH_ACCOUNT_URL)

    try:
        # Provision the pool, then the job, then its tasks.
        create_pool(batch_client, config.POOL_ID)
        create_job(batch_client, config.JOB_ID, config.POOL_ID)
        add_tasks(batch_client, config.JOB_ID, input_files)

        # Block until every task reaches the Completed state, or time out.
        wait_for_tasks_to_complete(batch_client,
                                   config.JOB_ID,
                                   datetime.timedelta(minutes=30))

        print(" Success! All tasks reached the 'Completed' state within the "
              "specified timeout period.")

        # Echo each task's stdout.txt to the console.
        print_task_output(batch_client, config.JOB_ID)

        # Timing summary.
        sample_end = datetime.datetime.now().replace(microsecond=0)
        print()
        print(f'Sample end: {sample_end}')
        print(f'Elapsed time: {sample_end - sample_start}')
        print()
        input('Press ENTER to exit...')

    except batchmodels.BatchErrorException as err:
        print_batch_exception(err)
        raise

    finally:
        # Clean up storage resources.
        print(f'Deleting container [{input_container_name}]...')
        blob_service_client.delete_container(input_container_name)

        # Clean up Batch resources (if the user so chooses).
        if query_yes_no('Delete job?') == 'yes':
            batch_client.job.delete(config.JOB_ID)
        if query_yes_no('Delete pool?') == 'yes':
            batch_client.pool.delete(config.POOL_ID)
-
|