# pylint: disable=line-too-long,useless-suppression,too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from collections.abc import MutableMapping
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Iterator, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core import PipelineClient
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
StreamClosedError,
StreamConsumedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling
from .. import models as _models
from .._configuration import CosmosDBManagementClientConfiguration
from .._utils.serialization import Deserializer, Serializer
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_list_sql_databases_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_sql_database_request(
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_sql_database_request(
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_sql_database_request(
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)
def build_get_sql_database_throughput_request( # pylint: disable=name-too-long
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/throughputSettings/default",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_update_sql_database_throughput_request( # pylint: disable=name-too-long
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/throughputSettings/default",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_migrate_sql_database_to_autoscale_request( # pylint: disable=name-too-long
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/throughputSettings/default/migrateToAutoscale",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_migrate_sql_database_to_manual_throughput_request( # pylint: disable=name-too-long
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/throughputSettings/default/migrateToManualThroughput",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_sql_containers_request(
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_sql_container_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_sql_container_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_sql_container_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)
def build_get_sql_container_throughput_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/throughputSettings/default",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_update_sql_container_throughput_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/throughputSettings/default",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_migrate_sql_container_to_autoscale_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/throughputSettings/default/migrateToAutoscale",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_migrate_sql_container_to_manual_throughput_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/throughputSettings/default/migrateToManualThroughput",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_client_encryption_keys_request( # pylint: disable=name-too-long
resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/clientEncryptionKeys",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_client_encryption_key_request(
resource_group_name: str,
account_name: str,
database_name: str,
client_encryption_key_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/clientEncryptionKeys/{clientEncryptionKeyName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"clientEncryptionKeyName": _SERIALIZER.url("client_encryption_key_name", client_encryption_key_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_client_encryption_key_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
client_encryption_key_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/clientEncryptionKeys/{clientEncryptionKeyName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"clientEncryptionKeyName": _SERIALIZER.url("client_encryption_key_name", client_encryption_key_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_sql_stored_procedures_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/storedProcedures",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_sql_stored_procedure_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/storedProcedures/{storedProcedureName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"storedProcedureName": _SERIALIZER.url("stored_procedure_name", stored_procedure_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_sql_stored_procedure_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/storedProcedures/{storedProcedureName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"storedProcedureName": _SERIALIZER.url("stored_procedure_name", stored_procedure_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_sql_stored_procedure_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/storedProcedures/{storedProcedureName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"storedProcedureName": _SERIALIZER.url("stored_procedure_name", stored_procedure_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)
def build_list_sql_user_defined_functions_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/userDefinedFunctions",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_sql_user_defined_function_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/userDefinedFunctions/{userDefinedFunctionName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"userDefinedFunctionName": _SERIALIZER.url("user_defined_function_name", user_defined_function_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_sql_user_defined_function_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/userDefinedFunctions/{userDefinedFunctionName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"userDefinedFunctionName": _SERIALIZER.url("user_defined_function_name", user_defined_function_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_sql_user_defined_function_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/userDefinedFunctions/{userDefinedFunctionName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"userDefinedFunctionName": _SERIALIZER.url("user_defined_function_name", user_defined_function_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)
def build_list_sql_triggers_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/triggers",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_sql_trigger_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/triggers/{triggerName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"triggerName": _SERIALIZER.url("trigger_name", trigger_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_sql_trigger_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/triggers/{triggerName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"triggerName": _SERIALIZER.url("trigger_name", trigger_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_sql_trigger_request(
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/triggers/{triggerName}",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
"triggerName": _SERIALIZER.url("trigger_name", trigger_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)
def build_get_sql_role_definition_request(
role_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleDefinitions/{roleDefinitionId}",
)
path_format_arguments = {
"roleDefinitionId": _SERIALIZER.url("role_definition_id", role_definition_id, "str"),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_sql_role_definition_request( # pylint: disable=name-too-long
role_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleDefinitions/{roleDefinitionId}",
)
path_format_arguments = {
"roleDefinitionId": _SERIALIZER.url("role_definition_id", role_definition_id, "str"),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_sql_role_definition_request(
role_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleDefinitions/{roleDefinitionId}",
)
path_format_arguments = {
"roleDefinitionId": _SERIALIZER.url("role_definition_id", role_definition_id, "str"),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_sql_role_definitions_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleDefinitions",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_get_sql_role_assignment_request(
role_assignment_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleAssignments/{roleAssignmentId}",
)
path_format_arguments = {
"roleAssignmentId": _SERIALIZER.url("role_assignment_id", role_assignment_id, "str"),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_create_update_sql_role_assignment_request( # pylint: disable=name-too-long
role_assignment_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleAssignments/{roleAssignmentId}",
)
path_format_arguments = {
"roleAssignmentId": _SERIALIZER.url("role_assignment_id", role_assignment_id, "str"),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_delete_sql_role_assignment_request(
role_assignment_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleAssignments/{roleAssignmentId}",
)
path_format_arguments = {
"roleAssignmentId": _SERIALIZER.url("role_assignment_id", role_assignment_id, "str"),
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_list_sql_role_assignments_request(
resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlRoleAssignments",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_retrieve_continuous_backup_information_request( # pylint: disable=name-too-long
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
subscription_id: str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = kwargs.pop(
"template_url",
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/sqlDatabases/{databaseName}/containers/{containerName}/retrieveContinuousBackupInformation",
)
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
"resourceGroupName": _SERIALIZER.url(
"resource_group_name", resource_group_name, "str", max_length=90, min_length=1
),
"accountName": _SERIALIZER.url(
"account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
),
"databaseName": _SERIALIZER.url("database_name", database_name, "str"),
"containerName": _SERIALIZER.url("container_name", container_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
[docs]
class SqlResourcesOperations: # pylint: disable=too-many-public-methods
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.cosmosdb.CosmosDBManagementClient`'s
:attr:`sql_resources` attribute.
"""
models = _models
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: CosmosDBManagementClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def list_sql_databases(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> Iterable["_models.SqlDatabaseGetResults"]:
"""Lists the SQL databases under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: An iterator like instance of either SqlDatabaseGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.SqlDatabaseGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlDatabaseListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_sql_databases_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("SqlDatabaseListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_sql_database(
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> _models.SqlDatabaseGetResults:
"""Gets the SQL database under an existing Azure Cosmos DB database account with the provided
name.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:return: SqlDatabaseGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.SqlDatabaseGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlDatabaseGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_database_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("SqlDatabaseGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_sql_database_initial(
self,
resource_group_name: str,
account_name: str,
database_name: str,
create_update_sql_database_parameters: Union[_models.SqlDatabaseCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_sql_database_parameters, (IOBase, bytes)):
_content = create_update_sql_database_parameters
else:
_json = self._serialize.body(create_update_sql_database_parameters, "SqlDatabaseCreateUpdateParameters")
_request = build_create_update_sql_database_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_sql_database(
self,
resource_group_name: str,
account_name: str,
database_name: str,
create_update_sql_database_parameters: _models.SqlDatabaseCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlDatabaseGetResults]:
"""Create or update an Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param create_update_sql_database_parameters: The parameters to provide for the current SQL
database. Required.
:type create_update_sql_database_parameters:
~azure.mgmt.cosmosdb.models.SqlDatabaseCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlDatabaseGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlDatabaseGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_sql_database(
self,
resource_group_name: str,
account_name: str,
database_name: str,
create_update_sql_database_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlDatabaseGetResults]:
"""Create or update an Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param create_update_sql_database_parameters: The parameters to provide for the current SQL
database. Required.
:type create_update_sql_database_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlDatabaseGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlDatabaseGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_sql_database(
self,
resource_group_name: str,
account_name: str,
database_name: str,
create_update_sql_database_parameters: Union[_models.SqlDatabaseCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.SqlDatabaseGetResults]:
"""Create or update an Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param create_update_sql_database_parameters: The parameters to provide for the current SQL
database. Is either a SqlDatabaseCreateUpdateParameters type or a IO[bytes] type. Required.
:type create_update_sql_database_parameters:
~azure.mgmt.cosmosdb.models.SqlDatabaseCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either SqlDatabaseGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlDatabaseGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.SqlDatabaseGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_sql_database_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
create_update_sql_database_parameters=create_update_sql_database_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("SqlDatabaseGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.SqlDatabaseGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.SqlDatabaseGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_sql_database_initial(
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_sql_database_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete_sql_database(
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_sql_database_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def get_sql_database_throughput(
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> _models.ThroughputSettingsGetResults:
"""Gets the RUs per second of the SQL database under an existing Azure Cosmos DB database account
with the provided name.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:return: ThroughputSettingsGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_database_throughput_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _update_sql_database_throughput_initial(
self,
resource_group_name: str,
account_name: str,
database_name: str,
update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(update_throughput_parameters, (IOBase, bytes)):
_content = update_throughput_parameters
else:
_json = self._serialize.body(update_throughput_parameters, "ThroughputSettingsUpdateParameters")
_request = build_update_sql_database_throughput_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_update_sql_database_throughput(
self,
resource_group_name: str,
account_name: str,
database_name: str,
update_throughput_parameters: _models.ThroughputSettingsUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Update RUs per second of an Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param update_throughput_parameters: The parameters to provide for the RUs per second of the
current SQL database. Required.
:type update_throughput_parameters:
~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_update_sql_database_throughput(
self,
resource_group_name: str,
account_name: str,
database_name: str,
update_throughput_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Update RUs per second of an Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param update_throughput_parameters: The parameters to provide for the RUs per second of the
current SQL database. Required.
:type update_throughput_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_update_sql_database_throughput(
self,
resource_group_name: str,
account_name: str,
database_name: str,
update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Update RUs per second of an Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param update_throughput_parameters: The parameters to provide for the RUs per second of the
current SQL database. Is either a ThroughputSettingsUpdateParameters type or a IO[bytes] type.
Required.
:type update_throughput_parameters:
~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._update_sql_database_throughput_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
update_throughput_parameters=update_throughput_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.ThroughputSettingsGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _migrate_sql_database_to_autoscale_initial( # pylint: disable=name-too-long
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_migrate_sql_database_to_autoscale_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_migrate_sql_database_to_autoscale(
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Migrate an Azure Cosmos DB SQL database from manual throughput to autoscale.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._migrate_sql_database_to_autoscale_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.ThroughputSettingsGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _migrate_sql_database_to_manual_throughput_initial( # pylint: disable=name-too-long
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_migrate_sql_database_to_manual_throughput_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_migrate_sql_database_to_manual_throughput( # pylint: disable=name-too-long
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Migrate an Azure Cosmos DB SQL database from autoscale to manual throughput.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._migrate_sql_database_to_manual_throughput_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.ThroughputSettingsGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
[docs]
@distributed_trace
def list_sql_containers(
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> Iterable["_models.SqlContainerGetResults"]:
"""Lists the SQL container under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:return: An iterator like instance of either SqlContainerGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.SqlContainerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlContainerListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_sql_containers_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("SqlContainerListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_sql_container(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> _models.SqlContainerGetResults:
"""Gets the SQL container under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: SqlContainerGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.SqlContainerGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlContainerGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_container_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("SqlContainerGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_sql_container_initial(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
create_update_sql_container_parameters: Union[_models.SqlContainerCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_sql_container_parameters, (IOBase, bytes)):
_content = create_update_sql_container_parameters
else:
_json = self._serialize.body(create_update_sql_container_parameters, "SqlContainerCreateUpdateParameters")
_request = build_create_update_sql_container_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_sql_container(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
create_update_sql_container_parameters: _models.SqlContainerCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlContainerGetResults]:
"""Create or update an Azure Cosmos DB SQL container.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param create_update_sql_container_parameters: The parameters to provide for the current SQL
container. Required.
:type create_update_sql_container_parameters:
~azure.mgmt.cosmosdb.models.SqlContainerCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlContainerGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlContainerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_sql_container(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
create_update_sql_container_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlContainerGetResults]:
"""Create or update an Azure Cosmos DB SQL container.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param create_update_sql_container_parameters: The parameters to provide for the current SQL
container. Required.
:type create_update_sql_container_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlContainerGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlContainerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_sql_container(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
create_update_sql_container_parameters: Union[_models.SqlContainerCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.SqlContainerGetResults]:
"""Create or update an Azure Cosmos DB SQL container.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param create_update_sql_container_parameters: The parameters to provide for the current SQL
container. Is either a SqlContainerCreateUpdateParameters type or a IO[bytes] type. Required.
:type create_update_sql_container_parameters:
~azure.mgmt.cosmosdb.models.SqlContainerCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either SqlContainerGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlContainerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.SqlContainerGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_sql_container_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
create_update_sql_container_parameters=create_update_sql_container_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("SqlContainerGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.SqlContainerGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.SqlContainerGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_sql_container_initial(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_sql_container_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete_sql_container(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB SQL container.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_sql_container_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def get_sql_container_throughput(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> _models.ThroughputSettingsGetResults:
"""Gets the RUs per second of the SQL container under an existing Azure Cosmos DB database
account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: ThroughputSettingsGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_container_throughput_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _update_sql_container_throughput_initial(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(update_throughput_parameters, (IOBase, bytes)):
_content = update_throughput_parameters
else:
_json = self._serialize.body(update_throughput_parameters, "ThroughputSettingsUpdateParameters")
_request = build_update_sql_container_throughput_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_update_sql_container_throughput(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
update_throughput_parameters: _models.ThroughputSettingsUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Update RUs per second of an Azure Cosmos DB SQL container.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param update_throughput_parameters: The parameters to provide for the RUs per second of the
current SQL container. Required.
:type update_throughput_parameters:
~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_update_sql_container_throughput(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
update_throughput_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Update RUs per second of an Azure Cosmos DB SQL container.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param update_throughput_parameters: The parameters to provide for the RUs per second of the
current SQL container. Required.
:type update_throughput_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_update_sql_container_throughput(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Update RUs per second of an Azure Cosmos DB SQL container.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param update_throughput_parameters: The parameters to provide for the RUs per second of the
current SQL container. Is either a ThroughputSettingsUpdateParameters type or a IO[bytes] type.
Required.
:type update_throughput_parameters:
~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._update_sql_container_throughput_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
update_throughput_parameters=update_throughput_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.ThroughputSettingsGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _migrate_sql_container_to_autoscale_initial( # pylint: disable=name-too-long
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_migrate_sql_container_to_autoscale_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_migrate_sql_container_to_autoscale(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Migrate an Azure Cosmos DB SQL container from manual throughput to autoscale.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._migrate_sql_container_to_autoscale_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.ThroughputSettingsGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _migrate_sql_container_to_manual_throughput_initial( # pylint: disable=name-too-long
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_migrate_sql_container_to_manual_throughput_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_migrate_sql_container_to_manual_throughput( # pylint: disable=name-too-long
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
"""Migrate an Azure Cosmos DB SQL container from autoscale to manual throughput.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._migrate_sql_container_to_manual_throughput_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.ThroughputSettingsGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
[docs]
@distributed_trace
def list_client_encryption_keys(
self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any
) -> Iterable["_models.ClientEncryptionKeyGetResults"]:
"""Lists the ClientEncryptionKeys under an existing Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:return: An iterator like instance of either ClientEncryptionKeyGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.ClientEncryptionKeyGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ClientEncryptionKeysListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_client_encryption_keys_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("ClientEncryptionKeysListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_client_encryption_key(
self,
resource_group_name: str,
account_name: str,
database_name: str,
client_encryption_key_name: str,
**kwargs: Any
) -> _models.ClientEncryptionKeyGetResults:
"""Gets the ClientEncryptionKey under an existing Azure Cosmos DB SQL database.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param client_encryption_key_name: Cosmos DB ClientEncryptionKey name. Required.
:type client_encryption_key_name: str
:return: ClientEncryptionKeyGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.ClientEncryptionKeyGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.ClientEncryptionKeyGetResults] = kwargs.pop("cls", None)
_request = build_get_client_encryption_key_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
client_encryption_key_name=client_encryption_key_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("ClientEncryptionKeyGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_client_encryption_key_initial( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
client_encryption_key_name: str,
create_update_client_encryption_key_parameters: Union[
_models.ClientEncryptionKeyCreateUpdateParameters, IO[bytes]
],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_client_encryption_key_parameters, (IOBase, bytes)):
_content = create_update_client_encryption_key_parameters
else:
_json = self._serialize.body(
create_update_client_encryption_key_parameters, "ClientEncryptionKeyCreateUpdateParameters"
)
_request = build_create_update_client_encryption_key_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
client_encryption_key_name=client_encryption_key_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_client_encryption_key( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
client_encryption_key_name: str,
create_update_client_encryption_key_parameters: _models.ClientEncryptionKeyCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ClientEncryptionKeyGetResults]:
"""Create or update a ClientEncryptionKey. This API is meant to be invoked via tools such as the
Azure Powershell (instead of directly).
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param client_encryption_key_name: Cosmos DB ClientEncryptionKey name. Required.
:type client_encryption_key_name: str
:param create_update_client_encryption_key_parameters: The parameters to provide for the client
encryption key. Required.
:type create_update_client_encryption_key_parameters:
~azure.mgmt.cosmosdb.models.ClientEncryptionKeyCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either ClientEncryptionKeyGetResults or the
result of cls(response)
:rtype:
~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClientEncryptionKeyGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_client_encryption_key( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
client_encryption_key_name: str,
create_update_client_encryption_key_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.ClientEncryptionKeyGetResults]:
"""Create or update a ClientEncryptionKey. This API is meant to be invoked via tools such as the
Azure Powershell (instead of directly).
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param client_encryption_key_name: Cosmos DB ClientEncryptionKey name. Required.
:type client_encryption_key_name: str
:param create_update_client_encryption_key_parameters: The parameters to provide for the client
encryption key. Required.
:type create_update_client_encryption_key_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either ClientEncryptionKeyGetResults or the
result of cls(response)
:rtype:
~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClientEncryptionKeyGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_client_encryption_key( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
client_encryption_key_name: str,
create_update_client_encryption_key_parameters: Union[
_models.ClientEncryptionKeyCreateUpdateParameters, IO[bytes]
],
**kwargs: Any
) -> LROPoller[_models.ClientEncryptionKeyGetResults]:
"""Create or update a ClientEncryptionKey. This API is meant to be invoked via tools such as the
Azure Powershell (instead of directly).
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param client_encryption_key_name: Cosmos DB ClientEncryptionKey name. Required.
:type client_encryption_key_name: str
:param create_update_client_encryption_key_parameters: The parameters to provide for the client
encryption key. Is either a ClientEncryptionKeyCreateUpdateParameters type or a IO[bytes] type.
Required.
:type create_update_client_encryption_key_parameters:
~azure.mgmt.cosmosdb.models.ClientEncryptionKeyCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either ClientEncryptionKeyGetResults or the
result of cls(response)
:rtype:
~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ClientEncryptionKeyGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ClientEncryptionKeyGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_client_encryption_key_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
client_encryption_key_name=client_encryption_key_name,
create_update_client_encryption_key_parameters=create_update_client_encryption_key_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("ClientEncryptionKeyGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.ClientEncryptionKeyGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.ClientEncryptionKeyGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
[docs]
@distributed_trace
def list_sql_stored_procedures(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> Iterable["_models.SqlStoredProcedureGetResults"]:
"""Lists the SQL storedProcedure under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: An iterator like instance of either SqlStoredProcedureGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.SqlStoredProcedureGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlStoredProcedureListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_sql_stored_procedures_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("SqlStoredProcedureListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_sql_stored_procedure(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
**kwargs: Any
) -> _models.SqlStoredProcedureGetResults:
"""Gets the SQL storedProcedure under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param stored_procedure_name: Cosmos DB storedProcedure name. Required.
:type stored_procedure_name: str
:return: SqlStoredProcedureGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.SqlStoredProcedureGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlStoredProcedureGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_stored_procedure_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
stored_procedure_name=stored_procedure_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("SqlStoredProcedureGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_sql_stored_procedure_initial( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
create_update_sql_stored_procedure_parameters: Union[
_models.SqlStoredProcedureCreateUpdateParameters, IO[bytes]
],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_sql_stored_procedure_parameters, (IOBase, bytes)):
_content = create_update_sql_stored_procedure_parameters
else:
_json = self._serialize.body(
create_update_sql_stored_procedure_parameters, "SqlStoredProcedureCreateUpdateParameters"
)
_request = build_create_update_sql_stored_procedure_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
stored_procedure_name=stored_procedure_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_sql_stored_procedure(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
create_update_sql_stored_procedure_parameters: _models.SqlStoredProcedureCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlStoredProcedureGetResults]:
"""Create or update an Azure Cosmos DB SQL storedProcedure.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param stored_procedure_name: Cosmos DB storedProcedure name. Required.
:type stored_procedure_name: str
:param create_update_sql_stored_procedure_parameters: The parameters to provide for the current
SQL storedProcedure. Required.
:type create_update_sql_stored_procedure_parameters:
~azure.mgmt.cosmosdb.models.SqlStoredProcedureCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlStoredProcedureGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlStoredProcedureGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_sql_stored_procedure(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
create_update_sql_stored_procedure_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlStoredProcedureGetResults]:
"""Create or update an Azure Cosmos DB SQL storedProcedure.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param stored_procedure_name: Cosmos DB storedProcedure name. Required.
:type stored_procedure_name: str
:param create_update_sql_stored_procedure_parameters: The parameters to provide for the current
SQL storedProcedure. Required.
:type create_update_sql_stored_procedure_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlStoredProcedureGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlStoredProcedureGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_sql_stored_procedure(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
create_update_sql_stored_procedure_parameters: Union[
_models.SqlStoredProcedureCreateUpdateParameters, IO[bytes]
],
**kwargs: Any
) -> LROPoller[_models.SqlStoredProcedureGetResults]:
"""Create or update an Azure Cosmos DB SQL storedProcedure.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param stored_procedure_name: Cosmos DB storedProcedure name. Required.
:type stored_procedure_name: str
:param create_update_sql_stored_procedure_parameters: The parameters to provide for the current
SQL storedProcedure. Is either a SqlStoredProcedureCreateUpdateParameters type or a IO[bytes]
type. Required.
:type create_update_sql_stored_procedure_parameters:
~azure.mgmt.cosmosdb.models.SqlStoredProcedureCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either SqlStoredProcedureGetResults or the
result of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlStoredProcedureGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.SqlStoredProcedureGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_sql_stored_procedure_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
stored_procedure_name=stored_procedure_name,
create_update_sql_stored_procedure_parameters=create_update_sql_stored_procedure_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("SqlStoredProcedureGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.SqlStoredProcedureGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.SqlStoredProcedureGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_sql_stored_procedure_initial(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_sql_stored_procedure_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
stored_procedure_name=stored_procedure_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete_sql_stored_procedure(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
stored_procedure_name: str,
**kwargs: Any
) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB SQL storedProcedure.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param stored_procedure_name: Cosmos DB storedProcedure name. Required.
:type stored_procedure_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_sql_stored_procedure_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
stored_procedure_name=stored_procedure_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def list_sql_user_defined_functions(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> Iterable["_models.SqlUserDefinedFunctionGetResults"]:
"""Lists the SQL userDefinedFunction under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: An iterator like instance of either SqlUserDefinedFunctionGetResults or the result of
cls(response)
:rtype:
~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.SqlUserDefinedFunctionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlUserDefinedFunctionListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_sql_user_defined_functions_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("SqlUserDefinedFunctionListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_sql_user_defined_function(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
**kwargs: Any
) -> _models.SqlUserDefinedFunctionGetResults:
"""Gets the SQL userDefinedFunction under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param user_defined_function_name: Cosmos DB userDefinedFunction name. Required.
:type user_defined_function_name: str
:return: SqlUserDefinedFunctionGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.SqlUserDefinedFunctionGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlUserDefinedFunctionGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_user_defined_function_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
user_defined_function_name=user_defined_function_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("SqlUserDefinedFunctionGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_sql_user_defined_function_initial( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
create_update_sql_user_defined_function_parameters: Union[
_models.SqlUserDefinedFunctionCreateUpdateParameters, IO[bytes]
],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_sql_user_defined_function_parameters, (IOBase, bytes)):
_content = create_update_sql_user_defined_function_parameters
else:
_json = self._serialize.body(
create_update_sql_user_defined_function_parameters, "SqlUserDefinedFunctionCreateUpdateParameters"
)
_request = build_create_update_sql_user_defined_function_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
user_defined_function_name=user_defined_function_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_sql_user_defined_function( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
create_update_sql_user_defined_function_parameters: _models.SqlUserDefinedFunctionCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlUserDefinedFunctionGetResults]:
"""Create or update an Azure Cosmos DB SQL userDefinedFunction.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param user_defined_function_name: Cosmos DB userDefinedFunction name. Required.
:type user_defined_function_name: str
:param create_update_sql_user_defined_function_parameters: The parameters to provide for the
current SQL userDefinedFunction. Required.
:type create_update_sql_user_defined_function_parameters:
~azure.mgmt.cosmosdb.models.SqlUserDefinedFunctionCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlUserDefinedFunctionGetResults or the
result of cls(response)
:rtype:
~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlUserDefinedFunctionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_sql_user_defined_function( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
create_update_sql_user_defined_function_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlUserDefinedFunctionGetResults]:
"""Create or update an Azure Cosmos DB SQL userDefinedFunction.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param user_defined_function_name: Cosmos DB userDefinedFunction name. Required.
:type user_defined_function_name: str
:param create_update_sql_user_defined_function_parameters: The parameters to provide for the
current SQL userDefinedFunction. Required.
:type create_update_sql_user_defined_function_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlUserDefinedFunctionGetResults or the
result of cls(response)
:rtype:
~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlUserDefinedFunctionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_sql_user_defined_function( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
create_update_sql_user_defined_function_parameters: Union[
_models.SqlUserDefinedFunctionCreateUpdateParameters, IO[bytes]
],
**kwargs: Any
) -> LROPoller[_models.SqlUserDefinedFunctionGetResults]:
"""Create or update an Azure Cosmos DB SQL userDefinedFunction.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param user_defined_function_name: Cosmos DB userDefinedFunction name. Required.
:type user_defined_function_name: str
:param create_update_sql_user_defined_function_parameters: The parameters to provide for the
current SQL userDefinedFunction. Is either a SqlUserDefinedFunctionCreateUpdateParameters type
or a IO[bytes] type. Required.
:type create_update_sql_user_defined_function_parameters:
~azure.mgmt.cosmosdb.models.SqlUserDefinedFunctionCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either SqlUserDefinedFunctionGetResults or the
result of cls(response)
:rtype:
~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlUserDefinedFunctionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.SqlUserDefinedFunctionGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_sql_user_defined_function_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
user_defined_function_name=user_defined_function_name,
create_update_sql_user_defined_function_parameters=create_update_sql_user_defined_function_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("SqlUserDefinedFunctionGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.SqlUserDefinedFunctionGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.SqlUserDefinedFunctionGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_sql_user_defined_function_initial( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_sql_user_defined_function_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
user_defined_function_name=user_defined_function_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete_sql_user_defined_function(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
user_defined_function_name: str,
**kwargs: Any
) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB SQL userDefinedFunction.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param user_defined_function_name: Cosmos DB userDefinedFunction name. Required.
:type user_defined_function_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_sql_user_defined_function_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
user_defined_function_name=user_defined_function_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def list_sql_triggers(
self, resource_group_name: str, account_name: str, database_name: str, container_name: str, **kwargs: Any
) -> Iterable["_models.SqlTriggerGetResults"]:
"""Lists the SQL trigger under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:return: An iterator like instance of either SqlTriggerGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.SqlTriggerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlTriggerListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_sql_triggers_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("SqlTriggerListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_sql_trigger(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
**kwargs: Any
) -> _models.SqlTriggerGetResults:
"""Gets the SQL trigger under an existing Azure Cosmos DB database account.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param trigger_name: Cosmos DB trigger name. Required.
:type trigger_name: str
:return: SqlTriggerGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.SqlTriggerGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlTriggerGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_trigger_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
trigger_name=trigger_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("SqlTriggerGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_sql_trigger_initial(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
create_update_sql_trigger_parameters: Union[_models.SqlTriggerCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_sql_trigger_parameters, (IOBase, bytes)):
_content = create_update_sql_trigger_parameters
else:
_json = self._serialize.body(create_update_sql_trigger_parameters, "SqlTriggerCreateUpdateParameters")
_request = build_create_update_sql_trigger_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
trigger_name=trigger_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_sql_trigger(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
create_update_sql_trigger_parameters: _models.SqlTriggerCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlTriggerGetResults]:
"""Create or update an Azure Cosmos DB SQL trigger.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param trigger_name: Cosmos DB trigger name. Required.
:type trigger_name: str
:param create_update_sql_trigger_parameters: The parameters to provide for the current SQL
trigger. Required.
:type create_update_sql_trigger_parameters:
~azure.mgmt.cosmosdb.models.SqlTriggerCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlTriggerGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlTriggerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_sql_trigger(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
create_update_sql_trigger_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlTriggerGetResults]:
"""Create or update an Azure Cosmos DB SQL trigger.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param trigger_name: Cosmos DB trigger name. Required.
:type trigger_name: str
:param create_update_sql_trigger_parameters: The parameters to provide for the current SQL
trigger. Required.
:type create_update_sql_trigger_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlTriggerGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlTriggerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_sql_trigger(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
create_update_sql_trigger_parameters: Union[_models.SqlTriggerCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.SqlTriggerGetResults]:
"""Create or update an Azure Cosmos DB SQL trigger.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param trigger_name: Cosmos DB trigger name. Required.
:type trigger_name: str
:param create_update_sql_trigger_parameters: The parameters to provide for the current SQL
trigger. Is either a SqlTriggerCreateUpdateParameters type or a IO[bytes] type. Required.
:type create_update_sql_trigger_parameters:
~azure.mgmt.cosmosdb.models.SqlTriggerCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either SqlTriggerGetResults or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlTriggerGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.SqlTriggerGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_sql_trigger_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
trigger_name=trigger_name,
create_update_sql_trigger_parameters=create_update_sql_trigger_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("SqlTriggerGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.SqlTriggerGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.SqlTriggerGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_sql_trigger_initial(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_sql_trigger_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
trigger_name=trigger_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("azure-AsyncOperation")
)
response_headers["location"] = self._deserialize("str", response.headers.get("location"))
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete_sql_trigger(
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
trigger_name: str,
**kwargs: Any
) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB SQL trigger.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param trigger_name: Cosmos DB trigger name. Required.
:type trigger_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_sql_trigger_initial(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
trigger_name=trigger_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def get_sql_role_definition(
self, role_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any
) -> _models.SqlRoleDefinitionGetResults:
"""Retrieves the properties of an existing Azure Cosmos DB SQL Role Definition with the given Id.
:param role_definition_id: The GUID for the Role Definition. Required.
:type role_definition_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: SqlRoleDefinitionGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.SqlRoleDefinitionGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlRoleDefinitionGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_role_definition_request(
role_definition_id=role_definition_id,
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("SqlRoleDefinitionGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_sql_role_definition_initial( # pylint: disable=name-too-long
self,
role_definition_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_definition_parameters: Union[_models.SqlRoleDefinitionCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_sql_role_definition_parameters, (IOBase, bytes)):
_content = create_update_sql_role_definition_parameters
else:
_json = self._serialize.body(
create_update_sql_role_definition_parameters, "SqlRoleDefinitionCreateUpdateParameters"
)
_request = build_create_update_sql_role_definition_request(
role_definition_id=role_definition_id,
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_sql_role_definition(
self,
role_definition_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_definition_parameters: _models.SqlRoleDefinitionCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlRoleDefinitionGetResults]:
"""Creates or updates an Azure Cosmos DB SQL Role Definition.
:param role_definition_id: The GUID for the Role Definition. Required.
:type role_definition_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_sql_role_definition_parameters: The properties required to create or
update a Role Definition. Required.
:type create_update_sql_role_definition_parameters:
~azure.mgmt.cosmosdb.models.SqlRoleDefinitionCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlRoleDefinitionGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlRoleDefinitionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_sql_role_definition(
self,
role_definition_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_definition_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlRoleDefinitionGetResults]:
"""Creates or updates an Azure Cosmos DB SQL Role Definition.
:param role_definition_id: The GUID for the Role Definition. Required.
:type role_definition_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_sql_role_definition_parameters: The properties required to create or
update a Role Definition. Required.
:type create_update_sql_role_definition_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlRoleDefinitionGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlRoleDefinitionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_sql_role_definition(
self,
role_definition_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_definition_parameters: Union[_models.SqlRoleDefinitionCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.SqlRoleDefinitionGetResults]:
"""Creates or updates an Azure Cosmos DB SQL Role Definition.
:param role_definition_id: The GUID for the Role Definition. Required.
:type role_definition_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_sql_role_definition_parameters: The properties required to create or
update a Role Definition. Is either a SqlRoleDefinitionCreateUpdateParameters type or a
IO[bytes] type. Required.
:type create_update_sql_role_definition_parameters:
~azure.mgmt.cosmosdb.models.SqlRoleDefinitionCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either SqlRoleDefinitionGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlRoleDefinitionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.SqlRoleDefinitionGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_sql_role_definition_initial(
role_definition_id=role_definition_id,
resource_group_name=resource_group_name,
account_name=account_name,
create_update_sql_role_definition_parameters=create_update_sql_role_definition_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("SqlRoleDefinitionGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.SqlRoleDefinitionGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.SqlRoleDefinitionGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_sql_role_definition_initial(
self, role_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_sql_role_definition_request(
role_definition_id=role_definition_id,
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete_sql_role_definition(
self, role_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any
) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB SQL Role Definition.
:param role_definition_id: The GUID for the Role Definition. Required.
:type role_definition_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_sql_role_definition_initial(
role_definition_id=role_definition_id,
resource_group_name=resource_group_name,
account_name=account_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def list_sql_role_definitions(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> Iterable["_models.SqlRoleDefinitionGetResults"]:
"""Retrieves the list of all Azure Cosmos DB SQL Role Definitions.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: An iterator like instance of either SqlRoleDefinitionGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.SqlRoleDefinitionGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlRoleDefinitionListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_sql_role_definitions_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("SqlRoleDefinitionListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_sql_role_assignment(
self, role_assignment_id: str, resource_group_name: str, account_name: str, **kwargs: Any
) -> _models.SqlRoleAssignmentGetResults:
"""Retrieves the properties of an existing Azure Cosmos DB SQL Role Assignment with the given Id.
:param role_assignment_id: The GUID for the Role Assignment. Required.
:type role_assignment_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: SqlRoleAssignmentGetResults or the result of cls(response)
:rtype: ~azure.mgmt.cosmosdb.models.SqlRoleAssignmentGetResults
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlRoleAssignmentGetResults] = kwargs.pop("cls", None)
_request = build_get_sql_role_assignment_request(
role_assignment_id=role_assignment_id,
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize("SqlRoleAssignmentGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_update_sql_role_assignment_initial( # pylint: disable=name-too-long
self,
role_assignment_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_assignment_parameters: Union[_models.SqlRoleAssignmentCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(create_update_sql_role_assignment_parameters, (IOBase, bytes)):
_content = create_update_sql_role_assignment_parameters
else:
_json = self._serialize.body(
create_update_sql_role_assignment_parameters, "SqlRoleAssignmentCreateUpdateParameters"
)
_request = build_create_update_sql_role_assignment_request(
role_assignment_id=role_assignment_id,
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def begin_create_update_sql_role_assignment(
self,
role_assignment_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_assignment_parameters: _models.SqlRoleAssignmentCreateUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlRoleAssignmentGetResults]:
"""Creates or updates an Azure Cosmos DB SQL Role Assignment.
:param role_assignment_id: The GUID for the Role Assignment. Required.
:type role_assignment_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_sql_role_assignment_parameters: The properties required to create or
update a Role Assignment. Required.
:type create_update_sql_role_assignment_parameters:
~azure.mgmt.cosmosdb.models.SqlRoleAssignmentCreateUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlRoleAssignmentGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlRoleAssignmentGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create_update_sql_role_assignment(
self,
role_assignment_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_assignment_parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.SqlRoleAssignmentGetResults]:
"""Creates or updates an Azure Cosmos DB SQL Role Assignment.
:param role_assignment_id: The GUID for the Role Assignment. Required.
:type role_assignment_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_sql_role_assignment_parameters: The properties required to create or
update a Role Assignment. Required.
:type create_update_sql_role_assignment_parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either SqlRoleAssignmentGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlRoleAssignmentGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create_update_sql_role_assignment(
self,
role_assignment_id: str,
resource_group_name: str,
account_name: str,
create_update_sql_role_assignment_parameters: Union[_models.SqlRoleAssignmentCreateUpdateParameters, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.SqlRoleAssignmentGetResults]:
"""Creates or updates an Azure Cosmos DB SQL Role Assignment.
:param role_assignment_id: The GUID for the Role Assignment. Required.
:type role_assignment_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param create_update_sql_role_assignment_parameters: The properties required to create or
update a Role Assignment. Is either a SqlRoleAssignmentCreateUpdateParameters type or a
IO[bytes] type. Required.
:type create_update_sql_role_assignment_parameters:
~azure.mgmt.cosmosdb.models.SqlRoleAssignmentCreateUpdateParameters or IO[bytes]
:return: An instance of LROPoller that returns either SqlRoleAssignmentGetResults or the result
of cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.SqlRoleAssignmentGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.SqlRoleAssignmentGetResults] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_update_sql_role_assignment_initial(
role_assignment_id=role_assignment_id,
resource_group_name=resource_group_name,
account_name=account_name,
create_update_sql_role_assignment_parameters=create_update_sql_role_assignment_parameters,
api_version=api_version,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
deserialized = self._deserialize("SqlRoleAssignmentGetResults", pipeline_response.http_response)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.SqlRoleAssignmentGetResults].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.SqlRoleAssignmentGetResults](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_sql_role_assignment_initial(
self, role_assignment_id: str, resource_group_name: str, account_name: str, **kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_delete_sql_role_assignment_request(
role_assignment_id=role_assignment_id,
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete_sql_role_assignment(
self, role_assignment_id: str, resource_group_name: str, account_name: str, **kwargs: Any
) -> LROPoller[None]:
"""Deletes an existing Azure Cosmos DB SQL Role Assignment.
:param role_assignment_id: The GUID for the Role Assignment. Required.
:type role_assignment_id: str
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: An instance of LROPoller that returns either None or the result of cls(response)
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_sql_role_assignment_initial(
role_assignment_id=role_assignment_id,
resource_group_name=resource_group_name,
account_name=account_name,
api_version=api_version,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
if polling is True:
polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def list_sql_role_assignments(
self, resource_group_name: str, account_name: str, **kwargs: Any
) -> Iterable["_models.SqlRoleAssignmentGetResults"]:
"""Retrieves the list of all Azure Cosmos DB SQL Role Assignments.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:return: An iterator like instance of either SqlRoleAssignmentGetResults or the result of
cls(response)
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.SqlRoleAssignmentGetResults]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
cls: ClsType[_models.SqlRoleAssignmentListResult] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_list_sql_role_assignments_request(
resource_group_name=resource_group_name,
account_name=account_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
_request.url = self._client.format_url(_request.url)
_request.method = "GET"
return _request
def extract_data(pipeline_response):
deserialized = self._deserialize("SqlRoleAssignmentListResult", pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
def _retrieve_continuous_backup_information_initial( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
location: Union[_models.ContinuousBackupRestoreLocation, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_json = None
_content = None
if isinstance(location, (IOBase, bytes)):
_content = location
else:
_json = self._serialize.body(location, "ContinuousBackupRestoreLocation")
_request = build_retrieve_continuous_backup_information_request(
resource_group_name=resource_group_name,
account_name=account_name,
database_name=database_name,
container_name=container_name,
subscription_id=self._config.subscription_id,
api_version=api_version,
content_type=content_type,
json=_json,
content=_content,
headers=_headers,
params=_params,
)
_request.url = self._client.format_url(_request.url)
_decompress = kwargs.pop("decompress", True)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = response.stream_download(self._client._pipeline, decompress=_decompress)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def begin_retrieve_continuous_backup_information( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
location: _models.ContinuousBackupRestoreLocation,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.BackupInformation]:
"""Retrieves continuous backup information for a container resource.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param location: The name of the continuous backup restore location. Required.
:type location: ~azure.mgmt.cosmosdb.models.ContinuousBackupRestoreLocation
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either BackupInformation or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.BackupInformation]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_retrieve_continuous_backup_information( # pylint: disable=name-too-long
self,
resource_group_name: str,
account_name: str,
database_name: str,
container_name: str,
location: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.BackupInformation]:
"""Retrieves continuous backup information for a container resource.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param account_name: Cosmos DB database account name. Required.
:type account_name: str
:param database_name: Cosmos DB database name. Required.
:type database_name: str
:param container_name: Cosmos DB container name. Required.
:type container_name: str
:param location: The name of the continuous backup restore location. Required.
:type location: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns either BackupInformation or the result of
cls(response)
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.BackupInformation]
:raises ~azure.core.exceptions.HttpResponseError:
"""