# pylint: disable=line-too-long,useless-suppression,too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from collections.abc import MutableMapping
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Iterator, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core import PipelineClient
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
StreamClosedError,
StreamConsumedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling
from .. import models as _models
from .._configuration import CosmosDBManagementClientConfiguration
from .._utils.serialization import Deserializer, Serializer
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_get_request(resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_update_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_or_update_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)
def build_failover_priority_change_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/failoverPriorityChange",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_request(subscription_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/providers/Microsoft.DocumentDB/databaseAccounts")
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_by_resource_group_request(resource_group_name: str, subscription_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts",
)
path_format_arguments = {
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_keys_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/listKeys",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_connection_strings_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/listConnectionStrings",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_offline_region_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/offlineRegion",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_online_region_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/onlineRegion",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_read_only_keys_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/readonlykeys",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_read_only_keys_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/readonlykeys",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_regenerate_key_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/regenerateKey",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_check_name_exists_request(account_name: str, **kwargs: Any) -> HttpRequest:
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
# Construct URL
_url = kwargs.pop("template_url", "/providers/Microsoft.DocumentDB/databaseAccountNames/{accountName}")
path_format_arguments = {
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
return HttpRequest(method="HEAD", url=_url, params=_params, **kwargs)
def build_list_metrics_request(
resource_group_name: str, account_name: str, subscription_id: str, *, filter: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/metrics",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
_params["$filter"] = _SERIALIZER.query("filter", filter, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_usages_request(
resource_group_name: str, account_name: str, subscription_id: str, *, filter: Optional[str] = None, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/usages",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if filter is not None:
_params["$filter"] = _SERIALIZER.query("filter", filter, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_metric_definitions_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/metricDefinitions",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
[docs]
class DatabaseAccountsOperations: # pylint: disable=too-many-public-methods
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.cosmosdb.CosmosDBManagementClient`'s
:attr:`database_accounts` attribute.
"""
models = _models
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: CosmosDBManagementClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def get(self, resource_group_name: str, account_name: str, **kwargs: Any) -> _models.DatabaseAccountGetResults:
"""Retrieves the properties of an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: DatabaseAccountGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseAccountGetResults] = kwargs.pop("cls", None)
_request = build_get_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("DatabaseAccountGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _update_initial(
self,
resource_group_name: str,
account_name: str,
update_parameters: Union[_models.DatabaseAccountUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(update_parameters, (IOBase, bytes)):
_content = update_parameters
else:
_json = self._serialize.body(update_parameters, "DatabaseAccountUpdateParameters")
_request = build_update_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def begin_update(
self,
resource_group_name: str,
account_name: str,
update_parameters: _models.DatabaseAccountUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.DatabaseAccountGetResults]:
"""Updates the properties of an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param update_parameters: The parameters to provide for the current database account. Required.
:type update_parameters: ~azure.mgmt.cosmosdb.models.DatabaseAccountUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either DatabaseAccountGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_update(
self,
resource_group_name: str,
account_name: str,
update_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.DatabaseAccountGetResults]:
"""Updates the properties of an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param update_parameters: The parameters to provide for the current database account. Required.
:type update_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either DatabaseAccountGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_update(
self,
resource_group_name: str,
account_name: str,
update_parameters: Union[_models.DatabaseAccountUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.DatabaseAccountGetResults]:
"""Updates the properties of an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param update_parameters: The parameters to provide for the current database account. Is either
a DatabaseAccountUpdateParameters type or a IO[bytes] type. Required.
:type update_parameters: ~azure.mgmt.cosmosdb.models.DatabaseAccountUpdateParameters or
IO[bytes]
:return: An instance of LROPoller that returns either DatabaseAccountGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.DatabaseAccountGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._update_initial(
resource_group_name=resource_group_name,
account_name=account_name,
update_parameters=update_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("DatabaseAccountGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.DatabaseAccountGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.DatabaseAccountGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _create_or_update_initial(
self,
resource_group_name: str,
account_name: str,
create_update_parameters: Union[_models.DatabaseAccountCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_parameters, (IOBase, bytes)):
_content = create_update_parameters
else:
_json = self._serialize.body(create_update_parameters, "DatabaseAccountCreateUpdateParameters")
_request = build_create_or_update_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_or_update(
self,
resource_group_name: str,
account_name: str,
create_update_parameters: _models.DatabaseAccountCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.DatabaseAccountGetResults]:
"""Creates or updates an Azure Cosmos DB database account. The "Update" method is preferred when
performing updates on an account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_parameters: The parameters to provide for the current database account.
Required.
:type create_update_parameters:
~azure.mgmt.cosmosdb.models.DatabaseAccountCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either DatabaseAccountGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_or_update(
self,
resource_group_name: str,
account_name: str,
create_update_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.DatabaseAccountGetResults]:
"""Creates or updates an Azure Cosmos DB database account. The "Update" method is preferred when
performing updates on an account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_parameters: The parameters to provide for the current database account.
Required.
:type create_update_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either DatabaseAccountGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_or_update(
self,
resource_group_name: str,
account_name: str,
create_update_parameters: Union[_models.DatabaseAccountCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.DatabaseAccountGetResults]:
"""Creates or updates an Azure Cosmos DB database account. The "Update" method is preferred when
performing updates on an account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_parameters: The parameters to provide for the current database account. Is
either a DatabaseAccountCreateUpdateParameters type or a IO[bytes] type. Required.
:type create_update_parameters:
~azure.mgmt.cosmosdb.models.DatabaseAccountCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either DatabaseAccountGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.DatabaseAccountGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_or_update_initial(
resource_group_name=resource_group_name,
account_name=account_name,
create_update_parameters=create_update_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("DatabaseAccountGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.DatabaseAccountGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.DatabaseAccountGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_initial(self, resource_group_name: str, account_name: str, **kwargs: Any) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete(self, resource_group_name: str, account_name: str, **kwargs: Any) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_initial(
resource_group_name=resource_group_name,
account_name=account_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
def _failover_priority_change_initial(
self,
resource_group_name: str,
account_name: str,
failover_parameters: Union[_models.FailoverPolicies, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(failover_parameters, (IOBase, bytes)):
_content = failover_parameters
else:
_json = self._serialize.body(failover_parameters, "FailoverPolicies")
_request = build_failover_priority_change_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_failover_priority_change(
self,
resource_group_name: str,
account_name: str,
failover_parameters: _models.FailoverPolicies,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Changes the failover priority for the Azure Cosmos DB database account. A failover priority of
0 indicates a write region. The maximum value for a failover priority = (total number of
regions - 1). Failover priority values must be unique for each of the regions in which the
database account exists.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param failover_parameters: The new failover policies for the database account. Required.
:type failover_parameters: ~azure.mgmt.cosmosdb.models.FailoverPolicies
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_failover_priority_change(
self,
resource_group_name: str,
account_name: str,
failover_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Changes the failover priority for the Azure Cosmos DB database account. A failover priority of
0 indicates a write region. The maximum value for a failover priority = (total number of
regions - 1). Failover priority values must be unique for each of the regions in which the
database account exists.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param failover_parameters: The new failover policies for the database account. Required.
:type failover_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_failover_priority_change(
self,
resource_group_name: str,
account_name: str,
failover_parameters: Union[_models.FailoverPolicies, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Changes the failover priority for the Azure Cosmos DB database account. A failover priority of
0 indicates a write region. The maximum value for a failover priority = (total number of
regions - 1). Failover priority values must be unique for each of the regions in which the
database account exists.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param failover_parameters: The new failover policies for the database account. Is either a
FailoverPolicies type or a IO[bytes] type. Required.
:type failover_parameters: ~azure.mgmt.cosmosdb.models.FailoverPolicies or IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._failover_priority_change_initial(
resource_group_name=resource_group_name,
account_name=account_name,
failover_parameters=failover_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def list(self, **kwargs: Any) -> Iterable["_models.DatabaseAccountGetResults"]:
"""Lists all the Azure Cosmos DB database accounts available under the subscription.
:return: An iterator like instance of either DatabaseAccountGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseAccountsListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_request(
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("DatabaseAccountsListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def list_by_resource_group(
self, resource_group_name: str, **kwargs: Any
) -> Iterable["_models.DatabaseAccountGetResults"]:
"""Lists all the Azure Cosmos DB database accounts available under the given resource group.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:return: An iterator like instance of either DatabaseAccountGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.DatabaseAccountGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseAccountsListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_by_resource_group_request(
resource_group_name=resource_group_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("DatabaseAccountsListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def list_keys(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> _models.DatabaseAccountListKeysResult:
"""Lists the access keys for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: DatabaseAccountListKeysResult or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.DatabaseAccountListKeysResult
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseAccountListKeysResult] = kwargs.pop("cls", None)
_request = build_list_keys_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("DatabaseAccountListKeysResult", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def list_connection_strings(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> _models.DatabaseAccountListConnectionStringsResult:
"""Lists the connection strings for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: DatabaseAccountListConnectionStringsResult or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.DatabaseAccountListConnectionStringsResult
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseAccountListConnectionStringsResult] = kwargs.pop("cls", None)
_request = build_list_connection_strings_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("DatabaseAccountListConnectionStringsResult", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _offline_region_initial(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_offline: Union[_models.RegionForOnlineOffline, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(region_parameter_for_offline, (IOBase, bytes)):
_content = region_parameter_for_offline
else:
_json = self._serialize.body(region_parameter_for_offline, "RegionForOnlineOffline")
_request = build_offline_region_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_offline_region(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_offline: _models.RegionForOnlineOffline,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Offline the specified region for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param region_parameter_for_offline: Cosmos DB region to offline for the database account.
Required.
:type region_parameter_for_offline: ~azure.mgmt.cosmosdb.models.RegionForOnlineOffline
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_offline_region(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_offline: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Offline the specified region for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param region_parameter_for_offline: Cosmos DB region to offline for the database account.
Required.
:type region_parameter_for_offline: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_offline_region(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_offline: Union[_models.RegionForOnlineOffline, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Offline the specified region for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param region_parameter_for_offline: Cosmos DB region to offline for the database account. Is
either a RegionForOnlineOffline type or a IO[bytes] type. Required.
:type region_parameter_for_offline: ~azure.mgmt.cosmosdb.models.RegionForOnlineOffline or
IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._offline_region_initial(
resource_group_name=resource_group_name,
account_name=account_name,
region_parameter_for_offline=region_parameter_for_offline,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
def _online_region_initial(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_online: Union[_models.RegionForOnlineOffline, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(region_parameter_for_online, (IOBase, bytes)):
_content = region_parameter_for_online
else:
_json = self._serialize.body(region_parameter_for_online, "RegionForOnlineOffline")
_request = build_online_region_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_online_region(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_online: _models.RegionForOnlineOffline,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Online the specified region for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param region_parameter_for_online: Cosmos DB region to online for the database account.
Required.
:type region_parameter_for_online: ~azure.mgmt.cosmosdb.models.RegionForOnlineOffline
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_online_region(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_online: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Online the specified region for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param region_parameter_for_online: Cosmos DB region to online for the database account.
Required.
:type region_parameter_for_online: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_online_region(
self,
resource_group_name: str,
account_name: str,
region_parameter_for_online: Union[_models.RegionForOnlineOffline, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Online the specified region for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param region_parameter_for_online: Cosmos DB region to online for the database account. Is
either a RegionForOnlineOffline type or a IO[bytes] type. Required.
:type region_parameter_for_online: ~azure.mgmt.cosmosdb.models.RegionForOnlineOffline or
IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._online_region_initial(
resource_group_name=resource_group_name,
account_name=account_name,
region_parameter_for_online=region_parameter_for_online,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def get_read_only_keys(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> _models.DatabaseAccountListReadOnlyKeysResult:
"""Lists the read-only access keys for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: DatabaseAccountListReadOnlyKeysResult or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.DatabaseAccountListReadOnlyKeysResult
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseAccountListReadOnlyKeysResult] = kwargs.pop("cls", None)
_request = build_get_read_only_keys_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("DatabaseAccountListReadOnlyKeysResult", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def list_read_only_keys(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> _models.DatabaseAccountListReadOnlyKeysResult:
"""Lists the read-only access keys for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: DatabaseAccountListReadOnlyKeysResult or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.DatabaseAccountListReadOnlyKeysResult
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.DatabaseAccountListReadOnlyKeysResult] = kwargs.pop("cls", None)
_request = build_list_read_only_keys_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("DatabaseAccountListReadOnlyKeysResult", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _regenerate_key_initial(
self,
resource_group_name: str,
account_name: str,
key_to_regenerate: Union[_models.DatabaseAccountRegenerateKeyParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(key_to_regenerate, (IOBase, bytes)):
_content = key_to_regenerate
else:
_json = self._serialize.body(key_to_regenerate, "DatabaseAccountRegenerateKeyParameters")
_request = build_regenerate_key_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_regenerate_key(
self,
resource_group_name: str,
account_name: str,
key_to_regenerate: _models.DatabaseAccountRegenerateKeyParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Regenerates an access key for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param key_to_regenerate: The name of the key to regenerate. Required.
:type key_to_regenerate: ~azure.mgmt.cosmosdb.models.DatabaseAccountRegenerateKeyParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_regenerate_key(
self,
resource_group_name: str,
account_name: str,
key_to_regenerate: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[None]:
"""Regenerates an access key for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param key_to_regenerate: The name of the key to regenerate. Required.
:type key_to_regenerate: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_regenerate_key(
self,
resource_group_name: str,
account_name: str,
key_to_regenerate: Union[_models.DatabaseAccountRegenerateKeyParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[None]:
"""Regenerates an access key for the specified Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param key_to_regenerate: The name of the key to regenerate. Is either a
DatabaseAccountRegenerateKeyParameters type or a IO[bytes] type. Required.
:type key_to_regenerate: ~azure.mgmt.cosmosdb.models.DatabaseAccountRegenerateKeyParameters or
IO[bytes]
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._regenerate_key_initial(
resource_group_name=resource_group_name,
account_name=account_name,
key_to_regenerate=key_to_regenerate,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def check_name_exists(self, account_name: str, **kwargs: Any) -> bool:
"""Checks that the Azure Cosmos DB account name already exists. A valid account name may contain
only lowercase letters, numbers, and the '-' character, and must be between 3 and 50
characters.
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: bool or the result of cls(response)
:rtype: bool
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
_request = build_check_name_exists_request(
account_name=account_name,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 404]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
return 200 <= response.status_code <= 299
[docs]
@distributed_trace
def list_metrics(
self, resource_group_name: str, account_name: str, filter: str, **kwargs: Any
) -> Iterable["_models.Metric"]:
"""Retrieves the metrics determined by the given filter for the given database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param filter: An OData filter expression that describes a subset of metrics to return. The
parameters that can be filtered are name.value (name of the metric, can have an or of multiple
names), startTime, endTime, and timeGrain. The supported operator is eq. Required.
:type filter: str
:return: An iterator like instance of either Metric or the result of cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.Metric]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.MetricListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_metrics_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
filter=filter,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("MetricListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def list_usages(
self, resource_group_name: str, account_name: str, filter: Optional[str] = None, **kwargs: Any
) -> Iterable["_models.Usage"]:
"""Retrieves the usages (most recent data) for the given database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param filter: An OData filter expression that describes a subset of usages to return. The
supported parameter is name.value (name of the metric, can have an or of multiple names).
Default value is None.
:type filter: str
:return: An iterator like instance of either Usage or the result of cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.Usage]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.UsagesResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_usages_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
filter=filter,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("UsagesResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def list_metric_definitions(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> Iterable["_models.MetricDefinition"]:
"""Retrieves metric definitions for the given database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: An iterator like instance of either MetricDefinition or the result of cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.MetricDefinition]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.MetricDefinitionsListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_metric_definitions_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("MetricDefinitionsListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)