test_azure_fileshare.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. #
  19. """
  20. This module contains integration with Azure File Share.
  21. Cloud variant of a SMB file share. Make sure that a Airflow connection of
  22. type `wasb` exists. Authorization can be done by supplying a login (=Storage account name)
  23. and password (=Storage account key), or login and SAS token in the extra field
  24. (see connection `azure_fileshare_default` for an example).
  25. """
  26. import json
  27. import unittest
  28. from unittest import mock
  29. import pytest
  30. from azure.storage.file import Directory, File
  31. from airflow.models import Connection
  32. from airflow.providers.microsoft.azure.hooks.fileshare import AzureFileShareHook
  33. from airflow.utils import db
  34. class TestAzureFileshareHook(unittest.TestCase):
  35. def setUp(self):
  36. db.merge_conn(
  37. Connection(
  38. conn_id='azure_fileshare_test_key',
  39. conn_type='azure_file_share',
  40. login='login',
  41. password='key',
  42. )
  43. )
  44. db.merge_conn(
  45. Connection(
  46. conn_id='azure_fileshare_extras',
  47. conn_type='azure_fileshare',
  48. login='login',
  49. extra=json.dumps(
  50. {
  51. 'extra__azure_fileshare__sas_token': 'token',
  52. 'extra__azure_fileshare__protocol': 'http',
  53. }
  54. ),
  55. )
  56. )
  57. db.merge_conn(
  58. # Neither password nor sas_token present
  59. Connection(
  60. conn_id='azure_fileshare_missing_credentials',
  61. conn_type='azure_fileshare',
  62. login='login',
  63. )
  64. )
  65. db.merge_conn(
  66. Connection(
  67. conn_id='azure_fileshare_extras_deprecated',
  68. conn_type='azure_fileshare',
  69. login='login',
  70. extra=json.dumps(
  71. {
  72. 'sas_token': 'token',
  73. }
  74. ),
  75. )
  76. )
  77. db.merge_conn(
  78. Connection(
  79. conn_id='azure_fileshare_extras_deprecated_empty_wasb_extra',
  80. conn_type='azure_fileshare',
  81. login='login',
  82. password='password',
  83. extra=json.dumps(
  84. {
  85. 'extra__azure_fileshare__shared_access_key': '',
  86. }
  87. ),
  88. )
  89. )
  90. db.merge_conn(
  91. Connection(
  92. conn_id='azure_fileshare_extras_wrong',
  93. conn_type='azure_fileshare',
  94. login='login',
  95. extra=json.dumps(
  96. {
  97. 'wrong_key': 'token',
  98. }
  99. ),
  100. )
  101. )
  102. def test_key_and_connection(self):
  103. from azure.storage.file import FileService
  104. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_test_key')
  105. assert hook.conn_id == 'azure_fileshare_test_key'
  106. assert hook._conn is None
  107. print(hook.get_conn())
  108. assert isinstance(hook.get_conn(), FileService)
  109. def test_sas_token(self):
  110. from azure.storage.file import FileService
  111. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  112. assert hook.conn_id == 'azure_fileshare_extras'
  113. assert isinstance(hook.get_conn(), FileService)
  114. def test_deprecated_sas_token(self):
  115. from azure.storage.file import FileService
  116. with pytest.warns(DeprecationWarning):
  117. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras_deprecated')
  118. assert hook.conn_id == 'azure_fileshare_extras_deprecated'
  119. assert isinstance(hook.get_conn(), FileService)
  120. def test_deprecated_wasb_connection(self):
  121. from azure.storage.file import FileService
  122. with pytest.warns(DeprecationWarning):
  123. hook = AzureFileShareHook(
  124. azure_fileshare_conn_id='azure_fileshare_extras_deprecated_empty_wasb_extra'
  125. )
  126. assert hook.conn_id == 'azure_fileshare_extras_deprecated_empty_wasb_extra'
  127. assert isinstance(hook.get_conn(), FileService)
  128. def test_wrong_extras(self):
  129. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras_wrong')
  130. assert hook.conn_id == 'azure_fileshare_extras_wrong'
  131. with pytest.raises(TypeError, match=".*wrong_key.*"):
  132. hook.get_conn()
  133. def test_missing_credentials(self):
  134. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_missing_credentials')
  135. assert hook.conn_id == 'azure_fileshare_missing_credentials'
  136. with pytest.raises(ValueError, match=".*account_key or sas_token.*"):
  137. hook.get_conn()
  138. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  139. def test_check_for_file(self, mock_service):
  140. mock_instance = mock_service.return_value
  141. mock_instance.exists.return_value = True
  142. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  143. assert hook.check_for_file('share', 'directory', 'file', timeout=3)
  144. mock_instance.exists.assert_called_once_with('share', 'directory', 'file', timeout=3)
  145. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  146. def test_check_for_directory(self, mock_service):
  147. mock_instance = mock_service.return_value
  148. mock_instance.exists.return_value = True
  149. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  150. assert hook.check_for_directory('share', 'directory', timeout=3)
  151. mock_instance.exists.assert_called_once_with('share', 'directory', timeout=3)
  152. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  153. def test_load_file(self, mock_service):
  154. mock_instance = mock_service.return_value
  155. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  156. hook.load_file('path', 'share', 'directory', 'file', max_connections=1)
  157. mock_instance.create_file_from_path.assert_called_once_with(
  158. 'share', 'directory', 'file', 'path', max_connections=1
  159. )
  160. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  161. def test_load_string(self, mock_service):
  162. mock_instance = mock_service.return_value
  163. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  164. hook.load_string('big string', 'share', 'directory', 'file', timeout=1)
  165. mock_instance.create_file_from_text.assert_called_once_with(
  166. 'share', 'directory', 'file', 'big string', timeout=1
  167. )
  168. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  169. def test_load_stream(self, mock_service):
  170. mock_instance = mock_service.return_value
  171. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  172. hook.load_stream('stream', 'share', 'directory', 'file', 42, timeout=1)
  173. mock_instance.create_file_from_stream.assert_called_once_with(
  174. 'share', 'directory', 'file', 'stream', 42, timeout=1
  175. )
  176. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  177. def test_list_directories_and_files(self, mock_service):
  178. mock_instance = mock_service.return_value
  179. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  180. hook.list_directories_and_files('share', 'directory', timeout=1)
  181. mock_instance.list_directories_and_files.assert_called_once_with('share', 'directory', timeout=1)
  182. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  183. def test_list_files(self, mock_service):
  184. mock_instance = mock_service.return_value
  185. mock_instance.list_directories_and_files.return_value = [
  186. File("file1"),
  187. File("file2"),
  188. Directory("dir1"),
  189. Directory("dir2"),
  190. ]
  191. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  192. files = hook.list_files('share', 'directory', timeout=1)
  193. assert files == ["file1", 'file2']
  194. mock_instance.list_directories_and_files.assert_called_once_with('share', 'directory', timeout=1)
  195. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  196. def test_create_directory(self, mock_service):
  197. mock_instance = mock_service.return_value
  198. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  199. hook.create_directory('share', 'directory', timeout=1)
  200. mock_instance.create_directory.assert_called_once_with('share', 'directory', timeout=1)
  201. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  202. def test_get_file(self, mock_service):
  203. mock_instance = mock_service.return_value
  204. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  205. hook.get_file('path', 'share', 'directory', 'file', max_connections=1)
  206. mock_instance.get_file_to_path.assert_called_once_with(
  207. 'share', 'directory', 'file', 'path', max_connections=1
  208. )
  209. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  210. def test_get_file_to_stream(self, mock_service):
  211. mock_instance = mock_service.return_value
  212. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  213. hook.get_file_to_stream('stream', 'share', 'directory', 'file', max_connections=1)
  214. mock_instance.get_file_to_stream.assert_called_once_with(
  215. 'share', 'directory', 'file', 'stream', max_connections=1
  216. )
  217. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  218. def test_create_share(self, mock_service):
  219. mock_instance = mock_service.return_value
  220. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  221. hook.create_share('my_share')
  222. mock_instance.create_share.assert_called_once_with('my_share')
  223. @mock.patch('airflow.providers.microsoft.azure.hooks.fileshare.FileService', autospec=True)
  224. def test_delete_share(self, mock_service):
  225. mock_instance = mock_service.return_value
  226. hook = AzureFileShareHook(azure_fileshare_conn_id='azure_fileshare_extras')
  227. hook.delete_share('my_share')
  228. mock_instance.delete_share.assert_called_once_with('my_share')