from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
# The wildcard import brings in Factory, SecureString, AzureStorageLinkedService,
# AzureBlobDataset, CopyActivity, and the other model classes used below.
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time


def main():
    # Azure subscription ID
    subscription_id = '<Azure subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group.
    rg_name = '<Azure resource group name>'

    # The data factory name. It must be globally unique.
    df_name = '<Data factory name>'

    # Specify your Azure Active Directory client ID, client secret, and tenant ID
    credentials = ServicePrincipalCredentials(client_id='<AAD application ID>', secret='<AAD app authentication key>', tenant='<AAD tenant ID>')
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'eastus'}
    df_params = {'location': 'eastus'}

    # Create the resource group
    # Comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    # Wait until the data factory finishes provisioning
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # Specify the name and key of your Azure Storage account
    storage_string = SecureString('DefaultEndpointsProtocol=https;AccountName=<Azure storage account>;AccountKey=<Azure storage authentication key>')

    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(ls_name)
    blob_path = 'adftutorial/inputpy'
    blob_filename = 'input.txt'
    ds_azure_blob = AzureBlobDataset(ds_ls, folder_path=blob_path, file_name=blob_filename)
    ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adftutorial/outputpy'
    dsOut_azure_blob = AzureBlobDataset(ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(ds_name)
    dsOut_ref = DatasetReference(dsOut_name)
    copy_activity = CopyActivity(act_name, inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, {})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    activity_runs_paged = list(adf_client.activity_runs.list_by_pipeline_run(rg_name, df_name, pipeline_run.run_id, datetime.now() - timedelta(1), datetime.now() + timedelta(1)))
    print_activity_run_details(activity_runs_paged[0])

    # Create a trigger that runs the pipeline on a schedule
    tr_name = 'mytrigger'
    scheduler_recurrence = ScheduleTriggerRecurrence(frequency='Minute', interval='15', start_time=datetime.now(), end_time=datetime.now() + timedelta(1), time_zone='UTC')
    pipeline_parameters = {'inputPath': 'adftutorial/inputpy', 'outputPath': 'adftutorial/outputpy'}
    pipelines_to_run = []
    pipeline_reference = PipelineReference('copyPipeline')
    pipelines_to_run.append(TriggerPipelineReference(pipeline_reference, pipeline_parameters))
    tr_properties = ScheduleTrigger(description='My scheduler trigger', pipelines=pipelines_to_run, recurrence=scheduler_recurrence)
    adf_client.triggers.create_or_update(rg_name, df_name, tr_name, tr_properties)

    # Start the trigger
    adf_client.triggers.start(rg_name, df_name, tr_name)
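The listing calls two helpers, print_item and print_activity_run_details, that are defined outside this excerpt. A minimal sketch of what they could look like follows; the exact fields printed, and the copy-activity output keys such as dataRead, are assumptions for illustration rather than the excerpt's own definitions. The final guard simply runs main() when the script is executed directly.

def print_item(group):
    """Print basic properties of an Azure resource returned by the SDK (illustrative sketch)."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    print("\n")


def print_activity_run_details(activity_run):
    """Print the status and copy statistics of an activity run (illustrative sketch)."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        # Output keys assumed for a copy activity run
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


# Run main() when the script is executed directly.
if __name__ == "__main__":
    main()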