test_azure_fileshare_to_gcs_3.py 1.0 KB

1234567891011121314151617181920212223242526
  def test_execute_with_gzip(self, gcs_mock_hook, azure_fileshare_mock_hook):
      """Run the operator with ``gzip=True`` and expect gzip on every GCS upload.

      The Azure file share hook mock is stubbed so that ``list_files`` returns
      ``MOCK_FILES``; after ``execute`` the GCS hook mock's ``upload`` must have
      been called once per expected object with ``gzip=True``.
      """
      transfer = AzureFileShareToGCSOperator(
          task_id=TASK_ID,
          share_name=AZURE_FILESHARE_SHARE,
          directory_name=AZURE_FILESHARE_DIRECTORY_NAME,
          azure_fileshare_conn_id=AZURE_FILESHARE_CONN_ID,
          gcp_conn_id=GCS_CONN_ID,
          dest_gcs=GCS_PATH_PREFIX,
          google_impersonation_chain=IMPERSONATION_CHAIN,
          gzip=True,
      )

      # Stub the source listing on the hook instance the mocked class returns.
      azure_fileshare_mock_hook.return_value.list_files.return_value = MOCK_FILES

      transfer.execute(None)

      # The third positional argument is not pinned by this test, hence mock.ANY.
      upload_mock = gcs_mock_hook.return_value.upload
      expected_uploads = [
          mock.call('gcs-bucket', object_name, mock.ANY, gzip=True)
          for object_name in ('data/TEST1.csv', 'data/TEST3.csv', 'data/TEST2.csv')
      ]
      # Call order is not asserted; only that each expected call was made.
      upload_mock.assert_has_calls(expected_uploads, any_order=True)