Source code for azure.mgmt.redisenterprise.operations._access_policy_assignment_operations

# pylint: disable=line-too-long,useless-suppression
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from collections.abc import MutableMapping
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Iterator, Optional, TypeVar, Union, cast, overload
import urllib.parse

from azure.core import PipelineClient
from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    StreamClosedError,
    StreamConsumedError,
    map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling

from .. import models as _models
from .._configuration import RedisEnterpriseManagementClientConfiguration
from .._serialization import Deserializer, Serializer

T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


def build_create_update_request(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    access_policy_assignment_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the PUT request targeting a single access policy assignment.

    Recognised keyword arguments are ``headers``, ``params``, ``api_version``,
    ``content_type`` and ``template_url``; anything else is handed to
    :class:`HttpRequest` untouched.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :param cluster_name: Value substituted for ``{clusterName}``.
    :param database_name: Value substituted for ``{databaseName}``.
    :param access_policy_assignment_name: Value substituted for ``{accessPolicyAssignmentName}``.
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :return: The request with URL, ``api-version`` query parameter and headers filled in.
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit keyword wins over an entry already present in the maps. The map entry is
    # removed in either case and written back below in serialized form.
    fallback_api_version = query_map.pop("api-version", "2025-05-01-preview")
    api_version: str = kwargs.pop("api_version", fallback_api_version)
    fallback_content_type = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", fallback_content_type)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and substitute the serialized path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/accessPolicyAssignments/{accessPolicyAssignmentName}",
    )
    request_url: str = url_template.format(
        subscriptionId=_SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        resourceGroupName=_SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        clusterName=_SERIALIZER.url(
            "cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
        databaseName=_SERIALIZER.url(
            "database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
        accessPolicyAssignmentName=_SERIALIZER.url(
            "access_policy_assignment_name", access_policy_assignment_name, "str", pattern=r"^[A-Za-z0-9]{1,60}$"
        ),
    )

    # Query string
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers: Content-Type is only emitted when the caller supplied one.
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=request_url, params=query_map, headers=header_map, **kwargs)


def build_get_request(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    access_policy_assignment_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the GET request targeting a single access policy assignment.

    Recognised keyword arguments are ``headers``, ``params``, ``api_version`` and
    ``template_url``; anything else is handed to :class:`HttpRequest` untouched.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :param cluster_name: Value substituted for ``{clusterName}``.
    :param database_name: Value substituted for ``{databaseName}``.
    :param access_policy_assignment_name: Value substituted for ``{accessPolicyAssignmentName}``.
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :return: The request with URL, ``api-version`` query parameter and ``Accept`` header filled in.
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit keyword wins over an "api-version" entry already present in the query map;
    # the map entry is removed in either case and written back below in serialized form.
    fallback_api_version = query_map.pop("api-version", "2025-05-01-preview")
    api_version: str = kwargs.pop("api_version", fallback_api_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and substitute the serialized path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/accessPolicyAssignments/{accessPolicyAssignmentName}",
    )
    request_url: str = url_template.format(
        subscriptionId=_SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        resourceGroupName=_SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        clusterName=_SERIALIZER.url(
            "cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
        databaseName=_SERIALIZER.url(
            "database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
        accessPolicyAssignmentName=_SERIALIZER.url(
            "access_policy_assignment_name", access_policy_assignment_name, "str", pattern=r"^[A-Za-z0-9]{1,60}$"
        ),
    )

    # Query string
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)


def build_delete_request(
    resource_group_name: str,
    cluster_name: str,
    database_name: str,
    access_policy_assignment_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the DELETE request targeting a single access policy assignment.

    Recognised keyword arguments are ``headers``, ``params``, ``api_version`` and
    ``template_url``; anything else is handed to :class:`HttpRequest` untouched.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :param cluster_name: Value substituted for ``{clusterName}``.
    :param database_name: Value substituted for ``{databaseName}``.
    :param access_policy_assignment_name: Value substituted for ``{accessPolicyAssignmentName}``.
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :return: The request with URL, ``api-version`` query parameter and ``Accept`` header filled in.
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit keyword wins over an "api-version" entry already present in the query map;
    # the map entry is removed in either case and written back below in serialized form.
    fallback_api_version = query_map.pop("api-version", "2025-05-01-preview")
    api_version: str = kwargs.pop("api_version", fallback_api_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and substitute the serialized path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/accessPolicyAssignments/{accessPolicyAssignmentName}",
    )
    request_url: str = url_template.format(
        subscriptionId=_SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        resourceGroupName=_SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        clusterName=_SERIALIZER.url(
            "cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
        databaseName=_SERIALIZER.url(
            "database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
        accessPolicyAssignmentName=_SERIALIZER.url(
            "access_policy_assignment_name", access_policy_assignment_name, "str", pattern=r"^[A-Za-z0-9]{1,60}$"
        ),
    )

    # Query string
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=request_url, params=query_map, headers=header_map, **kwargs)


def build_list_request(
    resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Assemble the GET request for the access policy assignment collection of a database.

    Recognised keyword arguments are ``headers``, ``params``, ``api_version`` and
    ``template_url``; anything else is handed to :class:`HttpRequest` untouched.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :param cluster_name: Value substituted for ``{clusterName}``.
    :param database_name: Value substituted for ``{databaseName}``.
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :return: The request with URL, ``api-version`` query parameter and ``Accept`` header filled in.
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit keyword wins over an "api-version" entry already present in the query map;
    # the map entry is removed in either case and written back below in serialized form.
    fallback_api_version = query_map.pop("api-version", "2025-05-01-preview")
    api_version: str = kwargs.pop("api_version", fallback_api_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and substitute the serialized path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/accessPolicyAssignments",
    )
    request_url: str = url_template.format(
        subscriptionId=_SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        resourceGroupName=_SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        clusterName=_SERIALIZER.url(
            "cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
        databaseName=_SERIALIZER.url(
            "database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
        ),
    )

    # Query string
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)


class AccessPolicyAssignmentOperations:
    """Operations on the access policy assignments of a Redis Enterprise database.

    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.mgmt.redisenterprise.RedisEnterpriseManagementClient`'s
        :attr:`access_policy_assignment` attribute.
    """

    models = _models

    def __init__(self, *args, **kwargs):
        """Bind the client, configuration, serializer and deserializer.

        The four collaborators are read in that order from ``args``; any that are not supplied
        positionally are taken from the ``client``, ``config``, ``serializer`` and
        ``deserializer`` keyword arguments respectively.
        """
        positional = list(args)

        def _take(keyword: str) -> Any:
            # Positional values are consumed left to right; once exhausted, use the keyword.
            if positional:
                return positional.pop(0)
            return kwargs.pop(keyword)

        self._client: PipelineClient = _take("client")
        self._config: RedisEnterpriseManagementClientConfiguration = _take("config")
        self._serialize: Serializer = _take("serializer")
        self._deserialize: Deserializer = _take("deserializer")

    def _create_update_initial(
        self,
        resource_group_name: str,
        cluster_name: str,
        database_name: str,
        access_policy_assignment_name: str,
        parameters: Union[_models.AccessPolicyAssignment, IO[bytes]],
        **kwargs: Any
    ) -> Iterator[bytes]:
        """Send the initial PUT of the create/update operation and return the streamed body.

        A status other than 200 or 201 is raised through ``map_error`` or as
        :class:`~azure.core.exceptions.HttpResponseError`. When ``cls`` is supplied its result is
        returned instead of the byte iterator.
        """
        status_to_error: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_to_error.update(kwargs.pop("error_map", {}) or {})

        header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

        api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
        content_type: Optional[str] = kwargs.pop("content_type", header_map.pop("Content-Type", None))
        cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

        content_type = content_type or "application/json"

        # Streams and raw bytes are sent as-is; a model instance is serialized to JSON.
        is_raw_body = isinstance(parameters, (IOBase, bytes))
        body_content = parameters if is_raw_body else None
        body_json = None if is_raw_body else self._serialize.body(parameters, "AccessPolicyAssignment")

        request = build_create_update_request(
            resource_group_name=resource_group_name,
            cluster_name=cluster_name,
            database_name=database_name,
            access_policy_assignment_name=access_policy_assignment_name,
            subscription_id=self._config.subscription_id,
            api_version=api_version,
            content_type=content_type,
            json=body_json,
            content=body_content,
            headers=header_map,
            params=query_map,
        )
        request.url = self._client.format_url(request.url)

        # "decompress" must be removed from kwargs before they are forwarded to the pipeline.
        decompress = kwargs.pop("decompress", True)
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=True, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in (200, 201):
            try:
                response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
            map_error(status_code=response.status_code, response=response, error_map=status_to_error)
            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

        body_stream = response.stream_download(self._client._pipeline, decompress=decompress)

        return cls(pipeline_response, body_stream, {}) if cls else body_stream  # type: ignore

    @overload
    def begin_create_update(
        self,
        resource_group_name: str,
        cluster_name: str,
        database_name: str,
        access_policy_assignment_name: str,
        parameters: _models.AccessPolicyAssignment,
        *,
        content_type: str = "application/json",
        **kwargs: Any
    ) -> LROPoller[_models.AccessPolicyAssignment]:
        """Creates/Updates a particular access policy assignment for a database.

        :param resource_group_name: The name of the resource group. The name is case insensitive.
         Required.
        :type resource_group_name: str
        :param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
         long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing
         nor consecutive hyphens. Required.
        :type cluster_name: str
        :param database_name: The name of the Redis Enterprise database. Required.
        :type database_name: str
        :param access_policy_assignment_name: The name of the Redis Enterprise database access policy
         assignment. Required.
        :type access_policy_assignment_name: str
        :param parameters: Parameters supplied to the create access policy assignment for database.
         Required.
        :type parameters: ~azure.mgmt.redisenterprise.models.AccessPolicyAssignment
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: An instance of LROPoller that returns either AccessPolicyAssignment or the result
         of cls(response)
        :rtype:
         ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.AccessPolicyAssignment]
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @overload
    def begin_create_update(
        self,
        resource_group_name: str,
        cluster_name: str,
        database_name: str,
        access_policy_assignment_name: str,
        parameters: IO[bytes],
        *,
        content_type: str = "application/json",
        **kwargs: Any
    ) -> LROPoller[_models.AccessPolicyAssignment]:
        """Creates/Updates a particular access policy assignment for a database.

        :param resource_group_name: The name of the resource group. The name is case insensitive.
         Required.
        :type resource_group_name: str
        :param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
         long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing
         nor consecutive hyphens. Required.
        :type cluster_name: str
        :param database_name: The name of the Redis Enterprise database. Required.
        :type database_name: str
        :param access_policy_assignment_name: The name of the Redis Enterprise database access policy
         assignment. Required.
        :type access_policy_assignment_name: str
        :param parameters: Parameters supplied to the create access policy assignment for database.
         Required.
        :type parameters: IO[bytes]
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: An instance of LROPoller that returns either AccessPolicyAssignment or the result
         of cls(response)
        :rtype:
         ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.AccessPolicyAssignment]
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @distributed_trace
    def begin_create_update(
        self,
        resource_group_name: str,
        cluster_name: str,
        database_name: str,
        access_policy_assignment_name: str,
        parameters: Union[_models.AccessPolicyAssignment, IO[bytes]],
        **kwargs: Any
    ) -> LROPoller[_models.AccessPolicyAssignment]:
        """Creates/Updates a particular access policy assignment for a database.

        :param resource_group_name: The name of the resource group. The name is case insensitive.
         Required.
        :type resource_group_name: str
        :param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
         long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing
         nor consecutive hyphens. Required.
        :type cluster_name: str
        :param database_name: The name of the Redis Enterprise database. Required.
        :type database_name: str
        :param access_policy_assignment_name: The name of the Redis Enterprise database access policy
         assignment. Required.
        :type access_policy_assignment_name: str
        :param parameters: Parameters supplied to the create access policy assignment for database.
         Is either a AccessPolicyAssignment type or a IO[bytes] type. Required.
        :type parameters: ~azure.mgmt.redisenterprise.models.AccessPolicyAssignment or IO[bytes]
        :return: An instance of LROPoller that returns either AccessPolicyAssignment or the result
         of cls(response)
        :rtype:
         ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.AccessPolicyAssignment]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

        api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
        content_type: Optional[str] = kwargs.pop("content_type", header_map.pop("Content-Type", None))
        cls: ClsType[_models.AccessPolicyAssignment] = kwargs.pop("cls", None)
        polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
        polling_delay = kwargs.pop("polling_interval", self._config.polling_interval)
        resume_token: Optional[str] = kwargs.pop("continuation_token", None)

        # Without a continuation token the operation is started here; with one, the initial
        # request is skipped and the poller is rebuilt from the token further down.
        if resume_token is None:
            initial_response = self._create_update_initial(
                resource_group_name=resource_group_name,
                cluster_name=cluster_name,
                database_name=database_name,
                access_policy_assignment_name=access_policy_assignment_name,
                parameters=parameters,
                api_version=api_version,
                content_type=content_type,
                # Hand back the raw pipeline response rather than the streamed body.
                cls=lambda raw_response, _body, _response_headers: raw_response,
                headers=header_map,
                params=query_map,
                **kwargs
            )
            initial_response.http_response.read()  # type: ignore
        kwargs.pop("error_map", None)

        def get_long_running_output(pipeline_response):
            """Deserialize the final response of the operation into an AccessPolicyAssignment."""
            result = self._deserialize("AccessPolicyAssignment", pipeline_response.http_response)
            return cls(pipeline_response, result, {}) if cls else result  # type: ignore

        if polling is False:
            polling_method: PollingMethod = cast(PollingMethod, NoPolling())
        elif polling is True:
            polling_method = cast(
                PollingMethod, ARMPolling(polling_delay, lro_options={"final-state-via": "original-uri"}, **kwargs)
            )
        else:
            # The caller supplied its own polling strategy.
            polling_method = polling

        if resume_token:
            return LROPoller[_models.AccessPolicyAssignment].from_continuation_token(
                polling_method=polling_method,
                continuation_token=resume_token,
                client=self._client,
                deserialization_callback=get_long_running_output,
            )
        return LROPoller[_models.AccessPolicyAssignment](
            self._client, initial_response, get_long_running_output, polling_method  # type: ignore
        )

    @distributed_trace
    def get(
        self,
        resource_group_name: str,
        cluster_name: str,
        database_name: str,
        access_policy_assignment_name: str,
        **kwargs: Any
    ) -> _models.AccessPolicyAssignment:
        """Gets information about access policy assignment for database.

        :param resource_group_name: The name of the resource group. The name is case insensitive.
         Required.
        :type resource_group_name: str
        :param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
         long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing
         nor consecutive hyphens. Required.
        :type cluster_name: str
        :param database_name: The name of the Redis Enterprise database. Required.
        :type database_name: str
        :param access_policy_assignment_name: The name of the Redis Enterprise database access policy
         assignment. Required.
        :type access_policy_assignment_name: str
        :return: AccessPolicyAssignment or the result of cls(response)
        :rtype: ~azure.mgmt.redisenterprise.models.AccessPolicyAssignment
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        status_to_error: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_to_error.update(kwargs.pop("error_map", {}) or {})

        header_map = kwargs.pop("headers", {}) or {}
        query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

        api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
        cls: ClsType[_models.AccessPolicyAssignment] = kwargs.pop("cls", None)

        request = build_get_request(
            resource_group_name=resource_group_name,
            cluster_name=cluster_name,
            database_name=database_name,
            access_policy_assignment_name=access_policy_assignment_name,
            subscription_id=self._config.subscription_id,
            api_version=api_version,
            headers=header_map,
            params=query_map,
        )
        request.url = self._client.format_url(request.url)

        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=status_to_error)
            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

        assignment = self._deserialize("AccessPolicyAssignment", pipeline_response.http_response)

        return cls(pipeline_response, assignment, {}) if cls else assignment  # type: ignore

    def _delete_initial(
        self,
        resource_group_name: str,
        cluster_name: str,
        database_name: str,
        access_policy_assignment_name: str,
        **kwargs: Any
    ) -> Iterator[bytes]:
        """Send the initial DELETE of the delete operation and return the streamed body.

        A status other than 202 or 204 is raised through ``map_error`` or as
        :class:`~azure.core.exceptions.HttpResponseError`. On 202 the ``Location`` and
        ``Azure-AsyncOperation`` response headers are collected and passed to ``cls`` when one
        is supplied.
        """
        status_to_error: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_to_error.update(kwargs.pop("error_map", {}) or {})

        header_map = kwargs.pop("headers", {}) or {}
        query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

        api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
        cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

        request = build_delete_request(
            resource_group_name=resource_group_name,
            cluster_name=cluster_name,
            database_name=database_name,
            access_policy_assignment_name=access_policy_assignment_name,
            subscription_id=self._config.subscription_id,
            api_version=api_version,
            headers=header_map,
            params=query_map,
        )
        request.url = self._client.format_url(request.url)

        # "decompress" must be removed from kwargs before they are forwarded to the pipeline.
        decompress = kwargs.pop("decompress", True)
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=True, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in (202, 204):
            try:
                response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
            map_error(status_code=response.status_code, response=response, error_map=status_to_error)
            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

        response_headers = {}
        if response.status_code == 202:
            for header_name in ("Location", "Azure-AsyncOperation"):
                response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

        body_stream = response.stream_download(self._client._pipeline, decompress=decompress)

        return cls(pipeline_response, body_stream, response_headers) if cls else body_stream  # type: ignore

    @distributed_trace
    def begin_delete(
        self,
        resource_group_name: str,
        cluster_name: str,
        database_name: str,
        access_policy_assignment_name: str,
        **kwargs: Any
    ) -> LROPoller[None]:
        """Deletes a single access policy assignment.

        :param resource_group_name: The name of the resource group. The name is case insensitive.
         Required.
        :type resource_group_name: str
        :param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
         long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing
         nor consecutive hyphens. Required.
        :type cluster_name: str
        :param database_name: The name of the Redis Enterprise database. Required.
        :type database_name: str
        :param access_policy_assignment_name: The name of the Redis Enterprise database access policy
         assignment. Required.
        :type access_policy_assignment_name: str
        :return: An instance of LROPoller that returns either None or the result of cls(response)
        :rtype: ~azure.core.polling.LROPoller[None]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        header_map = kwargs.pop("headers", {}) or {}
        query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

        api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
        cls: ClsType[None] = kwargs.pop("cls", None)
        polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
        polling_delay = kwargs.pop("polling_interval", self._config.polling_interval)
        resume_token: Optional[str] = kwargs.pop("continuation_token", None)

        # Without a continuation token the operation is started here; with one, the initial
        # request is skipped and the poller is rebuilt from the token further down.
        if resume_token is None:
            initial_response = self._delete_initial(
                resource_group_name=resource_group_name,
                cluster_name=cluster_name,
                database_name=database_name,
                access_policy_assignment_name=access_policy_assignment_name,
                api_version=api_version,
                # Hand back the raw pipeline response rather than the streamed body.
                cls=lambda raw_response, _body, _response_headers: raw_response,
                headers=header_map,
                params=query_map,
                **kwargs
            )
            initial_response.http_response.read()  # type: ignore
        kwargs.pop("error_map", None)

        def get_long_running_output(pipeline_response):
            """Return the result of ``cls`` when one was supplied, otherwise ``None``."""
            return cls(pipeline_response, None, {}) if cls else None  # type: ignore

        if polling is False:
            polling_method: PollingMethod = cast(PollingMethod, NoPolling())
        elif polling is True:
            polling_method = cast(
                PollingMethod,
                ARMPolling(polling_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs),
            )
        else:
            # The caller supplied its own polling strategy.
            polling_method = polling

        if resume_token:
            return LROPoller[None].from_continuation_token(
                polling_method=polling_method,
                continuation_token=resume_token,
                client=self._client,
                deserialization_callback=get_long_running_output,
            )
        return LROPoller[None](
            self._client, initial_response, get_long_running_output, polling_method  # type: ignore
        )

    @distributed_trace
    def list(
        self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs: Any
    ) -> Iterable["_models.AccessPolicyAssignment"]:
        """Gets all access policy assignments.

        :param resource_group_name: The name of the resource group. The name is case insensitive.
         Required.
        :type resource_group_name: str
        :param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
         long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing
         nor consecutive hyphens. Required.
        :type cluster_name: str
        :param database_name: The name of the Redis Enterprise database. Required.
        :type database_name: str
        :return: An iterator like instance of either AccessPolicyAssignment or the result of
         cls(response)
        :rtype:
         ~azure.core.paging.ItemPaged[~azure.mgmt.redisenterprise.models.AccessPolicyAssignment]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        header_map = kwargs.pop("headers", {}) or {}
        query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

        api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
        cls: ClsType[_models.AccessPolicyAssignmentList] = kwargs.pop("cls", None)

        status_to_error: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_to_error.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            """Build the request for the first page, or for ``next_link`` when one is given."""
            if not next_link:
                first_page_request = build_list_request(
                    resource_group_name=resource_group_name,
                    cluster_name=cluster_name,
                    database_name=database_name,
                    subscription_id=self._config.subscription_id,
                    api_version=api_version,
                    headers=header_map,
                    params=query_map,
                )
                first_page_request.url = self._client.format_url(first_page_request.url)
                return first_page_request

            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            link_query = urllib.parse.parse_qs(parsed_link.query)
            next_params = case_insensitive_dict(
                {name: [urllib.parse.quote(item) for item in values] for name, values in link_query.items()}
            )
            next_params["api-version"] = self._config.api_version
            next_page_request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, parsed_link.path), params=next_params
            )
            next_page_request.url = self._client.format_url(next_page_request.url)
            next_page_request.method = "GET"
            return next_page_request

        def extract_data(pipeline_response):
            """Return the next link (or None) and an iterator over the page's elements."""
            page = self._deserialize("AccessPolicyAssignmentList", pipeline_response)
            elements = page.value
            if cls:
                elements = cls(elements)  # type: ignore
            return page.next_link or None, iter(elements)

        def get_next(next_link=None):
            """Fetch one page, raising on any status other than 200."""
            request = prepare_request(next_link)

            pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
                request, stream=False, **kwargs
            )
            response = pipeline_response.http_response

            if response.status_code != 200:
                map_error(status_code=response.status_code, response=response, error_map=status_to_error)
                error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
                raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

            return pipeline_response

        return ItemPaged(get_next, extract_data)