@@ -0,0 +1,97 @@
+from azure.common.credentials import ServicePrincipalCredentials
+from azure.mgmt.resource import ResourceManagementClient
+from azure.mgmt.datafactory import DataFactoryManagementClient
+from azure.mgmt.datafactory.models import *
+from datetime import datetime, timedelta
+import time
+
+
+# Helper functions (minimal versions) used by main() below
+def print_item(group):
+    """Print a few key properties of an Azure resource."""
+    print("\tName: {}".format(group.name))
+    print("\tId: {}".format(group.id))
+    if hasattr(group, 'location'):
+        print("\tLocation: {}".format(group.location))
+
+
+def print_activity_run_details(activity_run):
+    """Print the status and output (or error) of an activity run."""
+    print("\n\tActivity run status: {}".format(activity_run.status))
+    if activity_run.status == 'Succeeded':
+        print("\tOutput: {}".format(activity_run.output))
+    else:
+        print("\tError: {}".format(activity_run.error))
+
+
+def main():
+
+    # Azure subscription ID
+    subscription_id = '<Azure subscription ID>'
+
+    # This script creates the resource group. If the resource group already
+    # exists, comment out the create_or_update call below.
+    rg_name = '<Azure resource group name>'
+
+    # The data factory name. It must be globally unique.
+    df_name = '<Data factory name>'
+
+    # Specify your Active Directory client ID, client secret, and tenant ID
+    credentials = ServicePrincipalCredentials(
+        client_id='<AAD application ID>',
+        secret='<AAD app authentication key>',
+        tenant='<AAD tenant ID>')
+    resource_client = ResourceManagementClient(credentials, subscription_id)
+    adf_client = DataFactoryManagementClient(credentials, subscription_id)
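+    # The service principal must have rights to create resource groups and
+    # data factories in this subscription (for example, the Contributor role)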
+
+    rg_params = {'location': 'eastus'}
+    df_params = {'location': 'eastus'}
+
+    # Create the resource group
+    # Comment out the next line if the resource group already exists
+    resource_client.resource_groups.create_or_update(rg_name, rg_params)
+
+    # Create a data factory
+    df_resource = Factory(location='eastus')
+    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
+    print_item(df)
+
+    # Wait for the factory to finish provisioning
+    while df.provisioning_state != 'Succeeded':
+        df = adf_client.factories.get(rg_name, df_name)
+        time.sleep(1)
+
+    # Create an Azure Storage linked service
+    ls_name = 'storageLinkedService'
+
+    # Specify the name and key of your Azure Storage account
+    storage_string = SecureString('DefaultEndpointsProtocol=https;AccountName=<Azure storage account>;AccountKey=<Azure storage authentication key>')
+
+    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
+    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
+    print_item(ls)
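+    # The datasets below reference this linked service by name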
+
+    # Create an Azure blob dataset (input)
+    ds_name = 'ds_in'
+    ds_ls = LinkedServiceReference(ls_name)
+    blob_path = 'adftutorial/inputpy'
+    blob_filename = 'input.txt'
+    ds_azure_blob = AzureBlobDataset(ds_ls, folder_path=blob_path, file_name=blob_filename)
+    ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)
+    print_item(ds)
+
+    # Create an Azure blob dataset (output)
+    dsOut_name = 'ds_out'
+    output_blobpath = 'adftutorial/outputpy'
+    dsOut_azure_blob = AzureBlobDataset(ds_ls, folder_path=output_blobpath)
+    dsOut = adf_client.datasets.create_or_update(rg_name, df_name, dsOut_name, dsOut_azure_blob)
+    print_item(dsOut)
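+    # The copy run assumes 'adftutorial/inputpy/input.txt' already exists in
+    # the storage account; the output folder is created during the copy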
+
+    # Create a copy activity
+    act_name = 'copyBlobtoBlob'
+    blob_source = BlobSource()
+    blob_sink = BlobSink()
+    dsin_ref = DatasetReference(ds_name)
+    dsOut_ref = DatasetReference(dsOut_name)
+    copy_activity = CopyActivity(act_name, inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
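+    # BlobSource and BlobSink tell the service how to read from and write to
+    # the datasets wired in through inputs and outputs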
+
+    # Create a pipeline with the copy activity
+    p_name = 'copyPipeline'
+    params_for_pipeline = {}
+    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
+    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
+    print_item(p)
+
+    # Create a pipeline run
+    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
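+    # create_run returns immediately; poll the run below via run_response.run_id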
+
+    # Monitor the pipeline run
+    time.sleep(30)
+    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
+    print("\n\tPipeline run status: {}".format(pipeline_run.status))
+    activity_runs_paged = list(adf_client.activity_runs.list_by_pipeline_run(
+        rg_name, df_name, pipeline_run.run_id,
+        datetime.now() - timedelta(1), datetime.now() + timedelta(1)))
+    print_activity_run_details(activity_runs_paged[0])
+
+    # Create a schedule trigger that runs the pipeline every 15 minutes
+    tr_name = 'mytrigger'
+    scheduler_recurrence = ScheduleTriggerRecurrence(frequency='Minute', interval=15,
+        start_time=datetime.now(), end_time=datetime.now() + timedelta(1), time_zone='UTC')
+    pipeline_parameters = {'inputPath': 'adftutorial/inputpy', 'outputPath': 'adftutorial/outputpy'}
+    pipelines_to_run = []
+    pipeline_reference = PipelineReference(p_name)
+    pipelines_to_run.append(TriggerPipelineReference(pipeline_reference, pipeline_parameters))
+    tr_properties = ScheduleTrigger(description='My scheduler trigger', pipelines=pipelines_to_run, recurrence=scheduler_recurrence)
+    adf_client.triggers.create_or_update(rg_name, df_name, tr_name, tr_properties)
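+    # A newly created trigger is stopped; it must be started explicitly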
+
+    # Start the trigger
+    adf_client.triggers.start(rg_name, df_name, tr_name)
+
+
+# Start the main method
+main()