datafactory.py

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time


def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)
    print("\n")


def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n")


def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():
    # Azure subscription ID
    subscription_id = '<Azure subscription ID>'

    # This program creates this resource group. If it's an existing resource
    # group, comment out the code below that creates the resource group.
    rg_name = '<Azure resource group name>'

    # The data factory name. It must be globally unique.
    df_name = '<Data factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ServicePrincipalCredentials(
        client_id='<AAD application ID>',
        secret='<AAD app authentication key>',
        tenant='<AAD tenant ID>')
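    # Note: azure.common.credentials is the older authentication path for
    # this SDK. Newer releases of azure-mgmt-datafactory authenticate with
    # azure.identity.ClientSecretCredential(tenant_id=..., client_id=...,
    # client_secret=...) instead; mentioned here only as a pointer, it is
    # not used in this script.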
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'eastus'}
    df_params = {'location': 'eastus'}

    # Create the resource group
    # Comment out the next line if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create the data factory, then poll until provisioning succeeds
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)
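    # (If provisioning ever failed, the loop above would poll forever; real
    # code would typically bound it with a retry limit or timeout.)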

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # Specify the name and key of your Azure Storage account
    storage_string = SecureString(
        'DefaultEndpointsProtocol=https;AccountName=<Azure storage account>;'
        'AccountKey=<Azure storage authentication key>')
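    # (In production you would normally pull the account key from an
    # environment variable or Azure Key Vault instead of hard-coding it;
    # the inline placeholder is just for this walkthrough.)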
    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(ls_name)
    blob_path = 'adftutorial/inputpy'
    blob_filename = 'input.txt'
    ds_azure_blob = AzureBlobDataset(ds_ls, folder_path=blob_path, file_name=blob_filename)
    ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adftutorial/outputpy'
    dsOut_azure_blob = AzureBlobDataset(ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)
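    # (The output folder does not have to exist up front; the copy activity
    # writes the blob under this path when the pipeline runs.)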

    # Create a copy activity that copies from the input dataset to the output dataset
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(ds_name)
    dsOut_ref = DatasetReference(dsOut_name)
    copy_activity = CopyActivity(act_name, inputs=[dsin_ref], outputs=[dsOut_ref],
                                 source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)
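    # The trigger set up further below passes inputPath/outputPath parameters.
    # For those to reach the pipeline, it would need to declare them, e.g.
    # (a sketch, assuming this SDK's ParameterSpecification model):
    #
    #     params_for_pipeline = {
    #         'inputPath': ParameterSpecification(type='String'),
    #         'outputPath': ParameterSpecification(type='String')}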

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, {})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    activity_runs_paged = list(adf_client.activity_runs.list_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id,
        datetime.now() - timedelta(1), datetime.now() + timedelta(1)))
    print_activity_run_details(activity_runs_paged[0])
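    # Instead of the fixed 30-second sleep above, you could poll the run to a
    # terminal state (a sketch, assuming the service's 'Queued'/'InProgress'
    # status strings):
    #
    #     while pipeline_run.status in ('Queued', 'InProgress'):
    #         time.sleep(5)
    #         pipeline_run = adf_client.pipeline_runs.get(
    #             rg_name, df_name, run_response.run_id)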

    # Create a schedule trigger that runs the pipeline every 15 minutes for one day.
    # start_time/end_time are given in UTC to match time_zone='UTC'.
    tr_name = 'mytrigger'
    scheduler_recurrence = ScheduleTriggerRecurrence(
        frequency='Minute', interval='15',
        start_time=datetime.utcnow(), end_time=datetime.utcnow() + timedelta(1),
        time_zone='UTC')
    pipeline_parameters = {'inputPath': 'adftutorial/inputpy', 'outputPath': 'adftutorial/outputpy'}
    pipelines_to_run = []
    pipeline_reference = PipelineReference('copyPipeline')
    pipelines_to_run.append(TriggerPipelineReference(pipeline_reference, pipeline_parameters))
    tr_properties = ScheduleTrigger(description='My scheduler trigger',
                                    pipelines=pipelines_to_run,
                                    recurrence=scheduler_recurrence)
    adf_client.triggers.create_or_update(rg_name, df_name, tr_name, tr_properties)

    # Start the trigger
    adf_client.triggers.start(rg_name, df_name, tr_name)
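    # The trigger keeps firing every 15 minutes until end_time; to stop it
    # earlier (for example before deleting the factory), call:
    #
    #     adf_client.triggers.stop(rg_name, df_name, tr_name)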


# Start the main method
if __name__ == '__main__':
    main()