12345678910111213141516171819202122232425 |
def provide_azure_data_lake_default_connection(key_file_path: str):
    """
    Context manager to provide a temporary value for the azure_data_lake_default connection.

    Loads credentials from a JSON key file, builds a ``Connection`` from them and
    yields while the connection URI is handed to ``patch_environ`` under the
    ``AIRFLOW_CONN_<CONN_ID>`` key.

    NOTE(review): this is a generator function; the ``contextmanager`` decorator is
    presumably applied directly above this definition -- confirm.

    :param key_file_path: Path to file with azure_data_lake_default credentials .json file.
        The JSON object must contain ``login``, ``password`` and ``extra``;
        ``host`` is optional.
    :raises AirflowException: If ``key_file_path`` does not end with ``.json`` or
        if any required field is missing from the file.
    """
    required_fields = {'login', 'password', 'extra'}

    if not key_file_path.endswith(".json"):
        raise AirflowException("Use a JSON key file.")

    # JSON is UTF-8 by specification; do not rely on the platform's locale
    # encoding, which may mis-decode non-ASCII credentials.
    with open(key_file_path, encoding="utf-8") as credentials:
        creds = json.load(credentials)

    missing_keys = required_fields - creds.keys()
    if missing_keys:
        message = f"{missing_keys} fields are missing"
        raise AirflowException(message)

    conn = Connection(
        conn_id=DATA_LAKE_CONNECTION_ID,
        conn_type=DATA_LAKE_CONNECTION_TYPE,
        host=creds.get("host"),
        login=creds.get("login"),
        password=creds.get("password"),
        # ``extra`` is stored on the connection as a JSON-encoded string.
        extra=json.dumps(creds.get('extra')),
    )

    # The environment key is built from the upper-cased connection id.
    # presumably patch_environ restores the previous environment on exit -- verify.
    with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
        yield
|