# pylint: disable=line-too-long,useless-suppression,too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from collections.abc import MutableMapping
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Iterator, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core import PipelineClient
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
StreamClosedError,
StreamConsumedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling
from .. import models as _models
from .._configuration import RedisEnterpriseManagementClientConfiguration
from .._serialization import Deserializer, Serializer
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_list_by_cluster_request(
resource_group_name: str, cluster_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_update_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_keys_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/listKeys",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_regenerate_key_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/regenerateKey",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_import_method_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/import",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_export_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/export",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_force_unlink_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/forceUnlink",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_force_link_to_replication_group_request( # pylint: disable=name-too-long
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/forceLinkToReplicationGroup",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_flush_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/flush",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_upgrade_db_redis_version_request(
resource_group_name: str, cluster_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-05-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Cache/redisEnterprise/{clusterName}/databases/{databaseName}/upgradeDBRedisVersion",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"clusterName": _SERIALIZER.url(
"cluster_name", cluster_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"databaseName": _SERIALIZER.url(
"database_name", database_name, "str", pattern=r"^(?=.{1,60}$)[A-Za-z0-9]+(-[A-Za-z0-9]+)*$"
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
[docs]
class DatabasesOperations: # pylint: disable=too-many-public-methods
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.redisenterprise.RedisEnterpriseManagementClient`'s
:attr:`databases` attribute.
"""
models = _models
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: RedisEnterpriseManagementClientConfiguration = (
input_args.pop(0) if input_args else kwargs.pop("config")
)
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def list_by_cluster(
self, resource_group_name: str, cluster_name: str, **kwargs: Any
) -> Iterable["_models.Database"]:
"""Gets all databases in the specified Redis Enterprise cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:return: An iterator like instance of either Database or the result of cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.redisenterprise.models.Database]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseList] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_by_cluster_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("DatabaseList", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return deserialized.next_link or None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
def _create_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.Database, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_json = self._serialize.body(parameters, "Database")
_request = build_create_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def begin_create(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: _models.Database,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.Database]:
"""Creates a database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Parameters supplied to the create or update database operation. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.Database
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either Database or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.Database]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.Database]:
"""Creates a database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Parameters supplied to the create or update database operation. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either Database or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.Database]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.Database, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.Database]:
"""Creates a database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Parameters supplied to the create or update database operation. Is either a
Database type or a IO[bytes] type. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.Database or IO[bytes]
:return: An instance of LROPoller that returns either Database or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.Database]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.Database] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("Database", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "original-uri"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.Database].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.Database](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _update_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.DatabaseUpdate, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_json = self._serialize.body(parameters, "DatabaseUpdate")
_request = build_update_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_update(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: _models.DatabaseUpdate,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.Database]:
"""Updates a database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Parameters supplied to the create or update database operation. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.DatabaseUpdate
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either Database or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.Database]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_update(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.Database]:
"""Updates a database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Parameters supplied to the create or update database operation. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either Database or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.Database]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_update(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.DatabaseUpdate, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.Database]:
"""Updates a database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Parameters supplied to the create or update database operation. Is either a
DatabaseUpdate type or a IO[bytes] type. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.DatabaseUpdate or IO[bytes]
:return: An instance of LROPoller that returns either Database or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.Database]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.Database] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._update_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("Database", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.Database].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.Database](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
[docs]
@distributed_trace
def get(self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs: Any) -> _models.Database:
"""Gets information about a database in a Redis Enterprise cluster.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:return: Database or the result of cls(response)
:rtype: ~azure.mgmt.redisenterprise.models.Database
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.Database] = kwargs.pop("cls", None)
_request = build_get_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
deserialized = self._deserialize("Database", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _delete_initial(
self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete(
self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs: Any
) -> LROPoller[None]:
"""Deletes a single database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def list_keys(
self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs: Any
) -> _models.AccessKeys:
"""Retrieves the access keys for the Redis Enterprise database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:return: AccessKeys or the result of cls(response)
:rtype: ~azure.mgmt.redisenterprise.models.AccessKeys
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.AccessKeys] = kwargs.pop("cls", None)
_request = build_list_keys_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
deserialized = self._deserialize("AccessKeys", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _regenerate_key_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.RegenerateKeyParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_json = self._serialize.body(parameters, "RegenerateKeyParameters")
_request = build_regenerate_key_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_regenerate_key(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: _models.RegenerateKeyParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.AccessKeys]:
"""Regenerates the Redis Enterprise database's access keys.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Specifies which key to regenerate. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.RegenerateKeyParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either AccessKeys or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.AccessKeys]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_regenerate_key(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.AccessKeys]:
"""Regenerates the Redis Enterprise database's access keys.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Specifies which key to regenerate. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either AccessKeys or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.AccessKeys]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_regenerate_key(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.RegenerateKeyParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.AccessKeys]:
"""Regenerates the Redis Enterprise database's access keys.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Specifies which key to regenerate. Is either a RegenerateKeyParameters type
or a IO[bytes] type. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.RegenerateKeyParameters or IO[bytes]
:return: An instance of LROPoller that returns either AccessKeys or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.redisenterprise.models.AccessKeys]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.AccessKeys] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._regenerate_key_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("AccessKeys", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.AccessKeys].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.AccessKeys](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _import_method_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ImportClusterParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_json = self._serialize.body(parameters, "ImportClusterParameters")
_request = build_import_method_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_import_method(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: _models.ImportClusterParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Imports database files to target database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Storage information for importing into the cluster. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ImportClusterParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_import_method(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Imports database files to target database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Storage information for importing into the cluster. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_import_method(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ImportClusterParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Imports database files to target database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Storage information for importing into the cluster. Is either a
ImportClusterParameters type or a IO[bytes] type. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ImportClusterParameters or IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._import_method_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
def _export_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ExportClusterParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_json = self._serialize.body(parameters, "ExportClusterParameters")
_request = build_export_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_export(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: _models.ExportClusterParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Exports a database file from target database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Storage information for exporting into the cluster. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ExportClusterParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_export(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Exports a database file from target database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Storage information for exporting into the cluster. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_export(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ExportClusterParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Exports a database file from target database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Storage information for exporting into the cluster. Is either a
ExportClusterParameters type or a IO[bytes] type. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ExportClusterParameters or IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._export_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
def _force_unlink_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ForceUnlinkParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_json = self._serialize.body(parameters, "ForceUnlinkParameters")
_request = build_force_unlink_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_force_unlink(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: _models.ForceUnlinkParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Forcibly removes the link to the specified database resource.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the database to be unlinked. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ForceUnlinkParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_force_unlink(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Forcibly removes the link to the specified database resource.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the database to be unlinked. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_force_unlink(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ForceUnlinkParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Forcibly removes the link to the specified database resource.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the database to be unlinked. Is either a
ForceUnlinkParameters type or a IO[bytes] type. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ForceUnlinkParameters or IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._force_unlink_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
def _force_link_to_replication_group_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ForceLinkParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_json = self._serialize.body(parameters, "ForceLinkParameters")
_request = build_force_link_to_replication_group_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponseAutoGenerated, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_force_link_to_replication_group(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: _models.ForceLinkParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Forcibly recreates an existing database on the specified cluster, and rejoins it to an existing
replication group. **IMPORTANT NOTE:** All data in this database will be discarded, and the
database will temporarily be unavailable while rejoining the replication group.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the database to be unlinked. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ForceLinkParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_force_link_to_replication_group(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Forcibly recreates an existing database on the specified cluster, and rejoins it to an existing
replication group. **IMPORTANT NOTE:** All data in this database will be discarded, and the
database will temporarily be unavailable while rejoining the replication group.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the database to be unlinked. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_force_link_to_replication_group(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Union[_models.ForceLinkParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Forcibly recreates an existing database on the specified cluster, and rejoins it to an existing
replication group. **IMPORTANT NOTE:** All data in this database will be discarded, and the
database will temporarily be unavailable while rejoining the replication group.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the database to be unlinked. Is either a
ForceLinkParameters type or a IO[bytes] type. Required.
:type parameters: ~azure.mgmt.redisenterprise.models.ForceLinkParameters or IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._force_link_to_replication_group_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
def _flush_initial(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Optional[Union[_models.FlushParameters, IO[bytes]]] = None,
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
if parameters is not None:
_json = self._serialize.body(parameters, "FlushParameters")
else:
_json = None
_request = build_flush_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_flush(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Optional[_models.FlushParameters] = None,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Flushes all the keys in this database and also from its linked databases.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the databases to be flushed. Default value is None.
:type parameters: ~azure.mgmt.redisenterprise.models.FlushParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_flush(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Optional[IO[bytes]] = None,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Flushes all the keys in this database and also from its linked databases.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the databases to be flushed. Default value is None.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_flush(
self,
resource_group_name: str,
cluster_name: str,
database_name: str,
parameters: Optional[Union[_models.FlushParameters, IO[bytes]]] = None,
**kwargs: Any
) -> LROPoller[None]:
"""Flushes all the keys in this database and also from its linked databases.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:param parameters: Information identifying the databases to be flushed. Is either a
FlushParameters type or a IO[bytes] type. Default value is None.
:type parameters: ~azure.mgmt.redisenterprise.models.FlushParameters or IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._flush_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
parameters=parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "location"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
def _upgrade_db_redis_version_initial(
self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_upgrade_db_redis_version_request(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_upgrade_db_redis_version(
self, resource_group_name: str, cluster_name: str, database_name: str, **kwargs: Any
) -> LROPoller[None]:
"""Upgrades the database Redis version to the latest available.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param cluster_name: The name of the Redis Enterprise cluster. Name must be 1-60 characters
long. Allowed characters(A-Z, a-z, 0-9) and hyphen(-). There can be no leading nor trailing nor
consecutive hyphens. Required.
:type cluster_name: str
:param database_name: The name of the Redis Enterprise database. Required.
:type database_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._upgrade_db_redis_version_initial(
resource_group_name=resource_group_name,
cluster_name=cluster_name,
database_name=database_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "location"}, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore