def provide_azure_data_lake_default_connection(key_file_path: str):
    """
    Context manager to provide a temporary value for azure_data_lake_default connection

    The credentials are read from a JSON file, turned into a ``Connection`` and
    its URI is handed to ``patch_environ`` under the
    ``AIRFLOW_CONN_<CONN_ID>`` key while the managed block runs.

    NOTE(review): this is a generator function; it is presumably wrapped with
    ``contextlib.contextmanager`` where it is defined -- the decorator is not
    visible here, confirm before relying on ``with`` usage.

    :param key_file_path: Path to file with azure_data_lake_default credentials .json file.
        The JSON document must be an object containing at least the
        ``login``, ``password`` and ``extra`` keys; ``host`` is optional.
    :raises AirflowException: If the path does not end with ``.json``, if the
        file content is not a JSON object, or if a required key is missing.
    """
    required_fields = {'login', 'password', 'extra'}

    if not key_file_path.endswith(".json"):
        raise AirflowException("Use a JSON key file.")

    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with open(key_file_path, encoding="utf-8") as credentials:
        creds = json.load(credentials)

    # A top-level list/string/number has no keys to validate; report it the
    # same way as the other validation failures instead of an AttributeError.
    if not isinstance(creds, dict):
        raise AirflowException("The JSON key file must contain a JSON object.")

    missing_keys = required_fields - creds.keys()
    if missing_keys:
        message = f"{missing_keys} fields are missing"
        raise AirflowException(message)

    conn = Connection(
        conn_id=DATA_LAKE_CONNECTION_ID,
        conn_type=DATA_LAKE_CONNECTION_TYPE,
        host=creds.get("host"),
        login=creds.get("login"),
        password=creds.get("password"),
        # The connection's ``extra`` is passed as a JSON-encoded string.
        extra=json.dumps(creds.get('extra')),
    )

    # Expose the connection through the environment only for the duration of
    # the managed block (presumably patch_environ restores the previous
    # environment on exit -- verify against its definition).
    with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
        yield