# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import json
import datetime
import logging
from typing import (
Any,
Callable,
Dict,
Mapping,
Optional,
overload,
List,
Tuple,
TYPE_CHECKING,
)
from azure.appconfiguration import ( # type:ignore # pylint:disable=no-name-in-module
ConfigurationSetting,
FeatureFlagConfigurationSetting,
SecretReferenceConfigurationSetting,
)
from azure.core.exceptions import AzureError, HttpResponseError
from azure.keyvault.secrets import SecretClient, KeyVaultSecretIdentifier
from ._models import AzureAppConfigurationKeyVaultOptions, SettingSelector
from ._constants import (
FEATURE_MANAGEMENT_KEY,
FEATURE_FLAG_KEY,
)
from ._azureappconfigurationproviderbase import (
AzureAppConfigurationProviderBase,
update_correlation_context_header,
delay_failure,
is_json_content_type,
sdk_allowed_kwargs,
)
from ._client_manager import ConfigurationClientManager
from ._user_agent import USER_AGENT
if TYPE_CHECKING:
from azure.core.credentials import TokenCredential
JSON = Mapping[str, Any]
logger = logging.getLogger(__name__)
@overload
def load( # pylint: disable=docstring-keyword-should-match-keyword-only
endpoint: str,
credential: "TokenCredential",
*,
selects: Optional[List[SettingSelector]] = None,
trim_prefixes: Optional[List[str]] = None,
keyvault_credential: Optional["TokenCredential"] = None,
keyvault_client_configs: Optional[Mapping[str, JSON]] = None,
secret_resolver: Optional[Callable[[str], str]] = None,
key_vault_options: Optional[AzureAppConfigurationKeyVaultOptions] = None,
refresh_on: Optional[List[Tuple[str, str]]] = None,
refresh_interval: int = 30,
on_refresh_success: Optional[Callable] = None,
on_refresh_error: Optional[Callable[[Exception], None]] = None,
feature_flag_enabled: bool = False,
feature_flag_selectors: Optional[List[SettingSelector]] = None,
feature_flag_refresh_enabled: bool = False,
**kwargs,
) -> "AzureAppConfigurationProvider":
"""
Loads configuration settings from Azure App Configuration into a Python application.
:param str endpoint: Endpoint for App Configuration resource.
:param ~azure.core.credentials.TokenCredential credential: Credential for App Configuration resource.
:keyword Optional[List[~azure.appconfiguration.provider.SettingSelector]] selects: List of setting selectors to
filter configuration settings
:keyword Optional[List[str]] trim_prefixes: List of prefixes to trim from configuration keys
:keyword ~azure.core.credentials.TokenCredential keyvault_credential: A credential for authenticating with the key
vault. This is optional if keyvault_client_configs is provided.
:keyword Mapping[str, Mapping] keyvault_client_configs: A Mapping of SecretClient endpoints to client
configurations from azure-keyvault-secrets. This is optional if keyvault_credential is provided. If a credential
isn't provided a credential will need to be in each set for each.
:keyword Callable[[str], str] secret_resolver: A function that takes a URI and returns a value.
:keyword List[Tuple[str, str]] refresh_on: One or more settings whose modification will trigger a full refresh
after a fixed interval. This should be a list of Key-Label pairs for specific settings (filters and wildcards are
not supported).
:keyword int refresh_interval: The minimum time in seconds between when a call to `refresh` will actually trigger a
service call to update the settings. Default value is 30 seconds.
:keyword on_refresh_success: Optional callback to be invoked when a change is found and a successful refresh has
happened.
:paramtype on_refresh_success: Optional[Callable]
:keyword on_refresh_error: Optional callback to be invoked when an error occurs while refreshing settings. If not
specified, errors will be raised.
:paramtype on_refresh_error: Optional[Callable[[Exception], None]]
:keyword feature_flag_enabled: Optional flag to enable or disable the loading of feature flags. Default is False.
:paramtype feature_flag_enabled: bool
:keyword feature_flag_selectors: Optional list of selectors to filter feature flags. By default will load all
feature flags without a label.
:paramtype feature_flag_selectors: List[SettingSelector]
:keyword feature_flag_refresh_enabled: Optional flag to enable or disable the refresh of feature flags. Default is
False.
:paramtype feature_flag_refresh_enabled: bool
:keyword replica_discovery_enabled: Optional flag to enable or disable the discovery of replica endpoints. Default
is True.
:paramtype replica_discovery_enabled: bool
:keyword load_balancing_enabled: Optional flag to enable or disable the load balancing of replica endpoints. Default
is False.
:paramtype load_balancing_enabled: bool
"""
@overload
def load( # pylint: disable=docstring-keyword-should-match-keyword-only
*,
connection_string: str,
selects: Optional[List[SettingSelector]] = None,
trim_prefixes: Optional[List[str]] = None,
keyvault_credential: Optional["TokenCredential"] = None,
keyvault_client_configs: Optional[Mapping[str, JSON]] = None,
secret_resolver: Optional[Callable[[str], str]] = None,
key_vault_options: Optional[AzureAppConfigurationKeyVaultOptions] = None,
refresh_on: Optional[List[Tuple[str, str]]] = None,
refresh_interval: int = 30,
on_refresh_success: Optional[Callable] = None,
on_refresh_error: Optional[Callable[[Exception], None]] = None,
feature_flag_enabled: bool = False,
feature_flag_selectors: Optional[List[SettingSelector]] = None,
feature_flag_refresh_enabled: bool = False,
**kwargs,
) -> "AzureAppConfigurationProvider":
"""
Loads configuration settings from Azure App Configuration into a Python application.
:keyword str connection_string: Connection string for App Configuration resource.
:keyword Optional[List[~azure.appconfiguration.provider.SettingSelector]] selects: List of setting selectors to
filter configuration settings
:keyword trim_prefixes: Optional[List[str]] trim_prefixes: List of prefixes to trim from configuration keys
:keyword ~azure.core.credentials.TokenCredential keyvault_credential: A credential for authenticating with the key
vault. This is optional if keyvault_client_configs is provided.
:keyword Mapping[str, Mapping] keyvault_client_configs: A Mapping of SecretClient endpoints to client
configurations from azure-keyvault-secrets. This is optional if keyvault_credential is provided. If a credential
isn't provided a credential will need to be in each set for each.
:keyword Callable[[str], str] secret_resolver: A function that takes a URI and returns a value.
:keyword List[Tuple[str, str]] refresh_on: One or more settings whose modification will trigger a full refresh
after a fixed interval. This should be a list of Key-Label pairs for specific settings (filters and wildcards are
not supported).
:keyword refresh_on: One or more settings whose modification will trigger a full refresh after a fixed interval.
This should be a list of Key-Label pairs for specific settings (filters and wildcards are not supported).
:paramtype refresh_on: List[Tuple[str, str]]
:keyword int refresh_interval: The minimum time in seconds between when a call to `refresh` will actually trigger a
service call to update the settings. Default value is 30 seconds.
:keyword on_refresh_success: Optional callback to be invoked when a change is found and a successful refresh has
happened.
:paramtype on_refresh_success: Optional[Callable]
:keyword on_refresh_error: Optional callback to be invoked when an error occurs while refreshing settings. If not
specified, errors will be raised.
:paramtype on_refresh_error: Optional[Callable[[Exception], None]]
:keyword feature_flag_enabled: Optional flag to enable or disable the loading of feature flags. Default is False.
:paramtype feature_flag_enabled: bool
:keyword feature_flag_selectors: Optional list of selectors to filter feature flags. By default will load all
feature flags without a label.
:paramtype feature_flag_selectors: List[SettingSelector]
:keyword feature_flag_refresh_enabled: Optional flag to enable or disable the refresh of feature flags. Default is
False.
:paramtype feature_flag_refresh_enabled: bool
:keyword replica_discovery_enabled: Optional flag to enable or disable the discovery of replica endpoints. Default
is True.
:paramtype replica_discovery_enabled: bool
:keyword load_balancing_enabled: Optional flag to enable or disable the load balancing of replica endpoints. Default
is False.
:paramtype load_balancing_enabled: bool
"""
[docs]
def load(*args, **kwargs) -> "AzureAppConfigurationProvider":
endpoint: Optional[str] = kwargs.pop("endpoint", None)
credential: Optional["TokenCredential"] = kwargs.pop("credential", None)
connection_string: Optional[str] = kwargs.pop("connection_string", None)
key_vault_options: Optional[AzureAppConfigurationKeyVaultOptions] = kwargs.pop("key_vault_options", None)
start_time = datetime.datetime.now()
# Update endpoint and credential if specified positionally.
if len(args) > 2:
raise TypeError(
"Unexpected positional parameters. Please pass either endpoint and credential, or a connection string."
)
if len(args) == 1:
if endpoint is not None:
raise TypeError("Received multiple values for parameter 'endpoint'.")
endpoint = args[0]
elif len(args) == 2:
if credential is not None:
raise TypeError("Received multiple values for parameter 'credential'.")
endpoint, credential = args
if (endpoint or credential) and connection_string:
raise ValueError("Please pass either endpoint and credential, or a connection string.")
# Removing use of AzureAppConfigurationKeyVaultOptions
if key_vault_options:
if "keyvault_credential" in kwargs or "secret_resolver" in kwargs or "keyvault_client_configs" in kwargs:
raise ValueError(
"Key Vault configurations should only be set by either the key_vault_options or kwargs not both."
)
kwargs["keyvault_credential"] = key_vault_options.credential
kwargs["secret_resolver"] = key_vault_options.secret_resolver
kwargs["keyvault_client_configs"] = key_vault_options.client_configs
if kwargs.get("keyvault_credential") is not None and kwargs.get("secret_resolver") is not None:
raise ValueError("A keyvault credential and secret resolver can't both be configured.")
uses_key_vault = (
"keyvault_credential" in kwargs
or "keyvault_client_configs" in kwargs
or "secret_resolver" in kwargs
or kwargs.get("uses_key_vault", False)
)
provider = _buildprovider(connection_string, endpoint, credential, uses_key_vault=uses_key_vault, **kwargs)
kwargs = sdk_allowed_kwargs(kwargs)
try:
provider._load_all(**kwargs) # pylint:disable=protected-access
except Exception as e:
delay_failure(start_time)
raise e
return provider
def _buildprovider(
connection_string: Optional[str], endpoint: Optional[str], credential: Optional["TokenCredential"], **kwargs
) -> "AzureAppConfigurationProvider":
# pylint:disable=protected-access
if connection_string:
endpoint = connection_string.split(";")[0].split("=")[1]
if not endpoint:
raise ValueError("No endpoint specified.")
kwargs["endpoint"] = endpoint
kwargs["connection_string"] = connection_string
kwargs["credential"] = credential
return AzureAppConfigurationProvider(**kwargs)
def _resolve_keyvault_reference(
config: "SecretReferenceConfigurationSetting", provider: "AzureAppConfigurationProvider"
) -> str:
# pylint:disable=protected-access
if not (provider._keyvault_credential or provider._keyvault_client_configs or provider._secret_resolver):
raise ValueError(
"""
Either a credential to Key Vault, custom Key Vault client, or a secret resolver must be set to resolve Key
Vault references.
"""
)
if config.secret_id is None:
raise ValueError("Key Vault reference must have a uri value.")
keyvault_identifier = KeyVaultSecretIdentifier(config.secret_id)
vault_url = keyvault_identifier.vault_url + "/"
# pylint:disable=protected-access
referenced_client = provider._secret_clients.get(vault_url, None)
vault_config = provider._keyvault_client_configs.get(vault_url, {})
credential = vault_config.pop("credential", provider._keyvault_credential)
if referenced_client is None and credential is not None:
referenced_client = SecretClient(vault_url=vault_url, credential=credential, **vault_config)
provider._secret_clients[vault_url] = referenced_client
if referenced_client:
secret_value = referenced_client.get_secret(keyvault_identifier.name, version=keyvault_identifier.version).value
if secret_value is not None:
return secret_value
if provider._secret_resolver:
return provider._secret_resolver(config.secret_id)
raise ValueError("No Secret Client found for Key Vault reference %s" % (vault_url))
[docs]
class AzureAppConfigurationProvider(AzureAppConfigurationProviderBase): # pylint: disable=too-many-instance-attributes
"""
Provides a dictionary-like interface to Azure App Configuration settings. Enables loading of sets of configuration
settings from Azure App Configuration into a Python application. Enables trimming of prefixes from configuration
keys. Enables resolution of Key Vault references in configuration settings.
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
if "user_agent" in kwargs:
user_agent = kwargs.pop("user_agent") + " " + USER_AGENT
else:
user_agent = USER_AGENT
interval: int = kwargs.get("refresh_interval", 30)
if interval < 1:
raise ValueError("Refresh interval must be greater than or equal to 1 second.")
min_backoff: int = min(kwargs.pop("min_backoff", 30), interval)
max_backoff: int = min(kwargs.pop("max_backoff", 600), interval)
self._replica_client_manager = ConfigurationClientManager(
connection_string=kwargs.pop("connection_string", None),
endpoint=kwargs.pop("endpoint", None),
credential=kwargs.pop("credential", None),
user_agent=user_agent,
retry_total=kwargs.pop("retry_total", 2),
retry_backoff_max=kwargs.pop("retry_backoff_max", 60),
replica_discovery_enabled=kwargs.pop("replica_discovery_enabled", True),
min_backoff_sec=min_backoff,
max_backoff_sec=max_backoff,
load_balancing_enabled=kwargs.pop("load_balancing_enabled", False),
**kwargs,
)
self._secret_clients: Dict[str, SecretClient] = {}
self._on_refresh_success: Optional[Callable] = kwargs.pop("on_refresh_success", None)
self._on_refresh_error: Optional[Callable[[Exception], None]] = kwargs.pop("on_refresh_error", None)
[docs]
def refresh(self, **kwargs) -> None: # pylint: disable=too-many-statements
if not self._refresh_on and not self._feature_flag_refresh_enabled:
logger.debug("Refresh called but no refresh enabled.")
return
if not self._refresh_timer.needs_refresh():
logger.debug("Refresh called but refresh interval not elapsed.")
return
if not self._refresh_lock.acquire(blocking=False): # pylint: disable= consider-using-with
logger.debug("Refresh called but refresh already in progress.")
return
success = False
need_refresh = False
error_message = """
Failed to refresh configuration settings from Azure App Configuration.
"""
exception: Exception = RuntimeError(error_message)
is_failover_request = False
try:
self._replica_client_manager.refresh_clients()
self._replica_client_manager.find_active_clients()
replica_count = self._replica_client_manager.get_client_count() - 1
while client := self._replica_client_manager.get_next_active_client():
headers = update_correlation_context_header(
kwargs.pop("headers", {}),
"Watch",
replica_count,
self._feature_flag_enabled,
self._feature_filter_usage,
self._uses_key_vault,
self._uses_load_balancing,
is_failover_request,
)
try:
if self._refresh_on:
need_refresh, self._refresh_on, configuration_settings = client.refresh_configuration_settings(
self._selects, self._refresh_on, headers=headers, **kwargs
)
configuration_settings_processed = self._process_configurations(configuration_settings)
if need_refresh:
self._dict = configuration_settings_processed
if self._feature_flag_refresh_enabled:
need_ff_refresh, refresh_on_feature_flags, feature_flags, filters_used = (
client.refresh_feature_flags(
self._refresh_on_feature_flags, self._feature_flag_selectors, headers=headers, **kwargs
)
)
if refresh_on_feature_flags:
self._refresh_on_feature_flags = refresh_on_feature_flags
self._feature_filter_usage = filters_used
if need_refresh or need_ff_refresh:
self._dict[FEATURE_MANAGEMENT_KEY] = {}
self._dict[FEATURE_MANAGEMENT_KEY][FEATURE_FLAG_KEY] = feature_flags
# Even if we don't need to refresh, we should reset the timer
self._refresh_timer.reset()
success = True
break
except AzureError as e:
exception = e
logger.warning("Failed to refresh configurations from endpoint %s", client.endpoint)
self._replica_client_manager.backoff(client)
is_failover_request = True
if not success:
self._refresh_timer.backoff()
if self._on_refresh_error:
self._on_refresh_error(exception)
return
raise exception
if self._on_refresh_success:
self._on_refresh_success()
finally:
self._refresh_lock.release()
def _load_all(self, **kwargs):
self._replica_client_manager.refresh_clients()
self._replica_client_manager.find_active_clients()
is_failover_request = False
replica_count = self._replica_client_manager.get_client_count() - 1
error_message = """
Failed to load configuration settings. No Azure App Configuration stores successfully loaded from.
"""
exception: Exception = RuntimeError(error_message)
while client := self._replica_client_manager.get_next_active_client():
headers = update_correlation_context_header(
kwargs.pop("headers", {}),
"Startup",
replica_count,
self._feature_flag_enabled,
self._feature_filter_usage,
self._uses_key_vault,
self._uses_load_balancing,
is_failover_request,
)
try:
configuration_settings, sentinel_keys = client.load_configuration_settings(
self._selects, self._refresh_on, headers=headers, **kwargs
)
configuration_settings_processed = self._process_configurations(configuration_settings)
if self._feature_flag_enabled:
feature_flags, feature_flag_sentinel_keys, used_filters = client.load_feature_flags(
self._feature_flag_selectors, self._feature_flag_refresh_enabled, headers=headers, **kwargs
)
self._feature_filter_usage = used_filters
configuration_settings_processed[FEATURE_MANAGEMENT_KEY] = {}
configuration_settings_processed[FEATURE_MANAGEMENT_KEY][FEATURE_FLAG_KEY] = feature_flags
self._refresh_on_feature_flags = feature_flag_sentinel_keys
for (key, label), etag in self._refresh_on.items():
if not etag:
try:
sentinel = client.get_configuration_setting(key, label, headers=headers) # type:ignore
self._refresh_on[(key, label)] = sentinel.etag # type:ignore
except HttpResponseError as e:
if e.status_code == 404:
# If the sentinel is not found a refresh should be triggered when it is created.
logger.debug(
"""
WatchKey key: %s label %s was configured but not found. Refresh will be triggered
if created.
""",
key,
label,
)
self._refresh_on[(key, label)] = None # type: ignore
else:
raise e
with self._update_lock:
self._refresh_on = sentinel_keys
self._dict = configuration_settings_processed
return
except AzureError as e:
exception = e
logger.warning("Failed to load configurations from endpoint %s.\n %s", client.endpoint, e.message)
self._replica_client_manager.backoff(client)
is_failover_request = True
raise exception
def _process_configurations(self, configuration_settings: List[ConfigurationSetting]) -> Dict[str, Any]:
configuration_settings_processed = {}
for config in configuration_settings:
if isinstance(config, FeatureFlagConfigurationSetting):
# Feature flags are not processed like other settings
continue
key = self._process_key_name(config)
value = self._process_key_value(config)
configuration_settings_processed[key] = value
if self._feature_flag_enabled and FEATURE_MANAGEMENT_KEY in self._dict:
configuration_settings_processed[FEATURE_MANAGEMENT_KEY] = self._dict[FEATURE_MANAGEMENT_KEY]
return configuration_settings_processed
def _process_key_value(self, config):
if isinstance(config, SecretReferenceConfigurationSetting):
return _resolve_keyvault_reference(config, self)
if is_json_content_type(config.content_type) and not isinstance(config, FeatureFlagConfigurationSetting):
# Feature flags are of type json, but don't treat them as such
try:
return json.loads(config.value)
except json.JSONDecodeError:
# If the value is not a valid JSON, treat it like regular string value
return config.value
return config.value
def __eq__(self, other: Any) -> bool:
if not isinstance(other, AzureAppConfigurationProvider):
return False
if self._dict != other._dict:
return False
if self._trim_prefixes != other._trim_prefixes:
return False
return self._replica_client_manager == other._replica_client_manager
[docs]
def close(self) -> None:
"""
Closes the connection to Azure App Configuration.
"""
for client in self._secret_clients.values():
client.close()
self._replica_client_manager.close()
def __enter__(self) -> "AzureAppConfigurationProvider":
self._replica_client_manager.__enter__()
for client in self._secret_clients.values():
client.__enter__()
return self
def __exit__(self, *args) -> None:
self._replica_client_manager.__exit__()
for client in self._secret_clients.values():
client.__exit__()