test_adx_3.py 630 B

1234567891011121314
  1. def test_conn_unknown_method(self):
  2. db.merge_conn(
  3. Connection(
  4. conn_id=ADX_TEST_CONN_ID,
  5. conn_type='azure_data_explorer',
  6. login='client_id',
  7. password='client secret',
  8. host='https://help.kusto.windows.net',
  9. extra=json.dumps({'extra__azure_data_explorer__auth_method': 'AAD_OTHER'}),
  10. )
  11. )
  12. with pytest.raises(AirflowException) as ctx:
  13. AzureDataExplorerHook(azure_data_explorer_conn_id=ADX_TEST_CONN_ID)
  14. assert 'Unknown authentication method: AAD_OTHER' in str(ctx.value)