# test_azure_fileshare_to_gcs.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from unittest import mock

from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
  20. TASK_ID = 'test-azure-fileshare-to-gcs'
  21. AZURE_FILESHARE_SHARE = 'test-share'
  22. AZURE_FILESHARE_DIRECTORY_NAME = '/path/to/dir'
  23. GCS_PATH_PREFIX = 'gs://gcs-bucket/data/'
  24. MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
  25. AZURE_FILESHARE_CONN_ID = 'azure_fileshare_default'
  26. GCS_CONN_ID = 'google_cloud_default'
  27. IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
  28. class TestAzureFileShareToGCSOperator(unittest.TestCase):
  29. def test_init(self):
  30. """Test AzureFileShareToGCSOperator instance is properly initialized."""
  31. operator = AzureFileShareToGCSOperator(
  32. task_id=TASK_ID,
  33. share_name=AZURE_FILESHARE_SHARE,
  34. directory_name=AZURE_FILESHARE_DIRECTORY_NAME,
  35. azure_fileshare_conn_id=AZURE_FILESHARE_CONN_ID,
  36. gcp_conn_id=GCS_CONN_ID,
  37. dest_gcs=GCS_PATH_PREFIX,
  38. google_impersonation_chain=IMPERSONATION_CHAIN,
  39. )
  40. assert operator.task_id == TASK_ID
  41. assert operator.share_name == AZURE_FILESHARE_SHARE
  42. assert operator.directory_name == AZURE_FILESHARE_DIRECTORY_NAME
  43. assert operator.azure_fileshare_conn_id == AZURE_FILESHARE_CONN_ID
  44. assert operator.gcp_conn_id == GCS_CONN_ID
  45. assert operator.dest_gcs == GCS_PATH_PREFIX
  46. assert operator.google_impersonation_chain == IMPERSONATION_CHAIN
  47. @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.AzureFileShareHook')
  48. @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.GCSHook')
  49. def test_execute(self, gcs_mock_hook, azure_fileshare_mock_hook):
  50. """Test the execute function when the run is successful."""
  51. operator = AzureFileShareToGCSOperator(
  52. task_id=TASK_ID,
  53. share_name=AZURE_FILESHARE_SHARE,
  54. directory_name=AZURE_FILESHARE_DIRECTORY_NAME,
  55. azure_fileshare_conn_id=AZURE_FILESHARE_CONN_ID,
  56. gcp_conn_id=GCS_CONN_ID,
  57. dest_gcs=GCS_PATH_PREFIX,
  58. google_impersonation_chain=IMPERSONATION_CHAIN,
  59. )
  60. azure_fileshare_mock_hook.return_value.list_files.return_value = MOCK_FILES
  61. uploaded_files = operator.execute(None)
  62. gcs_mock_hook.return_value.upload.assert_has_calls(
  63. [
  64. mock.call('gcs-bucket', 'data/TEST1.csv', mock.ANY, gzip=False),
  65. mock.call('gcs-bucket', 'data/TEST3.csv', mock.ANY, gzip=False),
  66. mock.call('gcs-bucket', 'data/TEST2.csv', mock.ANY, gzip=False),
  67. ],
  68. any_order=True,
  69. )
  70. azure_fileshare_mock_hook.assert_called_once_with(AZURE_FILESHARE_CONN_ID)
  71. gcs_mock_hook.assert_called_once_with(
  72. gcp_conn_id=GCS_CONN_ID,
  73. delegate_to=None,
  74. impersonation_chain=IMPERSONATION_CHAIN,
  75. )
  76. assert sorted(MOCK_FILES) == sorted(uploaded_files)
  77. @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.AzureFileShareHook')
  78. @mock.patch('airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs.GCSHook')
  79. def test_execute_with_gzip(self, gcs_mock_hook, azure_fileshare_mock_hook):
  80. """Test the execute function when the run is successful."""
  81. operator = AzureFileShareToGCSOperator(
  82. task_id=TASK_ID,
  83. share_name=AZURE_FILESHARE_SHARE,
  84. directory_name=AZURE_FILESHARE_DIRECTORY_NAME,
  85. azure_fileshare_conn_id=AZURE_FILESHARE_CONN_ID,
  86. gcp_conn_id=GCS_CONN_ID,
  87. dest_gcs=GCS_PATH_PREFIX,
  88. google_impersonation_chain=IMPERSONATION_CHAIN,
  89. gzip=True,
  90. )
  91. azure_fileshare_mock_hook.return_value.list_files.return_value = MOCK_FILES
  92. operator.execute(None)
  93. gcs_mock_hook.return_value.upload.assert_has_calls(
  94. [
  95. mock.call('gcs-bucket', 'data/TEST1.csv', mock.ANY, gzip=True),
  96. mock.call('gcs-bucket', 'data/TEST3.csv', mock.ANY, gzip=True),
  97. mock.call('gcs-bucket', 'data/TEST2.csv', mock.ANY, gzip=True),
  98. ],
  99. any_order=True,
  100. )