Source code for azure.identity._credentials.default

# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import logging
import os
from typing import List, Any, Optional, cast

from azure.core.credentials import (
    AccessToken,
    AccessTokenInfo,
    TokenRequestOptions,
    SupportsTokenInfo,
    TokenCredential,
)
from .. import CredentialUnavailableError
from .._constants import EnvironmentVariables
from .._internal.utils import get_default_authority, normalize_authority, within_dac, process_credential_exclusions
from .azure_powershell import AzurePowerShellCredential
from .broker import BrokerCredential
from .browser import InteractiveBrowserCredential
from .chained import ChainedTokenCredential
from .environment import EnvironmentCredential
from .managed_identity import ManagedIdentityCredential
from .shared_cache import SharedTokenCacheCredential
from .azure_cli import AzureCliCredential
from .azd_cli import AzureDeveloperCliCredential
from .vscode import VisualStudioCodeCredential
from .workload_identity import WorkloadIdentityCredential

_LOGGER = logging.getLogger(__name__)


class FailedDACCredential:
    """This acts as a substitute for a credential that has failed to initialize in the DAC chain.

    This allows instantiation errors to be reported in ChainTokenCredential if all token requests fail.
    """

    def __init__(self, credential_name: str, error: str) -> None:
        self._error = error
        self._credential_name = credential_name

    def get_token(self, *scopes: str, **kwargs: Any) -> AccessToken:
        raise CredentialUnavailableError(self._error)

    def get_token_info(self, *scopes, options: Optional[TokenRequestOptions] = None, **kwargs: Any) -> AccessTokenInfo:
        raise CredentialUnavailableError(self._error)

    def __enter__(self) -> "FailedDACCredential":
        return self

    def __exit__(self, *args: Any) -> None:
        pass

    def close(self) -> None:
        pass


