azure_system_helpers_2.py 1.0 KB

12345678910111213141516171819202122232425
  1. def provide_azure_data_lake_default_connection(key_file_path: str):
  2. """
  3. Context manager to provide a temporary value for azure_data_lake_default connection
  4. :param key_file_path: Path to file with azure_data_lake_default credentials .json file.
  5. """
  6. required_fields = {'login', 'password', 'extra'}
  7. if not key_file_path.endswith(".json"):
  8. raise AirflowException("Use a JSON key file.")
  9. with open(key_file_path) as credentials:
  10. creds = json.load(credentials)
  11. missing_keys = required_fields - creds.keys()
  12. if missing_keys:
  13. message = f"{missing_keys} fields are missing"
  14. raise AirflowException(message)
  15. conn = Connection(
  16. conn_id=DATA_LAKE_CONNECTION_ID,
  17. conn_type=DATA_LAKE_CONNECTION_TYPE,
  18. host=creds.get("host", None),
  19. login=creds.get("login", None),
  20. password=creds.get("password", None),
  21. extra=json.dumps(creds.get('extra', None)),
  22. )
  23. with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
  24. yield