python-quick-start.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. # python quickstart client Code Sample
  2. #
  3. # Copyright (c) Microsoft Corporation
  4. #
  5. # All rights reserved.
  6. #
  7. # MIT License
  8. #
  9. # Permission is hereby granted, free of charge, to any person obtaining a
  10. # copy of this software and associated documentation files (the "Software"),
  11. # to deal in the Software without restriction, including without limitation
  12. # the rights to use, copy, modify, merge, publish, distribute, sublicense,
  13. # and/or sell copies of the Software, and to permit persons to whom the
  14. # Software is furnished to do so, subject to the following conditions:
  15. #
  16. # The above copyright notice and this permission notice shall be included in
  17. # all copies or substantial portions of the Software.
  18. #
  19. # THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  20. # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  21. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  22. # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  23. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  24. # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  25. # DEALINGS IN THE SOFTWARE.
  26. """
  27. Create a pool of nodes to output text files from azure blob storage.
  28. """
  29. import datetime
  30. import io
  31. import os
  32. import sys
  33. import time
  34. from azure.storage.blob import (
  35. BlobServiceClient,
  36. BlobSasPermissions,
  37. generate_blob_sas
  38. )
  39. from azure.batch import BatchServiceClient
  40. from azure.batch.batch_auth import SharedKeyCredentials
  41. import azure.batch.models as batchmodels
  42. from azure.core.exceptions import ResourceExistsError
  43. import config
  44. DEFAULT_ENCODING = "utf-8"
  45. # Update the Batch and Storage account credential strings in config.py with values
  46. # unique to your accounts. These are used when constructing connection strings
  47. # for the Batch and Storage client objects.
  def query_yes_no(question: str, default: str = "yes") -> str:
      """
      Prompts the user for yes/no input, displaying the specified question text.

      Any answer whose first character is 'y' or 'n' is accepted; case and
      surrounding whitespace are ignored. Pressing <ENTER> selects ``default``.
      When ``default`` is None there is nothing to fall back on, so the prompt
      is repeated until an explicit answer is given.

      :param str question: The text of the prompt for input.
      :param str default: The default if the user hits <ENTER>. Acceptable values
      are 'yes', 'no', and None.
      :return: 'yes' or 'no'
      :raises ValueError: If ``default`` is not 'yes', 'no', or None.
      """
      valid = {'y': 'yes', 'n': 'no'}
      if default is None:
          prompt = ' [y/n] '
      elif default == 'yes':
          prompt = ' [Y/n] '
      elif default == 'no':
          prompt = ' [y/N] '
      else:
          raise ValueError(f"Invalid default answer: '{default}'")

      while True:
          user_input = input(question + prompt).strip().lower()
          if not user_input:
              # An empty answer only counts when there is a default to use;
              # otherwise fall through and ask again instead of returning None.
              if default is not None:
                  return default
          elif user_input[0] in valid:
              return valid[user_input[0]]
          print("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
  def print_batch_exception(batch_exception: batchmodels.BatchErrorException):
      """
      Prints the contents of the specified Batch exception.

      The output is framed by two separator lines. The error message is
      printed only when the exception carries an error with a non-empty
      message value; any key/value details attached to that error follow it,
      one per line.

      :param batch_exception: The Batch exception to print.
      """
      separator = '-------------------------------------------'
      print(separator)
      print('Exception encountered:')

      error = batch_exception.error
      message = error.message if error else None
      if message and message.value:
          print(message.value)
          details = error.values
          if details:
              # Blank line between the message and the detail list.
              print()
              for detail in details:
                  print(f'{detail.key}:\t{detail.value}')

      print(separator)
  def upload_file_to_container(blob_storage_service_client: BlobServiceClient,
                               container_name: str, file_path: str) -> batchmodels.ResourceFile:
      """
      Uploads a local file to an Azure Blob storage container.

      The blob is named after the file's base name and overwrites any existing
      blob of that name. A read-only SAS token valid for two hours is then
      generated for it.

      :param blob_storage_service_client: A blob service client.
      :param str container_name: The name of the Azure Blob storage container.
      :param str file_path: The local path to the file.
      :return: A ResourceFile initialized with a SAS URL appropriate for Batch
      tasks.
      """
      blob_name = os.path.basename(file_path)
      blob_client = blob_storage_service_client.get_blob_client(container_name, blob_name)

      print(f'Uploading file {file_path} to container [{container_name}]...')

      with open(file_path, "rb") as data:
          blob_client.upload_blob(data, overwrite=True)

      # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated and
      # returns a naive value whose meaning depends on the reader's assumptions.
      sas_expiry = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=2)

      sas_token = generate_blob_sas(
          config.STORAGE_ACCOUNT_NAME,
          container_name,
          blob_name,
          account_key=config.STORAGE_ACCOUNT_KEY,
          permission=BlobSasPermissions(read=True),
          expiry=sas_expiry
      )

      sas_url = generate_sas_url(
          config.STORAGE_ACCOUNT_NAME,
          config.STORAGE_ACCOUNT_DOMAIN,
          container_name,
          blob_name,
          sas_token
      )

      return batchmodels.ResourceFile(
          http_url=sas_url,
          file_path=blob_name
      )
  def generate_sas_url(
      account_name: str,
      account_domain: str,
      container_name: str,
      blob_name: str,
      sas_token: str
  ) -> str:
      """
      Generates and returns a sas url for accessing blob storage

      :param str account_name: The storage account name.
      :param str account_domain: The storage endpoint domain that follows the
      account name in the host.
      :param str container_name: The name of the blob container.
      :param str blob_name: The raw (not percent-encoded) blob name. '/' is
      kept as a path separator.
      :param str sas_token: The SAS query string, without a leading '?'.
      :return: The HTTPS URL of the blob with the SAS token as its query string.
      """
      from urllib.parse import quote

      # Blob names may contain spaces, '#', '?', '%', etc.; left unescaped they
      # would truncate the path or corrupt the query string.
      encoded_blob_name = quote(blob_name, safe="/")
      return (
          f"https://{account_name}.{account_domain}/"
          f"{container_name}/{encoded_blob_name}?{sas_token}"
      )
  def create_pool(batch_service_client: BatchServiceClient, pool_id: str,
                  publisher: str = "canonical",
                  offer: str = "0001-com-ubuntu-server-focal",
                  sku: str = "20_04-lts",
                  node_agent_sku_id: str = "batch.node.ubuntu 20.04"):
      """
      Creates a pool of compute nodes with the specified OS settings.

      The VM size and dedicated node count are read from ``config``. The image
      defaults reproduce the values this sample has always used, so existing
      two-argument calls behave as before.

      :param batch_service_client: A Batch service client.
      :param str pool_id: An ID for the new pool.
      :param str publisher: Marketplace image publisher
      :param str offer: Marketplace image offer
      :param str sku: Marketplace image sku
      :param str node_agent_sku_id: The Batch node agent SKU; it has to be one
      that supports the selected image.
      """
      print(f'Creating pool [{pool_id}]...')

      # Create a new pool of Linux compute nodes using an Azure Virtual Machines
      # Marketplace image. For more information about creating pools of Linux
      # nodes, see:
      # https://azure.microsoft.com/documentation/articles/batch-linux-nodes/
      new_pool = batchmodels.PoolAddParameter(
          id=pool_id,
          virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
              image_reference=batchmodels.ImageReference(
                  publisher=publisher,
                  offer=offer,
                  sku=sku,
                  version="latest"
              ),
              node_agent_sku_id=node_agent_sku_id),
          vm_size=config.POOL_VM_SIZE,
          target_dedicated_nodes=config.POOL_NODE_COUNT
      )
      batch_service_client.pool.add(new_pool)
  def create_job(batch_service_client: BatchServiceClient, job_id: str, pool_id: str):
      """
      Creates a job with the specified ID, associated with the specified pool.

      :param batch_service_client: A Batch service client.
      :param str job_id: The ID for the job.
      :param str pool_id: The ID of the pool the job is bound to.
      """
      print(f'Creating job [{job_id}]...')

      target_pool = batchmodels.PoolInformation(pool_id=pool_id)
      job_spec = batchmodels.JobAddParameter(id=job_id, pool_info=target_pool)

      batch_service_client.job.add(job_spec)
  def add_tasks(batch_service_client: BatchServiceClient, job_id: str, resource_input_files: list):
      """
      Adds a task for each input file in the collection to the specified job.

      Each task is given the ID ``Task<index>`` and runs ``cat`` on its input
      file through ``/bin/bash -c``, with that file as its only resource file.

      :param batch_service_client: A Batch service client.
      :param str job_id: The ID of the job to which to add the tasks.
      :param list resource_input_files: A collection of input files. One task will be
      created for each input file.
      """
      # Report the number of tasks, not the repr of the whole ResourceFile list.
      print(f'Adding {len(resource_input_files)} tasks to job [{job_id}]...')

      tasks = [
          batchmodels.TaskAddParameter(
              id=f'Task{idx}',
              command_line=f"/bin/bash -c \"cat {input_file.file_path}\"",
              resource_files=[input_file]
          )
          for idx, input_file in enumerate(resource_input_files)
      ]

      batch_service_client.task.add_collection(job_id, tasks)
  def wait_for_tasks_to_complete(batch_service_client: BatchServiceClient, job_id: str,
                                 timeout: datetime.timedelta):
      """
      Returns when all tasks in the specified job reach the Completed state.

      The job's task list is polled about once per second, and a progress dot
      is written to stdout on every poll.

      :param batch_service_client: A Batch service client.
      :param job_id: The id of the job whose tasks should be monitored.
      :param timeout: The duration to wait for task completion. If all
      tasks in the specified job do not reach Completed state within this time
      period, an exception will be raised.
      :return: True once every task is in the Completed state.
      :raises RuntimeError: If the timeout elapses first.
      """
      deadline = datetime.datetime.now() + timeout

      print(f"Monitoring all tasks for 'Completed' state, timeout in {timeout}...", end='')

      while datetime.datetime.now() < deadline:
          print('.', end='')
          sys.stdout.flush()

          # Walk the full listing and count the tasks that are still pending.
          pending = 0
          for task in batch_service_client.task.list(job_id):
              if task.state != batchmodels.TaskState.completed:
                  pending += 1

          if pending == 0:
              print()
              return True

          time.sleep(1)

      print()
      raise RuntimeError(
          f"ERROR: Tasks did not reach 'Completed' state within timeout period of {timeout}")
  def print_task_output(batch_service_client: BatchServiceClient, job_id: str,
                        text_encoding: str=None):
      """
      Prints the stdout.txt file for each task in the job.

      For every task the task ID and the ID of the node it ran on are printed,
      followed by the decoded contents of the task's standard-output file
      (``config.STANDARD_OUT_FILE_NAME``).

      :param batch_service_client: The batch client to use.
      :param str job_id: The id of the job with task output files to print.
      :param str text_encoding: The encoding used to decode the task files and
      to write them to the console. Defaults to ``DEFAULT_ENCODING`` when None.
      """
      print('Printing task output...')

      if text_encoding is None:
          text_encoding = DEFAULT_ENCODING

      # Switch the console streams to the output encoding once, up front.
      # reconfigure() keeps the existing stream objects and their buffering,
      # unlike detaching and re-wrapping them for every task. Streams that
      # cannot be reconfigured (redirected or replaced ones) are left alone.
      for console_stream in (sys.stdout, sys.stderr):
          reconfigure = getattr(console_stream, 'reconfigure', None)
          if reconfigure is not None:
              reconfigure(encoding=text_encoding)

      tasks = batch_service_client.task.list(job_id)

      for task in tasks:
          node_id = batch_service_client.task.get(
              job_id, task.id).node_info.node_id
          print(f"Task: {task.id}")
          print(f"Node: {node_id}")

          stream = batch_service_client.file.get_from_task(
              job_id, task.id, config.STANDARD_OUT_FILE_NAME)

          file_text = _read_stream_as_string(
              stream,
              text_encoding)

          print("Standard output:")
          print(file_text)
  def _read_stream_as_string(stream, encoding) -> str:
      """
      Read stream as string

      :param stream: An iterable yielding the file content as byte chunks.
      :param str encoding: The encoding of the file. When None,
      ``DEFAULT_ENCODING`` is used.
      :return: The file content.
      """
      # Drain the stream completely before decoding, so a multi-byte
      # character split across two chunks is still decoded correctly.
      raw_bytes = b"".join(stream)
      codec = DEFAULT_ENCODING if encoding is None else encoding
      return raw_bytes.decode(codec)
  if __name__ == '__main__':
      # Script entry point: upload the input files, run one Batch task per
      # file, print each task's standard output, then clean up.

      start_time = datetime.datetime.now().replace(microsecond=0)
      print(f'Sample start: {start_time}')
      print()

      # Create the blob client, for use in obtaining references to
      # blob storage containers and uploading files to containers.
      blob_service_client = BlobServiceClient(
          account_url=f"https://{config.STORAGE_ACCOUNT_NAME}.{config.STORAGE_ACCOUNT_DOMAIN}/",
          credential=config.STORAGE_ACCOUNT_KEY
      )

      # Use the blob client to create the containers in Azure Storage if they
      # don't yet exist.
      input_container_name = 'input'  # pylint: disable=invalid-name
      try:
          blob_service_client.create_container(input_container_name)
      except ResourceExistsError:
          pass

      # The collection of data files that are to be processed by the tasks.
      input_file_paths = [os.path.join(sys.path[0], 'taskdata0.txt'),
                          os.path.join(sys.path[0], 'taskdata1.txt'),
                          os.path.join(sys.path[0], 'taskdata2.txt')]

      # Upload the data files.
      input_files = [
          upload_file_to_container(blob_service_client, input_container_name, file_path)
          for file_path in input_file_paths]

      # Create a Batch service client. We'll now be interacting with the Batch
      # service in addition to Storage
      credentials = SharedKeyCredentials(config.BATCH_ACCOUNT_NAME,
                                         config.BATCH_ACCOUNT_KEY)

      batch_client = BatchServiceClient(
          credentials,
          batch_url=config.BATCH_ACCOUNT_URL)

      try:
          # Create the pool that will contain the compute nodes that will execute the
          # tasks.
          create_pool(batch_client, config.POOL_ID)

          # Create the job that will run the tasks.
          create_job(batch_client, config.JOB_ID, config.POOL_ID)

          # Add the tasks to the job.
          add_tasks(batch_client, config.JOB_ID, input_files)

          # Pause execution until tasks reach Completed state.
          wait_for_tasks_to_complete(batch_client,
                                     config.JOB_ID,
                                     datetime.timedelta(minutes=30))

          print(" Success! All tasks reached the 'Completed' state within the "
                "specified timeout period.")

          # Print the standard output file of each task to the console
          print_task_output(batch_client, config.JOB_ID)

          # Print out some timing info
          end_time = datetime.datetime.now().replace(microsecond=0)
          print()
          print(f'Sample end: {end_time}')
          elapsed_time = end_time - start_time
          print(f'Elapsed time: {elapsed_time}')
          print()
          input('Press ENTER to exit...')

      except batchmodels.BatchErrorException as err:
          print_batch_exception(err)
          raise

      finally:
          # Each cleanup step is independent: a failure in one must not skip
          # the pool prompt, because a forgotten pool keeps its nodes running.
          try:
              # Clean up storage resources
              print(f'Deleting container [{input_container_name}]...')
              blob_service_client.delete_container(input_container_name)
          finally:
              # Clean up Batch resources (if the user so chooses).
              if query_yes_no('Delete job?') == 'yes':
                  try:
                      batch_client.job.delete(config.JOB_ID)
                  except batchmodels.BatchErrorException as cleanup_err:
                      # E.g. the job was never created because an earlier
                      # step failed; report it and still offer pool deletion.
                      print_batch_exception(cleanup_err)
              if query_yes_no('Delete pool?') == 'yes':
                  batch_client.pool.delete(config.POOL_ID)