test_adx_7.py 1.0 KB

123456789101112131415161718192021222324
  def test_conn_method_aad_app_cert(self, mock_init):
      """Check that the AAD_APP_CERT auth method reaches the Kusto client.

      Stores a connection whose extras select ``AAD_APP_CERT`` together with
      a tenant, certificate and thumbprint, instantiates the hook against it,
      and then inspects the connection-string builder handed to the mocked
      constructor.

      :param mock_init: mock injected by a patch decorator that is outside
          this view -- presumably ``KustoClient.__init__``; confirm against
          the decorator on this method.
      """
      # Set explicitly because a mocked ``__init__`` must return None;
      # the default Mock return value would break object construction.
      mock_init.return_value = None
      db.merge_conn(
          Connection(
              conn_id=ADX_TEST_CONN_ID,
              conn_type='azure_data_explorer',
              login='client_id',
              host='https://help.kusto.windows.net',
              extra=json.dumps(
                  {
                      'extra__azure_data_explorer__tenant': 'tenant',
                      'extra__azure_data_explorer__auth_method': 'AAD_APP_CERT',
                      'extra__azure_data_explorer__certificate': 'PEM',
                      'extra__azure_data_explorer__thumbprint': 'thumbprint',
                  }
              ),
          )
      )
      AzureDataExplorerHook(azure_data_explorer_conn_id=ADX_TEST_CONN_ID)

      # The previous ``assert mock_init.called_with(...)`` could never fail:
      # ``called_with`` is just an auto-created Mock attribute, and calling it
      # returns a truthy Mock. Use real mock assertions instead.
      mock_init.assert_called_once()

      # Compare the builder field by field rather than with ``==`` so the
      # check does not rely on KustoConnectionStringBuilder defining value
      # equality.
      # NOTE(review): assumes the hook passes the builder as the first
      # positional argument, as the original assertion did -- confirm.
      kcsb = mock_init.call_args[0][0]
      assert kcsb.data_source == 'https://help.kusto.windows.net'
      assert kcsb.application_client_id == 'client_id'
      assert kcsb.application_certificate == 'PEM'
      assert kcsb.application_certificate_thumbprint == 'thumbprint'
      assert kcsb.authority_id == 'tenant'