[docs] class DefaultAzureCredential(ChainedTokenCredential): """A credential capable of handling most Azure SDK authentication scenarios. For more information, See `Usage guidance for DefaultAzureCredential <"https://aka.ms/azsdk/python/identity/credential-chains#usage-guidance-for-defaultazurecredential">`__. The identity it uses depends on the environment. When an access token is needed, it requests one using these identities in turn, stopping when one provides a token: 1. A service principal configured by environment variables. See :class:`~azure.identity.EnvironmentCredential` for more details. 2. WorkloadIdentityCredential if environment variable configuration is set by the Azure workload identity webhook. 3. An Azure managed identity. See :class:`~azure.identity.ManagedIdentityCredential` for more details. 4. On Windows only: a user who has signed in with a Microsoft application, such as Visual Studio. If multiple identities are in the cache, then the value of the environment variable ``AZURE_USERNAME`` is used to select which identity to use. See :class:`~azure.identity.SharedTokenCacheCredential` for more details. 5. The identity currently logged in to the Azure CLI. 6. The identity currently logged in to Azure PowerShell. 7. The identity currently logged in to the Azure Developer CLI. 8. Brokered authentication. On Windows and WSL only, this uses the default account logged in via Web Account Manager (WAM) if the `azure-identity-broker` package is installed. This default behavior is configurable with keyword arguments. :keyword str authority: Authority of a Microsoft Entra endpoint, for example 'login.microsoftonline.com', the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts` defines authorities for other clouds. Managed identities ignore this because they reside in a single cloud. :keyword bool exclude_workload_identity_credential: Whether to exclude the workload identity from the credential. Defaults to **False**. :keyword bool exclude_developer_cli_credential: Whether to exclude the Azure Developer CLI from the credential. Defaults to **False**. :keyword bool exclude_cli_credential: Whether to exclude the Azure CLI from the credential. Defaults to **False**. :keyword bool exclude_environment_credential: Whether to exclude a service principal configured by environment variables from the credential. Defaults to **False**. :keyword bool exclude_managed_identity_credential: Whether to exclude managed identity from the credential. Defaults to **False**. :keyword bool exclude_powershell_credential: Whether to exclude Azure PowerShell. Defaults to **False**. :keyword bool exclude_visual_studio_code_credential: Whether to exclude stored credential from VS Code. Defaults to **True**. :keyword bool exclude_shared_token_cache_credential: Whether to exclude the shared token cache. Defaults to **False**. :keyword bool exclude_interactive_browser_credential: Whether to exclude interactive browser authentication (see :class:`~azure.identity.InteractiveBrowserCredential`). Defaults to **True**. :keyword bool exclude_broker_credential: Whether to exclude the broker credential from the credential chain. Defaults to **False**. :keyword str interactive_browser_tenant_id: Tenant ID to use when authenticating a user through :class:`~azure.identity.InteractiveBrowserCredential`. Defaults to the value of environment variable AZURE_TENANT_ID, if any. If unspecified, users will authenticate in their home tenants. :keyword str broker_tenant_id: The tenant ID to use when using brokered authentication. Defaults to the value of environment variable AZURE_TENANT_ID, if any. If unspecified, users will authenticate in their home tenants. :keyword str managed_identity_client_id: The client ID of a user-assigned managed identity. Defaults to the value of the environment variable AZURE_CLIENT_ID, if any. If not specified, a system-assigned identity will be used. :keyword str workload_identity_client_id: The client ID of an identity assigned to the pod. Defaults to the value of the environment variable AZURE_CLIENT_ID, if any. If not specified, the pod's default identity will be used. :keyword str workload_identity_tenant_id: Preferred tenant for :class:`~azure.identity.WorkloadIdentityCredential`. Defaults to the value of environment variable AZURE_TENANT_ID, if any. :keyword str interactive_browser_client_id: The client ID to be used in interactive browser credential. If not specified, users will authenticate to an Azure development application. :keyword str broker_client_id: The client ID to be used in brokered authentication. If not specified, users will authenticate to an Azure development application. :keyword str shared_cache_username: Preferred username for :class:`~azure.identity.SharedTokenCacheCredential`. Defaults to the value of environment variable AZURE_USERNAME, if any. :keyword str shared_cache_tenant_id: Preferred tenant for :class:`~azure.identity.SharedTokenCacheCredential`. Defaults to the value of environment variable AZURE_TENANT_ID, if any. :keyword str visual_studio_code_tenant_id: Tenant ID to use when authenticating with :class:`~azure.identity.VisualStudioCodeCredential`. Defaults to the tenant specified in the authentication record file used by the Azure Resources extension. :keyword int process_timeout: The timeout in seconds to use for developer credentials that run subprocesses (e.g. AzureCliCredential, AzurePowerShellCredential). Defaults to **10** seconds. :keyword bool require_envvar: If **True**, require that the AZURE_TOKEN_CREDENTIALS environment variable be set to a value denoting the credential type or credential group to use. If unset or empty, DefaultAzureCredential will raise a `ValueError`. Defaults to **False**. .. admonition:: Example: .. literalinclude:: ../samples/credential_creation_code_snippets.py :start-after: [START create_default_credential] :end-before: [END create_default_credential] :language: python :dedent: 4 :caption: Create a DefaultAzureCredential. """ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statements, too-many-locals if "tenant_id" in kwargs: raise TypeError("'tenant_id' is not supported in DefaultAzureCredential.") authority = kwargs.pop("authority", None) authority = normalize_authority(authority) if authority else get_default_authority() vscode_tenant_id = kwargs.pop("visual_studio_code_tenant_id", None) interactive_browser_tenant_id = kwargs.pop( "interactive_browser_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) ) managed_identity_client_id = kwargs.pop( "managed_identity_client_id", os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID) ) workload_identity_client_id = kwargs.pop("workload_identity_client_id", managed_identity_client_id) workload_identity_tenant_id = kwargs.pop( "workload_identity_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) ) interactive_browser_client_id = kwargs.pop("interactive_browser_client_id", None) broker_tenant_id = kwargs.pop("broker_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)) broker_client_id = kwargs.pop("broker_client_id", None) shared_cache_username = kwargs.pop("shared_cache_username", os.environ.get(EnvironmentVariables.AZURE_USERNAME)) shared_cache_tenant_id = kwargs.pop( "shared_cache_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) ) process_timeout = kwargs.pop("process_timeout", 10) require_envvar = kwargs.pop("require_envvar", False) if require_envvar and not os.environ.get(EnvironmentVariables.AZURE_TOKEN_CREDENTIALS): raise ValueError( "AZURE_TOKEN_CREDENTIALS environment variable is required but is not set or is empty. " "Set it to 'dev', 'prod', or a specific credential name." ) # Define credential configuration mapping credential_config = { "environment": { "exclude_param": "exclude_environment_credential", "env_name": "environmentcredential", "default_exclude": False, }, "workload_identity": { "exclude_param": "exclude_workload_identity_credential", "env_name": "workloadidentitycredential", "default_exclude": False, }, "managed_identity": { "exclude_param": "exclude_managed_identity_credential", "env_name": "managedidentitycredential", "default_exclude": False, }, "shared_token_cache": { "exclude_param": "exclude_shared_token_cache_credential", "default_exclude": False, }, "visual_studio_code": { "exclude_param": "exclude_visual_studio_code_credential", "env_name": "visualstudiocodecredential", "default_exclude": False, }, "cli": { "exclude_param": "exclude_cli_credential", "env_name": "azureclicredential", "default_exclude": False, }, "developer_cli": { "exclude_param": "exclude_developer_cli_credential", "env_name": "azuredeveloperclicredential", "default_exclude": False, }, "powershell": { "exclude_param": "exclude_powershell_credential", "env_name": "azurepowershellcredential", "default_exclude": False, }, "interactive_browser": { "exclude_param": "exclude_interactive_browser_credential", "env_name": "interactivebrowsercredential", "default_exclude": True, }, "broker": { "exclude_param": "exclude_broker_credential", "default_exclude": False, }, } # Extract user-provided exclude flags and set defaults exclude_flags = {} user_excludes = {} for cred_key, config in credential_config.items(): param_name = cast(str, config["exclude_param"]) user_excludes[cred_key] = kwargs.pop(param_name, None) exclude_flags[cred_key] = config["default_exclude"] # Process AZURE_TOKEN_CREDENTIALS environment variable and apply user overrides exclude_flags = process_credential_exclusions(credential_config, exclude_flags, user_excludes) # Extract individual exclude flags for backward compatibility exclude_environment_credential = exclude_flags["environment"] exclude_workload_identity_credential = exclude_flags["workload_identity"] exclude_managed_identity_credential = exclude_flags["managed_identity"] exclude_shared_token_cache_credential = exclude_flags["shared_token_cache"] exclude_visual_studio_code_credential = exclude_flags["visual_studio_code"] exclude_cli_credential = exclude_flags["cli"] exclude_developer_cli_credential = exclude_flags["developer_cli"] exclude_powershell_credential = exclude_flags["powershell"] exclude_interactive_browser_credential = exclude_flags["interactive_browser"] exclude_broker_credential = exclude_flags["broker"] credentials: List[SupportsTokenInfo] = [] within_dac.set(True) if not exclude_environment_credential: credentials.append(EnvironmentCredential(authority=authority, _within_dac=True, **kwargs)) if not exclude_workload_identity_credential: try: credentials.append( WorkloadIdentityCredential( client_id=cast(str, workload_identity_client_id), tenant_id=workload_identity_tenant_id, token_file_path=os.environ.get(EnvironmentVariables.AZURE_FEDERATED_TOKEN_FILE), **kwargs, ) ) except ValueError as ex: credentials.append(FailedDACCredential("WorkloadIdentityCredential", error=str(ex))) if not exclude_managed_identity_credential: credentials.append( ManagedIdentityCredential( client_id=managed_identity_client_id, _exclude_workload_identity_credential=exclude_workload_identity_credential, **kwargs, ) ) if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported(): try: # username and/or tenant_id are only required when the cache contains tokens for multiple identities shared_cache = SharedTokenCacheCredential( username=shared_cache_username, tenant_id=shared_cache_tenant_id, authority=authority, **kwargs ) credentials.append(shared_cache) except Exception as ex: # pylint:disable=broad-except _LOGGER.info("Shared token cache is unavailable: '%s'", ex) if not exclude_visual_studio_code_credential: credentials.append(VisualStudioCodeCredential(tenant_id=vscode_tenant_id)) if not exclude_cli_credential: credentials.append(AzureCliCredential(process_timeout=process_timeout)) if not exclude_powershell_credential: credentials.append(AzurePowerShellCredential(process_timeout=process_timeout)) if not exclude_developer_cli_credential: credentials.append(AzureDeveloperCliCredential(process_timeout=process_timeout)) if not exclude_interactive_browser_credential: if interactive_browser_client_id: credentials.append( InteractiveBrowserCredential( tenant_id=interactive_browser_tenant_id, client_id=interactive_browser_client_id, **kwargs ) ) else: credentials.append(InteractiveBrowserCredential(tenant_id=interactive_browser_tenant_id, **kwargs)) if not exclude_broker_credential: broker_credential_args = {"tenant_id": broker_tenant_id, **kwargs} if broker_client_id: broker_credential_args["client_id"] = broker_client_id credentials.append(BrokerCredential(**broker_credential_args)) within_dac.set(False) super(DefaultAzureCredential, self).__init__(*credentials)
[docs] def get_token( self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any ) -> AccessToken: """Request an access token for `scopes`. This method is called automatically by Azure SDK clients. :param str scopes: desired scopes for the access token. This method requires at least one scope. For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc. :keyword str claims: additional claims required in the token, such as those returned in a resource provider's claims challenge following an authorization failure. :keyword str tenant_id: optional tenant to include in the token request. :return: An access token with the desired scopes. :rtype: ~azure.core.credentials.AccessToken :raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The exception has a `message` attribute listing each authentication attempt and its error message. """ if self._successful_credential: token = cast(TokenCredential, self._successful_credential).get_token( *scopes, claims=claims, tenant_id=tenant_id, **kwargs ) _LOGGER.info( "%s acquired a token from %s", self.__class__.__name__, self._successful_credential.__class__.__name__ ) return token within_dac.set(True) try: token = super().get_token(*scopes, claims=claims, tenant_id=tenant_id, **kwargs) finally: within_dac.set(False) return token
[docs] def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo: """Request an access token for `scopes`. This is an alternative to `get_token` to enable certain scenarios that require additional properties on the token. This method is called automatically by Azure SDK clients. :param str scopes: desired scopes for the access token. This method requires at least one scope. For more information about scopes, see https://learn.microsoft.com/entra/identity-platform/scopes-oidc. :keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional. :paramtype options: ~azure.core.credentials.TokenRequestOptions :rtype: ~azure.core.credentials.AccessTokenInfo :return: An AccessTokenInfo instance containing information about the token. :raises ~azure.core.exceptions.ClientAuthenticationError: authentication failed. The exception has a `message` attribute listing each authentication attempt and its error message. """ if self._successful_credential: token_info = cast(SupportsTokenInfo, self._successful_credential).get_token_info(*scopes, options=options) _LOGGER.info( "%s acquired a token from %s", self.__class__.__name__, self._successful_credential.__class__.__name__ ) return token_info within_dac.set(True) try: token_info = cast(SupportsTokenInfo, super()).get_token_info(*scopes, options=options) finally: within_dac.set(False) return token_info