# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import socket
import sys
from typing import Dict, Any, Mapping, Union, cast
import msal
from azure.core.exceptions import ClientAuthenticationError
from azure.core.credentials import TokenRequestOptions
from azure.identity._credentials import (
InteractiveBrowserCredential as _InteractiveBrowserCredential,
) # pylint:disable=protected-access
from azure.identity._exceptions import CredentialUnavailableError # pylint:disable=protected-access
from azure.identity._internal.utils import within_dac # pylint:disable=protected-access
from ._utils import wrap_exceptions, resolve_tenant, is_wsl
[docs]
class PopTokenRequestOptions(TokenRequestOptions):
"""Options to use for Proof-of-Possession (PoP) token requests."""
pop: Union[bool, Mapping[str, str]]
"""PoP token request options.
- If not specified or False, a non-PoP token request is made.
- If True, an mTLS PoP token request is made.
- If a dict, a Signed HTTP Request (SHR) PoP token request is made. The dict
must contain the "resource_request_method", "resource_request_url", and "nonce" keys.
"""
[docs]
class InteractiveBrowserBrokerCredential(_InteractiveBrowserCredential):
"""Uses an authentication broker to interactively sign in a user.
Currently, only the following brokers are supported:
- Web Account Manager (WAM) on Windows
- Company Portal on macOS
Users on Linux will be authenticated through the browser.
:func:`~get_token` opens a browser to a login URL provided by Microsoft Entra ID and authenticates a user
there with the authorization code flow, using PKCE (Proof Key for Code Exchange) internally to protect the code.
:keyword str authority: Authority of a Microsoft Entra endpoint, for example "login.microsoftonline.com",
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.AzureAuthorityHosts`
defines authorities for other clouds.
:keyword str tenant_id: a Microsoft Entra tenant ID. Defaults to the "organizations" tenant, which can
authenticate work or school accounts.
:keyword str client_id: Client ID of the Microsoft Entra application users will sign in to. If
unspecified, users will authenticate to an Azure development application.
:keyword str login_hint: a username suggestion to pre-fill the login page's username/email address field. A user
may still log in with a different username.
:keyword cache_persistence_options: configuration for persistent token caching. If unspecified, the credential
will cache tokens in memory.
:paramtype cache_persistence_options: ~azure.identity.TokenCachePersistenceOptions
:keyword int timeout: seconds to wait for the user to complete authentication. Defaults to 300 (5 minutes).
:keyword int parent_window_handle: If your app is a GUI app running on Windows 10+ or macOS, you
are required to also provide its window handle, so that the sign in UI window will properly pop up on top
of your window.
:keyword bool use_default_broker_account: Enables automatically using the default broker account for
authentication instead of prompting the user with an account picker. This is currently only supported on Windows
and WSL. Defaults to False.
:keyword bool enable_msa_passthrough: Determines whether Microsoft Account (MSA) passthrough is enabled. Note, this
is only needed for select legacy first-party applications. Defaults to False.
:keyword bool disable_instance_discovery: Determines whether or not instance discovery is performed when attempting
to authenticate. Setting this to true will completely disable both instance discovery and authority validation.
This functionality is intended for use in scenarios where the metadata endpoint cannot be reached, such as in
private clouds or Azure Stack. The process of instance discovery entails retrieving authority metadata from
https://login.microsoft.com/ to validate the authority. By setting this to **True**, the validation of the
authority is disabled. As a result, it is crucial to ensure that the configured authority host is valid and
trustworthy.
:keyword bool enable_support_logging: Enables additional support logging in the underlying MSAL library.
This logging potentially contains personally identifiable information and is intended to be used only for
troubleshooting purposes.
:raises ValueError: invalid **redirect_uri**
"""
def __init__(self, **kwargs: Any) -> None:
self._parent_window_handle = kwargs.pop("parent_window_handle", None)
self._enable_msa_passthrough = kwargs.pop("enable_msa_passthrough", False)
self._use_default_broker_account = kwargs.pop("use_default_broker_account", False)
self._disable_interactive_fallback = kwargs.pop("disable_interactive_fallback", False)
super().__init__(**kwargs)
@wrap_exceptions
def _request_token(self, *scopes: str, **kwargs: Any) -> Dict:
scopes_list = list(scopes)
claims = kwargs.get("claims")
pop = kwargs.get("pop")
app = cast(msal.PublicClientApplication, self._get_app(**kwargs))
auth_scheme = None
if pop:
auth_scheme = msal.PopAuthScheme(
http_method=pop["resource_request_method"], url=pop["resource_request_url"], nonce=pop["nonce"]
)
result = {}
# Try silent authentication first if use_default_broker_account is enabled.
if self._use_default_broker_account:
try:
result = self._attempt_token_acquisition(app, scopes_list, claims, auth_scheme, msal.Prompt.NONE)
if "access_token" in result:
return result
except Exception as ex: # pylint: disable=broad-except
if self._disable_interactive_fallback:
raise CredentialUnavailableError(
message="Failed to authenticate user with default broker account."
) from ex
# Raise on error if interactive fallback is disabled.
if self._disable_interactive_fallback:
self._check_result(result)
# If we reach here, we need to try interactive authentication.
if not self._disable_interactive_fallback:
try:
result = self._attempt_token_acquisition(
app, scopes_list, claims, auth_scheme, msal.Prompt.SELECT_ACCOUNT
)
except socket.error as ex:
raise CredentialUnavailableError(message="Couldn't start an HTTP server.") from ex
except Exception as ex: # pylint: disable=broad-except
# Fallback to non-broker app only on non-Windows/WSL platforms.
if not (sys.platform.startswith("win") or is_wsl()):
app = cast(msal.PublicClientApplication, self._disable_broker_on_app(**kwargs))
result = self._attempt_token_acquisition(app, scopes_list, claims, None, msal.Prompt.SELECT_ACCOUNT)
else:
raise CredentialUnavailableError(message="Failed to authenticate user with broker account.") from ex
self._check_result(result)
return result
def _attempt_token_acquisition(
self,
app: msal.PublicClientApplication,
scopes_list: list,
claims: Any,
auth_scheme: Any,
prompt: str,
) -> Dict:
port = self._parsed_url.port if self._parsed_url else None
return app.acquire_token_interactive(
scopes=scopes_list,
login_hint=self._login_hint,
claims_challenge=claims,
timeout=self._timeout,
prompt=prompt,
port=port,
parent_window_handle=self._parent_window_handle,
enable_msa_passthrough=self._enable_msa_passthrough,
auth_scheme=auth_scheme,
)
def _check_result(self, result: Dict[str, Any]) -> None:
if "access_token" not in result and "error_description" in result:
if within_dac.get():
raise CredentialUnavailableError(message=result["error_description"])
raise ClientAuthenticationError(message=result.get("error_description"))
if "access_token" not in result:
if within_dac.get():
raise CredentialUnavailableError(message="Failed to authenticate user")
raise ClientAuthenticationError(message="Failed to authenticate user")
def _get_app(self, **kwargs: Any) -> msal.ClientApplication:
tenant_id = resolve_tenant(
self._tenant_id, additionally_allowed_tenants=self._additionally_allowed_tenants, **kwargs
)
client_applications_map = self._client_applications
capabilities = None
token_cache = self._cache
app_class = msal.PublicClientApplication
if kwargs.get("enable_cae"):
client_applications_map = self._cae_client_applications
capabilities = ["CP1"]
token_cache = self._cae_cache
if not token_cache:
token_cache = self._initialize_cache(is_cae=bool(kwargs.get("enable_cae")))
if tenant_id not in client_applications_map:
client_applications_map[tenant_id] = app_class(
client_id=self._client_id,
client_credential=self._client_credential,
client_capabilities=capabilities,
authority="{}/{}".format(self._authority, tenant_id),
azure_region=self._regional_authority,
token_cache=token_cache,
http_client=self._client,
instance_discovery=self._instance_discovery,
enable_broker_on_windows=True,
enable_broker_on_mac=True,
enable_broker_on_linux=True,
enable_broker_on_wsl=True,
enable_pii_log=self._enable_support_logging,
)
return client_applications_map[tenant_id]
def _disable_broker_on_app(self, **kwargs: Any) -> msal.ClientApplication:
tenant_id = resolve_tenant(
self._tenant_id, additionally_allowed_tenants=self._additionally_allowed_tenants, **kwargs
)
client_applications_map = self._client_applications
capabilities = None
token_cache = self._cache
app_class = msal.PublicClientApplication
if kwargs.get("enable_cae"):
client_applications_map = self._cae_client_applications
capabilities = ["CP1"]
token_cache = self._cae_cache
if not token_cache:
token_cache = self._initialize_cache(is_cae=bool(kwargs.get("enable_cae")))
client_applications_map[tenant_id] = app_class(
client_id=self._client_id,
client_credential=self._client_credential,
client_capabilities=capabilities,
authority="{}/{}".format(self._authority, tenant_id),
azure_region=self._regional_authority,
token_cache=token_cache,
http_client=self._client,
instance_discovery=self._instance_discovery,
enable_broker_on_windows=False,
enable_broker_on_mac=False,
enable_broker_on_linux=False,
enable_broker_on_wsl=False,
enable_pii_log=self._enable_support_logging,
)
return client_applications_map[tenant_id]