test_azure_data_factory.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. import json
  18. from typing import Type
  19. from unittest.mock import MagicMock, PropertyMock, patch
  20. import pytest
  21. from azure.identity import ClientSecretCredential, DefaultAzureCredential
  22. from azure.mgmt.datafactory.models import FactoryListResponse
  23. from pytest import fixture
  24. from airflow.exceptions import AirflowException
  25. from airflow.models.connection import Connection
  26. from airflow.providers.microsoft.azure.hooks.data_factory import (
  27. AzureDataFactoryHook,
  28. AzureDataFactoryPipelineRunException,
  29. AzureDataFactoryPipelineRunStatus,
  30. provide_targeted_factory,
  31. )
  32. from airflow.utils import db
  33. DEFAULT_RESOURCE_GROUP = "defaultResourceGroup"
  34. RESOURCE_GROUP = "testResourceGroup"
  35. DEFAULT_FACTORY = "defaultFactory"
  36. FACTORY = "testFactory"
  37. DEFAULT_CONNECTION_CLIENT_SECRET = "azure_data_factory_test_client_secret"
  38. DEFAULT_CONNECTION_DEFAULT_CREDENTIAL = "azure_data_factory_test_default_credential"
  39. MODEL = object()
  40. NAME = "testName"
  41. ID = "testId"
  42. def setup_module():
  43. connection_client_secret = Connection(
  44. conn_id=DEFAULT_CONNECTION_CLIENT_SECRET,
  45. conn_type="azure_data_factory",
  46. login="clientId",
  47. password="clientSecret",
  48. extra=json.dumps(
  49. {
  50. "extra__azure_data_factory__tenantId": "tenantId",
  51. "extra__azure_data_factory__subscriptionId": "subscriptionId",
  52. "extra__azure_data_factory__resource_group_name": DEFAULT_RESOURCE_GROUP,
  53. "extra__azure_data_factory__factory_name": DEFAULT_FACTORY,
  54. }
  55. ),
  56. )
  57. connection_default_credential = Connection(
  58. conn_id=DEFAULT_CONNECTION_DEFAULT_CREDENTIAL,
  59. conn_type="azure_data_factory",
  60. extra=json.dumps(
  61. {
  62. "extra__azure_data_factory__subscriptionId": "subscriptionId",
  63. "extra__azure_data_factory__resource_group_name": DEFAULT_RESOURCE_GROUP,
  64. "extra__azure_data_factory__factory_name": DEFAULT_FACTORY,
  65. }
  66. ),
  67. )
  68. connection_missing_subscription_id = Connection(
  69. conn_id="azure_data_factory_missing_subscription_id",
  70. conn_type="azure_data_factory",
  71. login="clientId",
  72. password="clientSecret",
  73. extra=json.dumps(
  74. {
  75. "extra__azure_data_factory__tenantId": "tenantId",
  76. "extra__azure_data_factory__resource_group_name": DEFAULT_RESOURCE_GROUP,
  77. "extra__azure_data_factory__factory_name": DEFAULT_FACTORY,
  78. }
  79. ),
  80. )
  81. connection_missing_tenant_id = Connection(
  82. conn_id="azure_data_factory_missing_tenant_id",
  83. conn_type="azure_data_factory",
  84. login="clientId",
  85. password="clientSecret",
  86. extra=json.dumps(
  87. {
  88. "extra__azure_data_factory__subscriptionId": "subscriptionId",
  89. "extra__azure_data_factory__resource_group_name": DEFAULT_RESOURCE_GROUP,
  90. "extra__azure_data_factory__factory_name": DEFAULT_FACTORY,
  91. }
  92. ),
  93. )
  94. db.merge_conn(connection_client_secret)
  95. db.merge_conn(connection_default_credential)
  96. db.merge_conn(connection_missing_subscription_id)
  97. db.merge_conn(connection_missing_tenant_id)
  98. @fixture
  99. def hook():
  100. client = AzureDataFactoryHook(azure_data_factory_conn_id=DEFAULT_CONNECTION_CLIENT_SECRET)
  101. client._conn = MagicMock(
  102. spec=[
  103. "factories",
  104. "linked_services",
  105. "datasets",
  106. "pipelines",
  107. "pipeline_runs",
  108. "triggers",
  109. "trigger_runs",
  110. ]
  111. )
  112. return client
  113. def parametrize(explicit_factory, implicit_factory):
  114. def wrapper(func):
  115. return pytest.mark.parametrize(
  116. ("user_args", "sdk_args"),
  117. (explicit_factory, implicit_factory),
  118. ids=("explicit factory", "implicit factory"),
  119. )(func)
  120. return wrapper
  121. def test_provide_targeted_factory():
  122. def echo(_, resource_group_name=None, factory_name=None):
  123. return resource_group_name, factory_name
  124. conn = MagicMock()
  125. hook = MagicMock()
  126. hook.get_connection.return_value = conn
  127. conn.extra_dejson = {}
  128. assert provide_targeted_factory(echo)(hook, RESOURCE_GROUP, FACTORY) == (RESOURCE_GROUP, FACTORY)
  129. conn.extra_dejson = {
  130. "extra__azure_data_factory__resource_group_name": DEFAULT_RESOURCE_GROUP,
  131. "extra__azure_data_factory__factory_name": DEFAULT_FACTORY,
  132. }
  133. assert provide_targeted_factory(echo)(hook) == (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY)
  134. assert provide_targeted_factory(echo)(hook, RESOURCE_GROUP, None) == (RESOURCE_GROUP, DEFAULT_FACTORY)
  135. assert provide_targeted_factory(echo)(hook, None, FACTORY) == (DEFAULT_RESOURCE_GROUP, FACTORY)
  136. assert provide_targeted_factory(echo)(hook, None, None) == (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY)
  137. with pytest.raises(AirflowException):
  138. conn.extra_dejson = {}
  139. provide_targeted_factory(echo)(hook)
  140. @pytest.mark.parametrize(
  141. ("connection_id", "credential_type"),
  142. [
  143. (DEFAULT_CONNECTION_CLIENT_SECRET, ClientSecretCredential),
  144. (DEFAULT_CONNECTION_DEFAULT_CREDENTIAL, DefaultAzureCredential),
  145. ],
  146. )
  147. def test_get_connection_by_credential_client_secret(connection_id: str, credential_type: Type):
  148. hook = AzureDataFactoryHook(connection_id)
  149. with patch.object(hook, "_create_client") as mock_create_client:
  150. mock_create_client.return_value = MagicMock()
  151. connection = hook.get_conn()
  152. assert connection is not None
  153. mock_create_client.assert_called_once()
  154. assert isinstance(mock_create_client.call_args[0][0], credential_type)
  155. assert mock_create_client.call_args[0][1] == "subscriptionId"
  156. @parametrize(
  157. explicit_factory=((RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY)),
  158. implicit_factory=((), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY)),
  159. )
  160. def test_get_factory(hook: AzureDataFactoryHook, user_args, sdk_args):
  161. hook.get_factory(*user_args)
  162. hook._conn.factories.get.assert_called_with(*sdk_args)
  163. @parametrize(
  164. explicit_factory=((MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, MODEL)),
  165. implicit_factory=((MODEL,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, MODEL)),
  166. )
  167. def test_create_factory(hook: AzureDataFactoryHook, user_args, sdk_args):
  168. hook.create_factory(*user_args)
  169. hook._conn.factories.create_or_update.assert_called_with(*sdk_args)
  170. @parametrize(
  171. explicit_factory=((MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, MODEL)),
  172. implicit_factory=((MODEL,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, MODEL)),
  173. )
  174. def test_update_factory(hook: AzureDataFactoryHook, user_args, sdk_args):
  175. with patch.object(hook, "_factory_exists") as mock_factory_exists:
  176. mock_factory_exists.return_value = True
  177. hook.update_factory(*user_args)
  178. hook._conn.factories.create_or_update.assert_called_with(*sdk_args)
  179. @parametrize(
  180. explicit_factory=((MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, MODEL)),
  181. implicit_factory=((MODEL,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, MODEL)),
  182. )
  183. def test_update_factory_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
  184. with patch.object(hook, "_factory_exists") as mock_factory_exists:
  185. mock_factory_exists.return_value = False
  186. with pytest.raises(AirflowException, match=r"Factory .+ does not exist"):
  187. hook.update_factory(*user_args)
  188. @parametrize(
  189. explicit_factory=((RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY)),
  190. implicit_factory=((), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY)),
  191. )
  192. def test_delete_factory(hook: AzureDataFactoryHook, user_args, sdk_args):
  193. hook.delete_factory(*user_args)
  194. hook._conn.factories.delete.assert_called_with(*sdk_args)
  195. @parametrize(
  196. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  197. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  198. )
  199. def test_get_linked_service(hook: AzureDataFactoryHook, user_args, sdk_args):
  200. hook.get_linked_service(*user_args)
  201. hook._conn.linked_services.get.assert_called_with(*sdk_args)
  202. @parametrize(
  203. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  204. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  205. )
  206. def test_create_linked_service(hook: AzureDataFactoryHook, user_args, sdk_args):
  207. hook.create_linked_service(*user_args)
  208. hook._conn.linked_services.create_or_update(*sdk_args)
  209. @parametrize(
  210. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  211. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  212. )
  213. def test_update_linked_service(hook: AzureDataFactoryHook, user_args, sdk_args):
  214. with patch.object(hook, "_linked_service_exists") as mock_linked_service_exists:
  215. mock_linked_service_exists.return_value = True
  216. hook.update_linked_service(*user_args)
  217. hook._conn.linked_services.create_or_update(*sdk_args)
  218. @parametrize(
  219. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  220. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  221. )
  222. def test_update_linked_service_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
  223. with patch.object(hook, "_linked_service_exists") as mock_linked_service_exists:
  224. mock_linked_service_exists.return_value = False
  225. with pytest.raises(AirflowException, match=r"Linked service .+ does not exist"):
  226. hook.update_linked_service(*user_args)
  227. @parametrize(
  228. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  229. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  230. )
  231. def test_delete_linked_service(hook: AzureDataFactoryHook, user_args, sdk_args):
  232. hook.delete_linked_service(*user_args)
  233. hook._conn.linked_services.delete.assert_called_with(*sdk_args)
  234. @parametrize(
  235. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  236. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  237. )
  238. def test_get_dataset(hook: AzureDataFactoryHook, user_args, sdk_args):
  239. hook.get_dataset(*user_args)
  240. hook._conn.datasets.get.assert_called_with(*sdk_args)
  241. @parametrize(
  242. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  243. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  244. )
  245. def test_create_dataset(hook: AzureDataFactoryHook, user_args, sdk_args):
  246. hook.create_dataset(*user_args)
  247. hook._conn.datasets.create_or_update.assert_called_with(*sdk_args)
  248. @parametrize(
  249. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  250. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  251. )
  252. def test_update_dataset(hook: AzureDataFactoryHook, user_args, sdk_args):
  253. with patch.object(hook, "_dataset_exists") as mock_dataset_exists:
  254. mock_dataset_exists.return_value = True
  255. hook.update_dataset(*user_args)
  256. hook._conn.datasets.create_or_update.assert_called_with(*sdk_args)
  257. @parametrize(
  258. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  259. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  260. )
  261. def test_update_dataset_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
  262. with patch.object(hook, "_dataset_exists") as mock_dataset_exists:
  263. mock_dataset_exists.return_value = False
  264. with pytest.raises(AirflowException, match=r"Dataset .+ does not exist"):
  265. hook.update_dataset(*user_args)
  266. @parametrize(
  267. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  268. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  269. )
  270. def test_delete_dataset(hook: AzureDataFactoryHook, user_args, sdk_args):
  271. hook.delete_dataset(*user_args)
  272. hook._conn.datasets.delete.assert_called_with(*sdk_args)
  273. @parametrize(
  274. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  275. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  276. )
  277. def test_get_pipeline(hook: AzureDataFactoryHook, user_args, sdk_args):
  278. hook.get_pipeline(*user_args)
  279. hook._conn.pipelines.get.assert_called_with(*sdk_args)
  280. @parametrize(
  281. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  282. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  283. )
  284. def test_create_pipeline(hook: AzureDataFactoryHook, user_args, sdk_args):
  285. hook.create_pipeline(*user_args)
  286. hook._conn.pipelines.create_or_update.assert_called_with(*sdk_args)
  287. @parametrize(
  288. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  289. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  290. )
  291. def test_update_pipeline(hook: AzureDataFactoryHook, user_args, sdk_args):
  292. with patch.object(hook, "_pipeline_exists") as mock_pipeline_exists:
  293. mock_pipeline_exists.return_value = True
  294. hook.update_pipeline(*user_args)
  295. hook._conn.pipelines.create_or_update.assert_called_with(*sdk_args)
  296. @parametrize(
  297. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  298. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  299. )
  300. def test_update_pipeline_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
  301. with patch.object(hook, "_pipeline_exists") as mock_pipeline_exists:
  302. mock_pipeline_exists.return_value = False
  303. with pytest.raises(AirflowException, match=r"Pipeline .+ does not exist"):
  304. hook.update_pipeline(*user_args)
  305. @parametrize(
  306. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  307. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  308. )
  309. def test_delete_pipeline(hook: AzureDataFactoryHook, user_args, sdk_args):
  310. hook.delete_pipeline(*user_args)
  311. hook._conn.pipelines.delete.assert_called_with(*sdk_args)
  312. @parametrize(
  313. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  314. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  315. )
  316. def test_run_pipeline(hook: AzureDataFactoryHook, user_args, sdk_args):
  317. hook.run_pipeline(*user_args)
  318. hook._conn.pipelines.create_run.assert_called_with(*sdk_args)
  319. @parametrize(
  320. explicit_factory=((ID, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, ID)),
  321. implicit_factory=((ID,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, ID)),
  322. )
  323. def test_get_pipeline_run(hook: AzureDataFactoryHook, user_args, sdk_args):
  324. hook.get_pipeline_run(*user_args)
  325. hook._conn.pipeline_runs.get.assert_called_with(*sdk_args)
  326. _wait_for_pipeline_run_status_test_args = [
  327. (AzureDataFactoryPipelineRunStatus.SUCCEEDED, AzureDataFactoryPipelineRunStatus.SUCCEEDED, True),
  328. (AzureDataFactoryPipelineRunStatus.FAILED, AzureDataFactoryPipelineRunStatus.SUCCEEDED, False),
  329. (AzureDataFactoryPipelineRunStatus.CANCELLED, AzureDataFactoryPipelineRunStatus.SUCCEEDED, False),
  330. (AzureDataFactoryPipelineRunStatus.IN_PROGRESS, AzureDataFactoryPipelineRunStatus.SUCCEEDED, "timeout"),
  331. (AzureDataFactoryPipelineRunStatus.QUEUED, AzureDataFactoryPipelineRunStatus.SUCCEEDED, "timeout"),
  332. (AzureDataFactoryPipelineRunStatus.CANCELING, AzureDataFactoryPipelineRunStatus.SUCCEEDED, "timeout"),
  333. (AzureDataFactoryPipelineRunStatus.SUCCEEDED, AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES, True),
  334. (AzureDataFactoryPipelineRunStatus.FAILED, AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES, True),
  335. (AzureDataFactoryPipelineRunStatus.CANCELLED, AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES, True),
  336. ]
  337. @pytest.mark.parametrize(
  338. argnames=("pipeline_run_status", "expected_status", "expected_output"),
  339. argvalues=_wait_for_pipeline_run_status_test_args,
  340. ids=[
  341. f"run_status_{argval[0]}_expected_{argval[1]}"
  342. if isinstance(argval[1], str)
  343. else f"run_status_{argval[0]}_expected_AnyTerminalStatus"
  344. for argval in _wait_for_pipeline_run_status_test_args
  345. ],
  346. )
  347. def test_wait_for_pipeline_run_status(hook, pipeline_run_status, expected_status, expected_output):
  348. config = {"run_id": ID, "timeout": 3, "check_interval": 1, "expected_statuses": expected_status}
  349. with patch.object(AzureDataFactoryHook, "get_pipeline_run") as mock_pipeline_run:
  350. mock_pipeline_run.return_value.status = pipeline_run_status
  351. if expected_output != "timeout":
  352. assert hook.wait_for_pipeline_run_status(**config) == expected_output
  353. else:
  354. with pytest.raises(AzureDataFactoryPipelineRunException):
  355. hook.wait_for_pipeline_run_status(**config)
  356. @parametrize(
  357. explicit_factory=((ID, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, ID)),
  358. implicit_factory=((ID,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, ID)),
  359. )
  360. def test_cancel_pipeline_run(hook: AzureDataFactoryHook, user_args, sdk_args):
  361. hook.cancel_pipeline_run(*user_args)
  362. hook._conn.pipeline_runs.cancel.assert_called_with(*sdk_args)
  363. @parametrize(
  364. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  365. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  366. )
  367. def test_get_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  368. hook.get_trigger(*user_args)
  369. hook._conn.triggers.get.assert_called_with(*sdk_args)
  370. @parametrize(
  371. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  372. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  373. )
  374. def test_create_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  375. hook.create_trigger(*user_args)
  376. hook._conn.triggers.create_or_update.assert_called_with(*sdk_args)
  377. @parametrize(
  378. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  379. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  380. )
  381. def test_update_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  382. with patch.object(hook, "_trigger_exists") as mock_trigger_exists:
  383. mock_trigger_exists.return_value = True
  384. hook.update_trigger(*user_args)
  385. hook._conn.triggers.create_or_update.assert_called_with(*sdk_args)
  386. @parametrize(
  387. explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
  388. implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
  389. )
  390. def test_update_trigger_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
  391. with patch.object(hook, "_trigger_exists") as mock_trigger_exists:
  392. mock_trigger_exists.return_value = False
  393. with pytest.raises(AirflowException, match=r"Trigger .+ does not exist"):
  394. hook.update_trigger(*user_args)
  395. @parametrize(
  396. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  397. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  398. )
  399. def test_delete_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  400. hook.delete_trigger(*user_args)
  401. hook._conn.triggers.delete.assert_called_with(*sdk_args)
  402. @parametrize(
  403. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  404. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  405. )
  406. def test_start_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  407. hook.start_trigger(*user_args)
  408. hook._conn.triggers.begin_start.assert_called_with(*sdk_args)
  409. @parametrize(
  410. explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
  411. implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
  412. )
  413. def test_stop_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  414. hook.stop_trigger(*user_args)
  415. hook._conn.triggers.begin_stop.assert_called_with(*sdk_args)
  416. @parametrize(
  417. explicit_factory=((NAME, ID, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, ID)),
  418. implicit_factory=((NAME, ID), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, ID)),
  419. )
  420. def test_rerun_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  421. hook.rerun_trigger(*user_args)
  422. hook._conn.trigger_runs.rerun.assert_called_with(*sdk_args)
  423. @parametrize(
  424. explicit_factory=((NAME, ID, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, ID)),
  425. implicit_factory=((NAME, ID), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, ID)),
  426. )
  427. def test_cancel_trigger(hook: AzureDataFactoryHook, user_args, sdk_args):
  428. hook.cancel_trigger(*user_args)
  429. hook._conn.trigger_runs.cancel.assert_called_with(*sdk_args)
  430. @pytest.mark.parametrize(
  431. argnames="factory_list_result",
  432. argvalues=[iter([FactoryListResponse]), iter([])],
  433. ids=["factory_exists", "factory_does_not_exist"],
  434. )
  435. def test_connection_success(hook, factory_list_result):
  436. hook.get_conn().factories.list.return_value = factory_list_result
  437. status, msg = hook.test_connection()
  438. assert status is True
  439. assert msg == "Successfully connected to Azure Data Factory."
  440. def test_connection_failure(hook):
  441. hook.get_conn().factories.list = PropertyMock(side_effect=Exception("Authentication failed."))
  442. status, msg = hook.test_connection()
  443. assert status is False
  444. assert msg == "Authentication failed."
  445. def test_connection_failure_missing_subscription_id():
  446. hook = AzureDataFactoryHook("azure_data_factory_missing_subscription_id")
  447. status, msg = hook.test_connection()
  448. assert status is False
  449. assert msg == "A Subscription ID is required to connect to Azure Data Factory."
  450. def test_connection_failure_missing_tenant_id():
  451. hook = AzureDataFactoryHook("azure_data_factory_missing_tenant_id")
  452. status, msg = hook.test_connection()
  453. assert status is False
  454. assert msg == "A Tenant ID is required when authenticating with Client ID and Secret."