# test_azure_data_lake.py (5.7 KB)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import json
import unittest
from unittest import mock

from airflow.models import Connection
from airflow.utils import db

  24. class TestAzureDataLakeHook(unittest.TestCase):
  25. def setUp(self):
  26. db.merge_conn(
  27. Connection(
  28. conn_id='adl_test_key',
  29. conn_type='azure_data_lake',
  30. login='client_id',
  31. password='client secret',
  32. extra=json.dumps({"tenant": "tenant", "account_name": "accountname"}),
  33. )
  34. )
  35. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.lib', autospec=True)
  36. def test_conn(self, mock_lib):
  37. from azure.datalake.store import core
  38. from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
  39. hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
  40. assert hook._conn is None
  41. assert hook.conn_id == 'adl_test_key'
  42. assert isinstance(hook.get_conn(), core.AzureDLFileSystem)
  43. assert mock_lib.auth.called
  44. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.core.AzureDLFileSystem', autospec=True)
  45. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.lib', autospec=True)
  46. def test_check_for_blob(self, mock_lib, mock_filesystem):
  47. from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
  48. hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
  49. hook.check_for_file('file_path')
  50. mock_filesystem.glob.called
  51. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.multithread.ADLUploader', autospec=True)
  52. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.lib', autospec=True)
  53. def test_upload_file(self, mock_lib, mock_uploader):
  54. from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
  55. hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
  56. hook.upload_file(
  57. local_path='tests/hooks/test_adl_hook.py',
  58. remote_path='/test_adl_hook.py',
  59. nthreads=64,
  60. overwrite=True,
  61. buffersize=4194304,
  62. blocksize=4194304,
  63. )
  64. mock_uploader.assert_called_once_with(
  65. hook.get_conn(),
  66. lpath='tests/hooks/test_adl_hook.py',
  67. rpath='/test_adl_hook.py',
  68. nthreads=64,
  69. overwrite=True,
  70. buffersize=4194304,
  71. blocksize=4194304,
  72. )
  73. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.multithread.ADLDownloader', autospec=True)
  74. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.lib', autospec=True)
  75. def test_download_file(self, mock_lib, mock_downloader):
  76. from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
  77. hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
  78. hook.download_file(
  79. local_path='test_adl_hook.py',
  80. remote_path='/test_adl_hook.py',
  81. nthreads=64,
  82. overwrite=True,
  83. buffersize=4194304,
  84. blocksize=4194304,
  85. )
  86. mock_downloader.assert_called_once_with(
  87. hook.get_conn(),
  88. lpath='test_adl_hook.py',
  89. rpath='/test_adl_hook.py',
  90. nthreads=64,
  91. overwrite=True,
  92. buffersize=4194304,
  93. blocksize=4194304,
  94. )
  95. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.core.AzureDLFileSystem', autospec=True)
  96. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.lib', autospec=True)
  97. def test_list_glob(self, mock_lib, mock_fs):
  98. from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
  99. hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
  100. hook.list('file_path/*')
  101. mock_fs.return_value.glob.assert_called_once_with('file_path/*')
  102. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.core.AzureDLFileSystem', autospec=True)
  103. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.lib', autospec=True)
  104. def test_list_walk(self, mock_lib, mock_fs):
  105. from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
  106. hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
  107. hook.list('file_path/some_folder/')
  108. mock_fs.return_value.walk.assert_called_once_with('file_path/some_folder/')
  109. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.core.AzureDLFileSystem', autospec=True)
  110. @mock.patch('airflow.providers.microsoft.azure.hooks.data_lake.lib', autospec=True)
  111. def test_remove(self, mock_lib, mock_fs):
  112. from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
  113. hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
  114. hook.remove('filepath', True)
  115. mock_fs.return_value.remove.assert_called_once_with('filepath', recursive=True)