# Requires the azure-mgmt-resource, azure-mgmt-datafactory, and azure-common packages
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time


def main():

    # Azure subscription ID
    subscription_id = ''

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group.
    rg_name = ''

    # The data factory name. It must be globally unique.
    df_name = ''

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ServicePrincipalCredentials(client_id='', secret='', tenant='')
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'eastus'}
    df_params = {'location': 'eastus'}

    # Create the resource group
    # Comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory and wait until provisioning finishes
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # Specify the name and key of your Azure Storage account
    storage_string = SecureString('DefaultEndpointsProtocol=https;AccountName=;AccountKey=')

    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(ls_name)
    blob_path = 'adftutorial/inputpy'
    blob_filename = 'input.txt'
    ds_azure_blob = AzureBlobDataset(ds_ls, folder_path=blob_path, file_name=blob_filename)
    ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adftutorial/outputpy'
    dsOut_azure_blob = AzureBlobDataset(ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(ds_name)
    dsOut_ref = DatasetReference(dsOut_name)
    copy_activity = CopyActivity(act_name, inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, {})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    activity_runs_paged = list(adf_client.activity_runs.list_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id,
        datetime.now() - timedelta(1), datetime.now() + timedelta(1)))
    print_activity_run_details(activity_runs_paged[0])

    # Create a trigger that runs the pipeline every 15 minutes for one day
    tr_name = 'mytrigger'
    scheduler_recurrence = ScheduleTriggerRecurrence(frequency='Minute', interval='15',
                                                     start_time=datetime.now(),
                                                     end_time=datetime.now() + timedelta(1),
                                                     time_zone='UTC')
    pipeline_parameters = {'inputPath': 'adftutorial/inputpy', 'outputPath': 'adftutorial/outputpy'}
    pipelines_to_run = []
    pipeline_reference = PipelineReference('copyPipeline')
    pipelines_to_run.append(TriggerPipelineReference(pipeline_reference, pipeline_parameters))
    tr_properties = ScheduleTrigger(description='My scheduler trigger',
                                    pipelines=pipelines_to_run,
                                    recurrence=scheduler_recurrence)
    adf_client.triggers.create_or_update(rg_name, df_name, tr_name, tr_properties)

    # Start the trigger
    adf_client.triggers.start(rg_name, df_name, tr_name)
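
# print_item and print_activity_run_details are assumed to be defined earlier in
# this sample. If they are not, the minimal sketches below are enough to make the
# script runnable; they only echo a few properties of the objects the SDK returns.
def print_item(group):
    """Print basic properties of an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    print("")


def print_activity_run_details(activity_run):
    """Print details of an activity run, including copy metrics on success."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


# Start the main method
main()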