# datafactory_4.py

# Imports inferred from the calls below; this sample targets the classic
# (pre-1.0) azure-mgmt-datafactory SDK used by the Data Factory quickstart.
import time
from datetime import datetime, timedelta

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    AzureBlobDataset, AzureStorageLinkedService, BlobSink, BlobSource,
    CopyActivity, DatasetReference, Factory, LinkedServiceReference,
    PipelineReference, PipelineResource, ScheduleTrigger,
    ScheduleTriggerRecurrence, SecureString, TriggerPipelineReference)

def main():
    # Azure subscription ID
    subscription_id = '<Azure subscription ID>'

    # This program creates this resource group. If the resource group already
    # exists, comment out the create_or_update call below.
    rg_name = '<Azure resource group name>'

    # The data factory name. It must be globally unique.
    df_name = '<Data factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ServicePrincipalCredentials(
        client_id='<AAD application ID>',
        secret='<AAD app authentication key>',
        tenant='<AAD tenant ID>')
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'eastus'}
    df_params = {'location': 'eastus'}
    # Create the resource group
    # (comment out if the resource group already exists)
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory and poll until provisioning finishes
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)
    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'
    # Specify the name and key of your Azure Storage account
    storage_string = SecureString(
        'DefaultEndpointsProtocol=https;'
        'AccountName=<Azure storage account>;'
        'AccountKey=<Azure storage authentication key>')
    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(
        rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)
    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(ls_name)
    blob_path = 'adftutorial/inputpy'
    blob_filename = 'input.txt'
    ds_azure_blob = AzureBlobDataset(
        ds_ls, folder_path=blob_path, file_name=blob_filename)
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adftutorial/outputpy'
    dsOut_azure_blob = AzureBlobDataset(ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)
    # Create a copy activity that reads from the input dataset and writes
    # to the output dataset
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(ds_name)
    dsOut_ref = DatasetReference(dsOut_name)
    copy_activity = CopyActivity(
        act_name, inputs=[dsin_ref], outputs=[dsOut_ref],
        source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)
    # Create a pipeline run (no pipeline parameters)
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, {})

    # Monitor the pipeline run: give it time to start, then query its status
    # and the activity runs within a one-day window around now
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    activity_runs_paged = list(adf_client.activity_runs.list_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id,
        datetime.now() - timedelta(1), datetime.now() + timedelta(1)))
    print_activity_run_details(activity_runs_paged[0])
    # Create a schedule trigger that runs the pipeline every 15 minutes,
    # starting now and ending one day from now
    tr_name = 'mytrigger'
    scheduler_recurrence = ScheduleTriggerRecurrence(
        frequency='Minute', interval=15,
        start_time=datetime.now(), end_time=datetime.now() + timedelta(1),
        time_zone='UTC')
    pipeline_parameters = {
        'inputPath': 'adftutorial/inputpy',
        'outputPath': 'adftutorial/outputpy'}
    pipelines_to_run = []
    pipeline_reference = PipelineReference('copyPipeline')
    pipelines_to_run.append(
        TriggerPipelineReference(pipeline_reference, pipeline_parameters))
    tr_properties = ScheduleTrigger(
        description='My scheduler trigger',
        pipelines=pipelines_to_run,
        recurrence=scheduler_recurrence)
    adf_client.triggers.create_or_update(rg_name, df_name, tr_name, tr_properties)

    # Start the trigger (triggers are created in a stopped state)
    adf_client.triggers.start(rg_name, df_name, tr_name)
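
# The calls to print_item and print_activity_run_details above are never
# defined in this file. The helpers below are a sketch modeled on the Azure
# Data Factory Python quickstart helpers, not part of the original listing.
def print_item(group):
    """Print basic properties of an Azure resource (factory, linked
    service, dataset, or pipeline)."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))


def print_activity_run_details(activity_run):
    """Print the status and, on success, the copy statistics of an
    activity run."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(
            activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(
            activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(
            activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


# Run the sample
if __name__ == '__main__':
    main()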