123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- import unittest
- from unittest import mock
- from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
- TASK_ID = 'test-azure-fileshare-to-gcs'
- AZURE_FILESHARE_SHARE = 'test-share'
- AZURE_FILESHARE_DIRECTORY_NAME = '/path/to/dir'
- GCS_PATH_PREFIX = 'gs://gcs-bucket/data/'
- MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
- AZURE_FILESHARE_CONN_ID = 'azure_fileshare_default'
- GCS_CONN_ID = 'google_cloud_default'
- IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
- class TestAzureFileShareToGCSOperator(unittest.TestCase):
- def test_init(self):
- """Test AzureFileShareToGCSOperator instance is properly initialized."""
- operator = AzureFileShareToGCSOperator(
- task_id=TASK_ID,
- share_name=AZURE_FILESHARE_SHARE,
- directory_name=AZURE_FILESHARE_DIRECTORY_NAME,
- azure_fileshare_conn_id=AZURE_FILESHARE_CONN_ID,
- gcp_conn_id=GCS_CONN_ID,
- dest_gcs=GCS_PATH_PREFIX,
- google_impersonation_chain=IMPERSONATION_CHAIN,
- )
- assert operator.task_id == TASK_ID
- assert operator.share_name == AZURE_FILESHARE_SHARE
- assert operator.directory_name == AZURE_FILESHARE_DIRECTORY_NAME
- assert operator.azure_fileshare_conn_id == AZURE_FILESHARE_CONN_ID
- assert operator.gcp_conn_id == GCS_CONN_ID
- assert operator.dest_gcs == GCS_PATH_PREFIX
- assert operator.google_impersonation_chain == IMPERSONATION_CHAIN
- @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.AzureFileShareHook')
- @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.GCSHook')
- def test_execute(self, gcs_mock_hook, azure_fileshare_mock_hook):
- """Test the execute function when the run is successful."""
- operator = AzureFileShareToGCSOperator(
- task_id=TASK_ID,
- share_name=AZURE_FILESHARE_SHARE,
- directory_name=AZURE_FILESHARE_DIRECTORY_NAME,
- azure_fileshare_conn_id=AZURE_FILESHARE_CONN_ID,
- gcp_conn_id=GCS_CONN_ID,
- dest_gcs=GCS_PATH_PREFIX,
- google_impersonation_chain=IMPERSONATION_CHAIN,
- )
- azure_fileshare_mock_hook.return_value.list_files.return_value = MOCK_FILES
- uploaded_files = operator.execute(None)
- gcs_mock_hook.return_value.upload.assert_has_calls(
- [
- mock.call('gcs-bucket', 'data/TEST1.csv', mock.ANY, gzip=False),
- mock.call('gcs-bucket', 'data/TEST3.csv', mock.ANY, gzip=False),
- mock.call('gcs-bucket', 'data/TEST2.csv', mock.ANY, gzip=False),
- ],
- any_order=True,
- )
- azure_fileshare_mock_hook.assert_called_once_with(AZURE_FILESHARE_CONN_ID)
- gcs_mock_hook.assert_called_once_with(
- gcp_conn_id=GCS_CONN_ID,
- delegate_to=None,
- impersonation_chain=IMPERSONATION_CHAIN,
- )
- assert sorted(MOCK_FILES) == sorted(uploaded_files)
- @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.AzureFileShareHook')
- @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.GCSHook')
- def test_execute_with_gzip(self, gcs_mock_hook, azure_fileshare_mock_hook):
- """Test the execute function when the run is successful."""
- operator = AzureFileShareToGCSOperator(
- task_id=TASK_ID,
- share_name=AZURE_FILESHARE_SHARE,
- directory_name=AZURE_FILESHARE_DIRECTORY_NAME,
- azure_fileshare_conn_id=AZURE_FILESHARE_CONN_ID,
- gcp_conn_id=GCS_CONN_ID,
- dest_gcs=GCS_PATH_PREFIX,
- google_impersonation_chain=IMPERSONATION_CHAIN,
- gzip=True,
- )
- azure_fileshare_mock_hook.return_value.list_files.return_value = MOCK_FILES
- operator.execute(None)
- gcs_mock_hook.return_value.upload.assert_has_calls(
- [
- mock.call('gcs-bucket', 'data/TEST1.csv', mock.ANY, gzip=True),
- mock.call('gcs-bucket', 'data/TEST3.csv', mock.ANY, gzip=True),
- mock.call('gcs-bucket', 'data/TEST2.csv', mock.ANY, gzip=True),
- ],
- any_order=True,
- )
|