test_adx_6.py 916 B

1234567891011121314151617181920212223
  def test_conn_method_aad_app(self, mock_init):
      """Check the connection string produced for the ``AAD_APP`` auth method.

      Registers a connection whose extras select ``AAD_APP`` authentication,
      instantiates ``AzureDataExplorerHook`` for it, and verifies that the
      patched callable received a connection-string builder carrying the same
      data source, application id, application key and tenant as one built
      with ``with_aad_application_key_authentication``.

      :param mock_init: mock injected by a patch decorator that is not visible
          in this excerpt -- presumably replacing ``KustoClient.__init__``;
          confirm against the decorator.
      """
      # A patched ``__init__`` must return None, otherwise instantiation raises TypeError.
      mock_init.return_value = None
      db.merge_conn(
          Connection(
              conn_id=ADX_TEST_CONN_ID,
              conn_type='azure_data_explorer',
              login='app_id',
              password='app key',
              host='https://help.kusto.windows.net',
              extra=json.dumps(
                  {
                      'extra__azure_data_explorer__tenant': 'tenant',
                      'extra__azure_data_explorer__auth_method': 'AAD_APP',
                  }
              ),
          )
      )
      # NOTE(review): assumes constructing the hook is what creates the client;
      # if the hook connects lazily, call ``get_conn()`` here as well.
      AzureDataExplorerHook(azure_data_explorer_conn_id=ADX_TEST_CONN_ID)

      expected = KustoConnectionStringBuilder.with_aad_application_key_authentication(
          'https://help.kusto.windows.net', 'app_id', 'app key', 'tenant'
      )

      # ``assert mock.called_with(...)`` never fails: ``called_with`` is not a Mock
      # API, so it was auto-created as a truthy child mock (and raises
      # AttributeError on Python 3.12+). Use the real assertion helpers instead.
      mock_init.assert_called()
      # Assumes the builder is passed as the first positional argument -- TODO confirm.
      actual = mock_init.call_args[0][0]
      # Compare field by field rather than with ``assert_called_with(expected)``:
      # two separately built builders are not guaranteed to compare equal.
      for attr in ('data_source', 'application_client_id', 'application_key', 'authority_id'):
          assert getattr(actual, attr) == getattr(expected, attr), attr