# pylint: disable=line-too-long,useless-suppression,too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from collections.abc import MutableMapping
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Iterator, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core import PipelineClient
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
StreamClosedError,
StreamConsumedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling
from .. import models as _models
from .._configuration import CosmosDBManagementClientConfiguration
from .._utils.serialization import Deserializer, Serializer
# Generic type variable used for the deserialized value in the ``cls`` callback signature.
T = TypeVar("T")
# Shape of the optional ``cls`` callback: it receives the pipeline response, a value of
# type ``T`` and a ``Dict[str, Any]``, and may return anything.
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
# Module-level serializer shared by every request builder in this file.
_SERIALIZER = Serializer()
# The serializer's client-side validation flag is switched off here.
_SERIALIZER.client_side_validation = False
def build_list_cassandra_keyspaces_request(
    resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the GET request targeting the ``cassandraKeyspaces`` collection of a database account.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_get_cassandra_keyspace_request(
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the GET request targeting a single ``cassandraKeyspaces/{keyspaceName}`` resource.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_create_update_cassandra_keyspace_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the PUT request targeting a single ``cassandraKeyspaces/{keyspaceName}`` resource.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword content_type: Overrides ``Content-Type``; otherwise the value found in ``headers`` is
     used. The header is only written when the resulting value is not ``None``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over (for example a body) are
     forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both fallbacks are always removed from their maps, even when a keyword overrides them.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    fallback_content_type = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", fallback_content_type)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_delete_cassandra_keyspace_request(
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the DELETE request targeting a single ``cassandraKeyspaces/{keyspaceName}`` resource.

    Unlike the other builders in this module, this one does not read ``headers`` itself; a
    ``headers`` keyword, if given, is forwarded to ``HttpRequest`` with the remaining kwargs.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request.
    :rtype: ~azure.core.rest.HttpRequest
    """
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    return HttpRequest(method="DELETE", url=request_url, params=query_map, **kwargs)
def build_get_cassandra_keyspace_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the GET request targeting a keyspace's ``throughputSettings/default`` resource.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/throughputSettings/default",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_update_cassandra_keyspace_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the PUT request targeting a keyspace's ``throughputSettings/default`` resource.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword content_type: Overrides ``Content-Type``; otherwise the value found in ``headers`` is
     used. The header is only written when the resulting value is not ``None``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over (for example a body) are
     forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both fallbacks are always removed from their maps, even when a keyword overrides them.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    fallback_content_type = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", fallback_content_type)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/throughputSettings/default",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_migrate_cassandra_keyspace_to_autoscale_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the POST request targeting a keyspace's ``migrateToAutoscale`` throughput action.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/throughputSettings/default/migrateToAutoscale",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_migrate_cassandra_keyspace_to_manual_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the POST request targeting a keyspace's ``migrateToManualThroughput`` throughput action.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/throughputSettings/default/migrateToManualThroughput",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_list_cassandra_tables_request(
    resource_group_name: str, account_name: str, keyspace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    """Build the GET request targeting the ``tables`` collection of a Cassandra keyspace.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_get_cassandra_table_request(
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Build the GET request targeting a single ``tables/{tableName}`` resource of a keyspace.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param table_name: Value substituted for ``{tableName}`` in the URL template.
    :type table_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables/{tableName}",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
        "tableName": _SERIALIZER.url("table_name", table_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_create_update_cassandra_table_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Build the PUT request targeting a single ``tables/{tableName}`` resource of a keyspace.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param table_name: Value substituted for ``{tableName}`` in the URL template.
    :type table_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword content_type: Overrides ``Content-Type``; otherwise the value found in ``headers`` is
     used. The header is only written when the resulting value is not ``None``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over (for example a body) are
     forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both fallbacks are always removed from their maps, even when a keyword overrides them.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    fallback_content_type = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", fallback_content_type)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables/{tableName}",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
        "tableName": _SERIALIZER.url("table_name", table_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_delete_cassandra_table_request(
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Build the DELETE request targeting a single ``tables/{tableName}`` resource of a keyspace.

    Unlike the other builders in this module, this one does not read ``headers`` itself; a
    ``headers`` keyword, if given, is forwarded to ``HttpRequest`` with the remaining kwargs.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param table_name: Value substituted for ``{tableName}`` in the URL template.
    :type table_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request.
    :rtype: ~azure.core.rest.HttpRequest
    """
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables/{tableName}",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
        "tableName": _SERIALIZER.url("table_name", table_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    return HttpRequest(method="DELETE", url=request_url, params=query_map, **kwargs)
def build_get_cassandra_table_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Build the GET request targeting a table's ``throughputSettings/default`` resource.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param table_name: Value substituted for ``{tableName}`` in the URL template.
    :type table_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables/{tableName}/throughputSettings/default",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
        "tableName": _SERIALIZER.url("table_name", table_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_update_cassandra_table_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Build the PUT request targeting a table's ``throughputSettings/default`` resource.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param table_name: Value substituted for ``{tableName}`` in the URL template.
    :type table_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword content_type: Overrides ``Content-Type``; otherwise the value found in ``headers`` is
     used. The header is only written when the resulting value is not ``None``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over (for example a body) are
     forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both fallbacks are always removed from their maps, even when a keyword overrides them.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    fallback_content_type = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", fallback_content_type)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables/{tableName}/throughputSettings/default",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
        "tableName": _SERIALIZER.url("table_name", table_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_migrate_cassandra_table_to_autoscale_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Build the POST request targeting a table's ``migrateToAutoscale`` throughput action.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param table_name: Value substituted for ``{tableName}`` in the URL template.
    :type table_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables/{tableName}/throughputSettings/default/migrateToAutoscale",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
        "tableName": _SERIALIZER.url("table_name", table_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_migrate_cassandra_table_to_manual_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Build the POST request targeting a table's ``migrateToManualThroughput`` throughput action.

    :param resource_group_name: Value substituted for ``{resourceGroupName}`` in the URL template.
    :type resource_group_name: str
    :param account_name: Value substituted for ``{accountName}`` in the URL template.
    :type account_name: str
    :param keyspace_name: Value substituted for ``{keyspaceName}`` in the URL template.
    :type keyspace_name: str
    :param table_name: Value substituted for ``{tableName}`` in the URL template.
    :type table_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Optional starting headers; ``Accept`` falls back to ``application/json``.
    :keyword params: Optional starting query parameters.
    :keyword api_version: Overrides ``api-version``; otherwise the value found in ``params`` is
     used, else ``2025-04-15``.
    :keyword template_url: Overrides the default URL template.
    :return: The assembled request. Any keyword arguments left over are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query map, even when the keyword overrides it.
    fallback_version = query_map.pop("api-version", "2025-04-15")
    api_version: str = kwargs.pop("api_version", fallback_version)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template and fill in its path segments.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/cassandraKeyspaces/{keyspaceName}/tables/{tableName}/throughputSettings/default/migrateToManualThroughput",
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "keyspaceName": _SERIALIZER.url("keyspace_name", keyspace_name, "str"),
        "tableName": _SERIALIZER.url("table_name", table_name, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string and headers are written back after serialization.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
[docs]
class CassandraResourcesOperations: # pylint: disable=too-many-public-methods
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.cosmosdb.CosmosDBManagementClient`'s
:attr:`cassandra_resources` attribute.
"""
models = _models
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: CosmosDBManagementClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def list_cassandra_keyspaces(
    self, resource_group_name: str, account_name: str, **kwargs: Any
) -> Iterable["_models.CassandraKeyspaceGetResults"]:
    """Lists the Cassandra keyspaces under an existing Azure Cosmos DB database account.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :return: An iterator like instance of either CassandraKeyspaceGetResults or the result of
     cls(response)
    :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.CassandraKeyspaceGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    request_headers = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query params, even when the keyword overrides it.
    fallback_version = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", fallback_version)
    cls: ClsType[_models.CassandraKeyspaceListResult] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; caller-supplied entries take precedence.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Return the request for the first page, or for ``next_link`` when one is given."""
        if next_link:
            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            link_params = case_insensitive_dict(
                {
                    name: [urllib.parse.quote(item) for item in values]
                    for name, values in urllib.parse.parse_qs(parsed_link.query).items()
                }
            )
            link_params["api-version"] = self._config.api_version
            page_request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, parsed_link.path), params=link_params
            )
            page_request.url = self._client.format_url(page_request.url)
            page_request.method = "GET"
            return page_request

        page_request = build_list_cassandra_keyspaces_request(
            resource_group_name=resource_group_name,
            account_name=account_name,
            subscription_id=self._config.subscription_id,
            api_version=api_version,
            headers=request_headers,
            params=query_params,
        )
        page_request.url = self._client.format_url(page_request.url)
        return page_request

    def extract_data(pipeline_response):
        """Deserialize one page and return ``(None, iterator over its items)``."""
        page = self._deserialize("CassandraKeyspaceListResult", pipeline_response)
        items = page.value
        if cls:
            items = cls(items)  # type: ignore
        # The continuation slot is always None here, so no next link is handed to the pager.
        return None, iter(items)

    def get_next(next_link=None):
        """Send the page request through the pipeline and fail on any status other than 200."""
        page_request = prepare_request(next_link)

        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            page_request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=status_errors)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        return pipeline_response

    return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_cassandra_keyspace(
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> _models.CassandraKeyspaceGetResults:
    """Gets the Cassandra keyspaces under an existing Azure Cosmos DB database account with the
    provided name.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :return: CassandraKeyspaceGetResults or the result of cls(response)
    :rtype: ~azure.mgmt.cosmosdb.models.CassandraKeyspaceGetResults
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries take precedence.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    request_headers = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always removed from the query params, even when the keyword overrides it.
    fallback_version = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", fallback_version)
    cls: ClsType[_models.CassandraKeyspaceGetResults] = kwargs.pop("cls", None)

    keyspace_request = build_get_cassandra_keyspace_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=request_headers,
        params=query_params,
    )
    keyspace_request.url = self._client.format_url(keyspace_request.url)

    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        keyspace_request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    # Anything other than 200 is an error: mapped codes first, generic error otherwise.
    if response.status_code != 200:
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    deserialized = self._deserialize("CassandraKeyspaceGetResults", pipeline_response.http_response)

    if not cls:
        return deserialized  # type: ignore
    return cls(pipeline_response, deserialized, {})  # type: ignore
def _create_update_cassandra_keyspace_initial(  # pylint: disable=name-too-long
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    create_update_cassandra_keyspace_parameters: Union[_models.CassandraKeyspaceCreateUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> Iterator[bytes]:
    """Issue the initial create-or-update request for a Cassandra keyspace.

    The request is run through the client pipeline with streaming enabled. A 200 or 202
    status produces the response body as a byte stream; any other status raises.

    :param resource_group_name: Resource group name forwarded to the request builder.
    :type resource_group_name: str
    :param account_name: Database account name forwarded to the request builder.
    :type account_name: str
    :param keyspace_name: Keyspace name forwarded to the request builder.
    :type keyspace_name: str
    :param create_update_cassandra_keyspace_parameters: Request body. ``IOBase``/``bytes`` values
     are sent as raw content; anything else is serialized as
     ``CassandraKeyspaceCreateUpdateParameters``.
    :type create_update_cassandra_keyspace_parameters:
     ~azure.mgmt.cosmosdb.models.CassandraKeyspaceCreateUpdateParameters or IO[bytes]
    :return: The streamed response body, or the result of ``cls`` when that keyword is supplied.
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError: When the status code is neither 200 nor 202
     and ``map_error`` did not already raise for it.
    """
    failure_types: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    failure_types.update(kwargs.pop("error_map", {}) or {})

    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The fallback arguments are evaluated eagerly, so "api-version" and "Content-Type"
    # are always removed from the maps, even when the keyword argument wins.
    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", header_map.pop("Content-Type", None))
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
    if not content_type:
        content_type = "application/json"

    # Exactly one of the two payload slots is populated.
    body = create_update_cassandra_keyspace_parameters
    if isinstance(body, (IOBase, bytes)):
        json_payload, raw_payload = None, body
    else:
        json_payload = self._serialize.body(body, "CassandraKeyspaceCreateUpdateParameters")
        raw_payload = None

    _request = build_create_update_cassandra_keyspace_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=json_payload,
        content=raw_payload,
        headers=header_map,
        params=query_map,
    )
    _request.url = self._client.format_url(_request.url)

    # "decompress" must leave kwargs before the remainder is handed to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=failure_types)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if not cls:
        return body_stream  # type: ignore
    return cls(pipeline_response, body_stream, response_headers)  # type: ignore
@overload
def begin_create_update_cassandra_keyspace(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    create_update_cassandra_keyspace_parameters: _models.CassandraKeyspaceCreateUpdateParameters,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.CassandraKeyspaceGetResults]:
    """Create or update an Azure Cosmos DB Cassandra keyspace.

    Typing overload for callers that pass the body as a
    ``CassandraKeyspaceCreateUpdateParameters`` model.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :param create_update_cassandra_keyspace_parameters: The parameters to provide for the current
     Cassandra keyspace. Required.
    :type create_update_cassandra_keyspace_parameters:
     ~azure.mgmt.cosmosdb.models.CassandraKeyspaceCreateUpdateParameters
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either CassandraKeyspaceGetResults or the result
     of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CassandraKeyspaceGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def begin_create_update_cassandra_keyspace(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    create_update_cassandra_keyspace_parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.CassandraKeyspaceGetResults]:
    """Create or update an Azure Cosmos DB Cassandra keyspace.

    Typing overload for callers that pass the body as an ``IO[bytes]`` stream.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :param create_update_cassandra_keyspace_parameters: The parameters to provide for the current
     Cassandra keyspace. Required.
    :type create_update_cassandra_keyspace_parameters: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either CassandraKeyspaceGetResults or the result
     of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CassandraKeyspaceGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
[docs]
@distributed_trace
def begin_create_update_cassandra_keyspace(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    create_update_cassandra_keyspace_parameters: Union[_models.CassandraKeyspaceCreateUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> LROPoller[_models.CassandraKeyspaceGetResults]:
    """Create or update an Azure Cosmos DB Cassandra keyspace.

    Unless a ``continuation_token`` keyword is given, the initial request is sent and its
    response fully read before the poller is built.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :param create_update_cassandra_keyspace_parameters: The parameters to provide for the current
     Cassandra keyspace. Is either a CassandraKeyspaceCreateUpdateParameters type or a IO[bytes]
     type. Required.
    :type create_update_cassandra_keyspace_parameters:
     ~azure.mgmt.cosmosdb.models.CassandraKeyspaceCreateUpdateParameters or IO[bytes]
    :keyword polling: ``True`` (default) selects ARMPolling, ``False`` selects NoPolling, any other
     value is used directly as the polling method.
    :keyword polling_interval: Delay handed to ARMPolling; defaults to the client configuration value.
    :keyword continuation_token: When truthy, the poller is restored from this token.
    :return: An instance of LROPoller that returns either CassandraKeyspaceGetResults or the result
     of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CassandraKeyspaceGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", header_map.pop("Content-Type", None))
    cls: ClsType[_models.CassandraKeyspaceGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # The identity lambda makes the initial call hand back the raw pipeline response.
        raw_result = self._create_update_cassandra_keyspace_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            create_update_cassandra_keyspace_parameters=create_update_cassandra_keyspace_parameters,
            api_version=api_version,
            content_type=content_type,
            cls=lambda x, y, z: x,
            headers=header_map,
            params=query_map,
            **kwargs
        )
        raw_result.http_response.read()  # type: ignore
    # Dropped so it is not forwarded to ARMPolling below.
    kwargs.pop("error_map", None)

    def build_final_result(pipeline_response):
        keyspace = self._deserialize("CassandraKeyspaceGetResults", pipeline_response.http_response)
        return cls(pipeline_response, keyspace, {}) if cls else keyspace  # type: ignore

    polling_method: PollingMethod
    if polling is True:
        polling_method = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    elif polling is False:
        polling_method = cast(PollingMethod, NoPolling())
    else:
        polling_method = polling

    poller_type = LROPoller[_models.CassandraKeyspaceGetResults]
    if cont_token:
        return poller_type.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=build_final_result,
        )
    return poller_type(self._client, raw_result, build_final_result, polling_method)  # type: ignore
def _delete_cassandra_keyspace_initial(
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> Iterator[bytes]:
    """Issue the initial delete request for a Cassandra keyspace.

    The request is run through the client pipeline with streaming enabled. A 202 or 204
    status produces the response body as a byte stream; any other status raises.

    :param resource_group_name: Resource group name forwarded to the request builder.
    :type resource_group_name: str
    :param account_name: Database account name forwarded to the request builder.
    :type account_name: str
    :param keyspace_name: Keyspace name forwarded to the request builder.
    :type keyspace_name: str
    :return: The streamed response body, or the result of ``cls`` when that keyword is supplied.
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError: When the status code is neither 202 nor 204
     and ``map_error`` did not already raise for it.
    """
    failure_types: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    failure_types.update(kwargs.pop("error_map", {}) or {})

    header_map = kwargs.pop("headers", {}) or {}
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The fallback is evaluated eagerly, so "api-version" always leaves the query map.
    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

    _request = build_delete_cassandra_keyspace_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_map,
        params=query_map,
    )
    _request.url = self._client.format_url(_request.url)

    # "decompress" must leave kwargs before the remainder is handed to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (202, 204):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=failure_types)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if not cls:
        return body_stream  # type: ignore
    return cls(pipeline_response, body_stream, response_headers)  # type: ignore
[docs]
@distributed_trace
def begin_delete_cassandra_keyspace(
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> LROPoller[None]:
    """Deletes an existing Azure Cosmos DB Cassandra keyspace.

    Unless a ``continuation_token`` keyword is given, the initial request is sent and its
    response fully read before the poller is built.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :keyword polling: ``True`` (default) selects ARMPolling, ``False`` selects NoPolling, any other
     value is used directly as the polling method.
    :keyword polling_interval: Delay handed to ARMPolling; defaults to the client configuration value.
    :keyword continuation_token: When truthy, the poller is restored from this token.
    :return: An instance of LROPoller that returns either None or the result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_map = kwargs.pop("headers", {}) or {}
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # The identity lambda makes the initial call hand back the raw pipeline response.
        raw_result = self._delete_cassandra_keyspace_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            api_version=api_version,
            cls=lambda x, y, z: x,
            headers=header_map,
            params=query_map,
            **kwargs
        )
        raw_result.http_response.read()  # type: ignore
    # Dropped so it is not forwarded to ARMPolling below.
    kwargs.pop("error_map", None)

    def build_final_result(pipeline_response):
        # A delete has no payload: only a caller-supplied cls yields a value.
        return cls(pipeline_response, None, {}) if cls else None  # type: ignore

    polling_method: PollingMethod
    if polling is True:
        polling_method = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    elif polling is False:
        polling_method = cast(PollingMethod, NoPolling())
    else:
        polling_method = polling

    poller_type = LROPoller[None]
    if cont_token:
        return poller_type.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=build_final_result,
        )
    return poller_type(self._client, raw_result, build_final_result, polling_method)  # type: ignore
[docs]
@distributed_trace
def get_cassandra_keyspace_throughput(
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> _models.ThroughputSettingsGetResults:
    """Gets the RUs per second of the Cassandra Keyspace under an existing Azure Cosmos DB database
    account with the provided name.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :return: ThroughputSettingsGetResults or the result of cls(response)
    :rtype: ~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    failure_types: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    failure_types.update(kwargs.pop("error_map", {}) or {})

    header_map = kwargs.pop("headers", {}) or {}
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The fallback is evaluated eagerly, so "api-version" always leaves the query map.
    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)

    _request = build_get_cassandra_keyspace_throughput_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_map,
        params=query_map,
    )
    _request.url = self._client.format_url(_request.url)

    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    # 200 is the only status treated as success.
    if response.status_code != 200:
        map_error(status_code=response.status_code, response=response, error_map=failure_types)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    settings = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
    if not cls:
        return settings  # type: ignore
    return cls(pipeline_response, settings, {})  # type: ignore
def _update_cassandra_keyspace_throughput_initial(  # pylint: disable=name-too-long
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> Iterator[bytes]:
    """Issue the initial throughput-update request for a Cassandra keyspace.

    The request is run through the client pipeline with streaming enabled. A 200 or 202
    status produces the response body as a byte stream; any other status raises.

    :param resource_group_name: Resource group name forwarded to the request builder.
    :type resource_group_name: str
    :param account_name: Database account name forwarded to the request builder.
    :type account_name: str
    :param keyspace_name: Keyspace name forwarded to the request builder.
    :type keyspace_name: str
    :param update_throughput_parameters: Request body. ``IOBase``/``bytes`` values are sent as raw
     content; anything else is serialized as ``ThroughputSettingsUpdateParameters``.
    :type update_throughput_parameters:
     ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters or IO[bytes]
    :return: The streamed response body, or the result of ``cls`` when that keyword is supplied.
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError: When the status code is neither 200 nor 202
     and ``map_error`` did not already raise for it.
    """
    failure_types: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    failure_types.update(kwargs.pop("error_map", {}) or {})

    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The fallback arguments are evaluated eagerly, so "api-version" and "Content-Type"
    # are always removed from the maps, even when the keyword argument wins.
    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", header_map.pop("Content-Type", None))
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
    if not content_type:
        content_type = "application/json"

    # Exactly one of the two payload slots is populated.
    if isinstance(update_throughput_parameters, (IOBase, bytes)):
        json_payload, raw_payload = None, update_throughput_parameters
    else:
        json_payload = self._serialize.body(update_throughput_parameters, "ThroughputSettingsUpdateParameters")
        raw_payload = None

    _request = build_update_cassandra_keyspace_throughput_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=json_payload,
        content=raw_payload,
        headers=header_map,
        params=query_map,
    )
    _request.url = self._client.format_url(_request.url)

    # "decompress" must leave kwargs before the remainder is handed to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=failure_types)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if not cls:
        return body_stream  # type: ignore
    return cls(pipeline_response, body_stream, response_headers)  # type: ignore
@overload
def begin_update_cassandra_keyspace_throughput(  # pylint: disable=name-too-long
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    update_throughput_parameters: _models.ThroughputSettingsUpdateParameters,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Update RUs per second of an Azure Cosmos DB Cassandra Keyspace.

    Typing overload for callers that pass the body as a
    ``ThroughputSettingsUpdateParameters`` model.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :param update_throughput_parameters: The RUs per second of the parameters to provide for the
     current Cassandra Keyspace. Required.
    :type update_throughput_parameters:
     ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def begin_update_cassandra_keyspace_throughput(  # pylint: disable=name-too-long
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    update_throughput_parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Update RUs per second of an Azure Cosmos DB Cassandra Keyspace.

    Typing overload for callers that pass the body as an ``IO[bytes]`` stream.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :param update_throughput_parameters: The RUs per second of the parameters to provide for the
     current Cassandra Keyspace. Required.
    :type update_throughput_parameters: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
[docs]
@distributed_trace
def begin_update_cassandra_keyspace_throughput(  # pylint: disable=name-too-long
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Update RUs per second of an Azure Cosmos DB Cassandra Keyspace.

    Unless a ``continuation_token`` keyword is given, the initial request is sent and its
    response fully read before the poller is built.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :param update_throughput_parameters: The RUs per second of the parameters to provide for the
     current Cassandra Keyspace. Is either a ThroughputSettingsUpdateParameters type or a IO[bytes]
     type. Required.
    :type update_throughput_parameters:
     ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters or IO[bytes]
    :keyword polling: ``True`` (default) selects ARMPolling, ``False`` selects NoPolling, any other
     value is used directly as the polling method.
    :keyword polling_interval: Delay handed to ARMPolling; defaults to the client configuration value.
    :keyword continuation_token: When truthy, the poller is restored from this token.
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", header_map.pop("Content-Type", None))
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # The identity lambda makes the initial call hand back the raw pipeline response.
        raw_result = self._update_cassandra_keyspace_throughput_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            update_throughput_parameters=update_throughput_parameters,
            api_version=api_version,
            content_type=content_type,
            cls=lambda x, y, z: x,
            headers=header_map,
            params=query_map,
            **kwargs
        )
        raw_result.http_response.read()  # type: ignore
    # Dropped so it is not forwarded to ARMPolling below.
    kwargs.pop("error_map", None)

    def build_final_result(pipeline_response):
        settings = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
        return cls(pipeline_response, settings, {}) if cls else settings  # type: ignore

    polling_method: PollingMethod
    if polling is True:
        polling_method = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    elif polling is False:
        polling_method = cast(PollingMethod, NoPolling())
    else:
        polling_method = polling

    poller_type = LROPoller[_models.ThroughputSettingsGetResults]
    if cont_token:
        return poller_type.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=build_final_result,
        )
    return poller_type(self._client, raw_result, build_final_result, polling_method)  # type: ignore
def _migrate_cassandra_keyspace_to_autoscale_initial(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> Iterator[bytes]:
    """Issue the initial migrate-to-autoscale request for a Cassandra keyspace.

    The request is run through the client pipeline with streaming enabled. A 200 or 202
    status produces the response body as a byte stream; any other status raises.

    :param resource_group_name: Resource group name forwarded to the request builder.
    :type resource_group_name: str
    :param account_name: Database account name forwarded to the request builder.
    :type account_name: str
    :param keyspace_name: Keyspace name forwarded to the request builder.
    :type keyspace_name: str
    :return: The streamed response body, or the result of ``cls`` when that keyword is supplied.
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError: When the status code is neither 200 nor 202
     and ``map_error`` did not already raise for it.
    """
    failure_types: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    failure_types.update(kwargs.pop("error_map", {}) or {})

    header_map = kwargs.pop("headers", {}) or {}
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The fallback is evaluated eagerly, so "api-version" always leaves the query map.
    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

    _request = build_migrate_cassandra_keyspace_to_autoscale_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_map,
        params=query_map,
    )
    _request.url = self._client.format_url(_request.url)

    # "decompress" must leave kwargs before the remainder is handed to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=failure_types)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if not cls:
        return body_stream  # type: ignore
    return cls(pipeline_response, body_stream, response_headers)  # type: ignore
[docs]
@distributed_trace
def begin_migrate_cassandra_keyspace_to_autoscale(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Migrate an Azure Cosmos DB Cassandra Keyspace from manual throughput to autoscale.

    Unless a ``continuation_token`` keyword is given, the initial request is sent and its
    response fully read before the poller is built.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :keyword polling: ``True`` (default) selects ARMPolling, ``False`` selects NoPolling, any other
     value is used directly as the polling method.
    :keyword polling_interval: Delay handed to ARMPolling; defaults to the client configuration value.
    :keyword continuation_token: When truthy, the poller is restored from this token.
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_map = kwargs.pop("headers", {}) or {}
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # The identity lambda makes the initial call hand back the raw pipeline response.
        raw_result = self._migrate_cassandra_keyspace_to_autoscale_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            api_version=api_version,
            cls=lambda x, y, z: x,
            headers=header_map,
            params=query_map,
            **kwargs
        )
        raw_result.http_response.read()  # type: ignore
    # Dropped so it is not forwarded to ARMPolling below.
    kwargs.pop("error_map", None)

    def build_final_result(pipeline_response):
        settings = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
        return cls(pipeline_response, settings, {}) if cls else settings  # type: ignore

    polling_method: PollingMethod
    if polling is True:
        polling_method = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    elif polling is False:
        polling_method = cast(PollingMethod, NoPolling())
    else:
        polling_method = polling

    poller_type = LROPoller[_models.ThroughputSettingsGetResults]
    if cont_token:
        return poller_type.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=build_final_result,
        )
    return poller_type(self._client, raw_result, build_final_result, polling_method)  # type: ignore
def _migrate_cassandra_keyspace_to_manual_throughput_initial(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> Iterator[bytes]:
    """Issue the initial migrate-to-manual-throughput request for a Cassandra keyspace.

    The request is run through the client pipeline with streaming enabled. A 200 or 202
    status produces the response body as a byte stream; any other status raises.

    :param resource_group_name: Resource group name forwarded to the request builder.
    :type resource_group_name: str
    :param account_name: Database account name forwarded to the request builder.
    :type account_name: str
    :param keyspace_name: Keyspace name forwarded to the request builder.
    :type keyspace_name: str
    :return: The streamed response body, or the result of ``cls`` when that keyword is supplied.
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError: When the status code is neither 200 nor 202
     and ``map_error`` did not already raise for it.
    """
    failure_types: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    failure_types.update(kwargs.pop("error_map", {}) or {})

    header_map = kwargs.pop("headers", {}) or {}
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The fallback is evaluated eagerly, so "api-version" always leaves the query map.
    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

    _request = build_migrate_cassandra_keyspace_to_manual_throughput_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_map,
        params=query_map,
    )
    _request.url = self._client.format_url(_request.url)

    # "decompress" must leave kwargs before the remainder is handed to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=failure_types)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if not cls:
        return body_stream  # type: ignore
    return cls(pipeline_response, body_stream, response_headers)  # type: ignore
[docs]
@distributed_trace
def begin_migrate_cassandra_keyspace_to_manual_throughput(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Migrate an Azure Cosmos DB Cassandra Keyspace from autoscale to manual throughput.

    Unless a ``continuation_token`` keyword is given, the initial request is sent and its
    response fully read before the poller is built.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :keyword polling: ``True`` (default) selects ARMPolling, ``False`` selects NoPolling, any other
     value is used directly as the polling method.
    :keyword polling_interval: Delay handed to ARMPolling; defaults to the client configuration value.
    :keyword continuation_token: When truthy, the poller is restored from this token.
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_map = kwargs.pop("headers", {}) or {}
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", query_map.pop("api-version", self._config.api_version))
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    if cont_token is None:
        # The identity lambda makes the initial call hand back the raw pipeline response.
        raw_result = self._migrate_cassandra_keyspace_to_manual_throughput_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            api_version=api_version,
            cls=lambda x, y, z: x,
            headers=header_map,
            params=query_map,
            **kwargs
        )
        raw_result.http_response.read()  # type: ignore
    # Dropped so it is not forwarded to ARMPolling below.
    kwargs.pop("error_map", None)

    def build_final_result(pipeline_response):
        settings = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
        return cls(pipeline_response, settings, {}) if cls else settings  # type: ignore

    polling_method: PollingMethod
    if polling is True:
        polling_method = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    elif polling is False:
        polling_method = cast(PollingMethod, NoPolling())
    else:
        polling_method = polling

    poller_type = LROPoller[_models.ThroughputSettingsGetResults]
    if cont_token:
        return poller_type.from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=build_final_result,
        )
    return poller_type(self._client, raw_result, build_final_result, polling_method)  # type: ignore
[docs]
@distributed_trace
def list_cassandra_tables(
    self, resource_group_name: str, account_name: str, keyspace_name: str, **kwargs: Any
) -> Iterable["_models.CassandraTableGetResults"]:
    """Enumerate the Cassandra tables of a keyspace in an existing Azure Cosmos DB database account.

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :return: An iterator like instance yielding CassandraTableGetResults, or the result of
     cls(response)
    :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.CassandraTableGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_params = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always stripped from the params; an explicit api_version keyword wins over it.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    cls: ClsType[_models.CassandraTableListResult] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; caller-supplied entries override these.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        # Build either the follow-up request for a next link or the first-page request.
        if next_link:
            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            quoted_query = {}
            for key, values in urllib.parse.parse_qs(parsed_link.query).items():
                quoted_query[key] = [urllib.parse.quote(v) for v in values]
            next_params = case_insensitive_dict(quoted_query)
            next_params["api-version"] = self._config.api_version
            request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, parsed_link.path), params=next_params
            )
            request.url = self._client.format_url(request.url)
            request.method = "GET"
            return request

        request = build_list_cassandra_tables_request(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            subscription_id=self._config.subscription_id,
            api_version=api_version,
            headers=header_params,
            params=query_params,
        )
        request.url = self._client.format_url(request.url)
        return request

    def extract_data(pipeline_response):
        # Turn one page into (continuation, iterator); the continuation slot is always None here.
        page = self._deserialize("CassandraTableListResult", pipeline_response)
        items = page.value
        if cls:
            items = cls(items)  # type: ignore
        return None, iter(items)

    def get_next(next_link=None):
        # Run the request through the pipeline without streaming and validate the status code.
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            prepare_request(next_link), stream=False, **kwargs
        )
        response = pipeline_response.http_response
        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
        return pipeline_response

    return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def get_cassandra_table(
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> _models.CassandraTableGetResults:
    """Fetch a single Cassandra table from an existing Azure Cosmos DB database account.

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :return: CassandraTableGetResults or the result of cls(response)
    :rtype: ~azure.mgmt.cosmosdb.models.CassandraTableGetResults
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries override these.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    header_params = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always stripped from the params; an explicit api_version keyword wins over it.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    cls: ClsType[_models.CassandraTableGetResults] = kwargs.pop("cls", None)

    request = build_get_cassandra_table_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        table_name=table_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_params,
        params=query_params,
    )
    request.url = self._client.format_url(request.url)

    # Non-streaming call: the body is deserialized right after the status check.
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response
    if response.status_code != 200:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    table = self._deserialize("CassandraTableGetResults", pipeline_response.http_response)
    return cls(pipeline_response, table, {}) if cls else table  # type: ignore
def _create_update_cassandra_table_initial(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    create_update_cassandra_table_parameters: Union[_models.CassandraTableCreateUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> Iterator[bytes]:
    """Send the initial create/update request for a Cassandra table and return the streamed body.

    The request body is sent as raw content when the parameters are an ``IOBase`` or ``bytes``
    object, otherwise it is serialized as "CassandraTableCreateUpdateParameters". Status codes
    other than 200 and 202 raise; on 202 the "azure-AsyncOperation" and "location" response
    headers are collected and handed to ``cls`` when one is supplied.

    :return: Iterator over the response bytes, or the result of cls(response)
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries override these.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    header_params = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both header/param entries are always popped; explicit keywords take precedence over them.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    header_content_type = header_params.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
    content_type = content_type or "application/json"

    # Exactly one of json_payload / raw_payload is populated.
    body = create_update_cassandra_table_parameters
    if isinstance(body, (IOBase, bytes)):
        json_payload, raw_payload = None, body
    else:
        json_payload = self._serialize.body(body, "CassandraTableCreateUpdateParameters")
        raw_payload = None

    request = build_create_update_cassandra_table_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        table_name=table_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=json_payload,
        content=raw_payload,
        headers=header_params,
        params=query_params,
    )
    request.url = self._client.format_url(request.url)

    # "decompress" must leave kwargs before they are forwarded to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if cls:
        return cls(pipeline_response, body_stream, response_headers)  # type: ignore
    return body_stream  # type: ignore
@overload
def begin_create_update_cassandra_table(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    create_update_cassandra_table_parameters: _models.CassandraTableCreateUpdateParameters,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.CassandraTableGetResults]:
    """Create or update an Azure Cosmos DB Cassandra Table (typed overload: model body).

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :param create_update_cassandra_table_parameters: Parameters to provide for the current
     Cassandra Table, given as a model instance. Required.
    :type create_update_cassandra_table_parameters:
     ~azure.mgmt.cosmosdb.models.CassandraTableCreateUpdateParameters
    :keyword content_type: Content type of the JSON body. Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either CassandraTableGetResults or the result of
     cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CassandraTableGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def begin_create_update_cassandra_table(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    create_update_cassandra_table_parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.CassandraTableGetResults]:
    """Create or update an Azure Cosmos DB Cassandra Table (typed overload: binary body).

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :param create_update_cassandra_table_parameters: Parameters to provide for the current
     Cassandra Table, given as a binary stream. Required.
    :type create_update_cassandra_table_parameters: IO[bytes]
    :keyword content_type: Content type of the binary body. Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either CassandraTableGetResults or the result of
     cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CassandraTableGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
[docs]
@distributed_trace
def begin_create_update_cassandra_table(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    create_update_cassandra_table_parameters: Union[_models.CassandraTableCreateUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> LROPoller[_models.CassandraTableGetResults]:
    """Create or update an Azure Cosmos DB Cassandra Table.

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :param create_update_cassandra_table_parameters: Parameters to provide for the current
     Cassandra Table. Either a CassandraTableCreateUpdateParameters instance or an IO[bytes]
     stream. Required.
    :type create_update_cassandra_table_parameters:
     ~azure.mgmt.cosmosdb.models.CassandraTableCreateUpdateParameters or IO[bytes]
    :return: An instance of LROPoller that returns either CassandraTableGetResults or the result of
     cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.CassandraTableGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_params = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both header/param entries are always popped; explicit keywords take precedence over them.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    header_content_type = header_params.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.CassandraTableGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    # The initial request is only sent when no continuation token was supplied.
    if cont_token is None:
        initial_response = self._create_update_cassandra_table_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            table_name=table_name,
            create_update_cassandra_table_parameters=create_update_cassandra_table_parameters,
            api_version=api_version,
            content_type=content_type,
            cls=lambda pipeline_response, _body, _response_headers: pipeline_response,
            headers=header_params,
            params=query_params,
            **kwargs
        )
        initial_response.http_response.read()  # type: ignore
    # error_map is dropped so it is not forwarded to the polling method below.
    kwargs.pop("error_map", None)

    def deserialize_final(pipeline_response):
        # Convert the final response into the model (or hand it to the caller's cls hook).
        result = self._deserialize("CassandraTableGetResults", pipeline_response.http_response)
        return cls(pipeline_response, result, {}) if cls else result  # type: ignore

    # polling=False -> no polling, polling=True -> ARM polling, anything else is used as-is.
    if polling is False:
        chosen_polling: PollingMethod = cast(PollingMethod, NoPolling())
    elif polling is True:
        chosen_polling = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    else:
        chosen_polling = polling

    if cont_token:
        return LROPoller[_models.CassandraTableGetResults].from_continuation_token(
            polling_method=chosen_polling,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=deserialize_final,
        )
    return LROPoller[_models.CassandraTableGetResults](
        self._client, initial_response, deserialize_final, chosen_polling  # type: ignore
    )
def _delete_cassandra_table_initial(
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> Iterator[bytes]:
    """Send the initial delete request for a Cassandra table and return the streamed body.

    Status codes other than 202 and 204 raise; on 202 the "azure-AsyncOperation" and "location"
    response headers are collected and handed to ``cls`` when one is supplied.

    :return: Iterator over the response bytes, or the result of cls(response)
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries override these.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    header_params = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always stripped from the params; an explicit api_version keyword wins over it.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

    request = build_delete_cassandra_table_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        table_name=table_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_params,
        params=query_params,
    )
    request.url = self._client.format_url(request.url)

    # "decompress" must leave kwargs before they are forwarded to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (202, 204):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if cls:
        return cls(pipeline_response, body_stream, response_headers)  # type: ignore
    return body_stream  # type: ignore
[docs]
@distributed_trace
def begin_delete_cassandra_table(
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> LROPoller[None]:
    """Delete an existing Azure Cosmos DB Cassandra table.

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :return: An instance of LROPoller that returns either None or the result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[None]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_params = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always stripped from the params; an explicit api_version keyword wins over it.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    cls: ClsType[None] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    # The initial request is only sent when no continuation token was supplied.
    if cont_token is None:
        initial_response = self._delete_cassandra_table_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            table_name=table_name,
            api_version=api_version,
            cls=lambda pipeline_response, _body, _response_headers: pipeline_response,
            headers=header_params,
            params=query_params,
            **kwargs
        )
        initial_response.http_response.read()  # type: ignore
    # error_map is dropped so it is not forwarded to the polling method below.
    kwargs.pop("error_map", None)

    def finish(pipeline_response):
        # There is no body to deserialize; only a caller-supplied cls hook produces a value.
        return cls(pipeline_response, None, {}) if cls else None  # type: ignore

    # polling=False -> no polling, polling=True -> ARM polling, anything else is used as-is.
    if polling is False:
        chosen_polling: PollingMethod = cast(PollingMethod, NoPolling())
    elif polling is True:
        chosen_polling = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    else:
        chosen_polling = polling

    if cont_token:
        return LROPoller[None].from_continuation_token(
            polling_method=chosen_polling,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=finish,
        )
    return LROPoller[None](self._client, initial_response, finish, chosen_polling)  # type: ignore
[docs]
@distributed_trace
def get_cassandra_table_throughput(
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> _models.ThroughputSettingsGetResults:
    """Fetch the RUs per second of a Cassandra table under an existing Azure Cosmos DB database
    account with the provided name.

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :return: ThroughputSettingsGetResults or the result of cls(response)
    :rtype: ~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries override these.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    header_params = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always stripped from the params; an explicit api_version keyword wins over it.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)

    request = build_get_cassandra_table_throughput_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        table_name=table_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_params,
        params=query_params,
    )
    request.url = self._client.format_url(request.url)

    # Non-streaming call: the body is deserialized right after the status check.
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response
    if response.status_code != 200:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    throughput = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
    return cls(pipeline_response, throughput, {}) if cls else throughput  # type: ignore
def _update_cassandra_table_throughput_initial(  # pylint: disable=name-too-long
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> Iterator[bytes]:
    """Send the initial throughput-update request for a Cassandra table and return the streamed body.

    The request body is sent as raw content when the parameters are an ``IOBase`` or ``bytes``
    object, otherwise it is serialized as "ThroughputSettingsUpdateParameters". Status codes other
    than 200 and 202 raise; on 202 the "azure-AsyncOperation" and "location" response headers are
    collected and handed to ``cls`` when one is supplied.

    :return: Iterator over the response bytes, or the result of cls(response)
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries override these.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    header_params = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both header/param entries are always popped; explicit keywords take precedence over them.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    header_content_type = header_params.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
    content_type = content_type or "application/json"

    # Exactly one of json_payload / raw_payload is populated.
    body = update_throughput_parameters
    if isinstance(body, (IOBase, bytes)):
        json_payload, raw_payload = None, body
    else:
        json_payload = self._serialize.body(body, "ThroughputSettingsUpdateParameters")
        raw_payload = None

    request = build_update_cassandra_table_throughput_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        table_name=table_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=json_payload,
        content=raw_payload,
        headers=header_params,
        params=query_params,
    )
    request.url = self._client.format_url(request.url)

    # "decompress" must leave kwargs before they are forwarded to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if cls:
        return cls(pipeline_response, body_stream, response_headers)  # type: ignore
    return body_stream  # type: ignore
@overload
def begin_update_cassandra_table_throughput(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    update_throughput_parameters: _models.ThroughputSettingsUpdateParameters,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Update RUs per second of an Azure Cosmos DB Cassandra table (typed overload: model body).

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :param update_throughput_parameters: The RUs per second of the parameters to provide for the
     current Cassandra table, given as a model instance. Required.
    :type update_throughput_parameters:
     ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters
    :keyword content_type: Content type of the JSON body. Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def begin_update_cassandra_table_throughput(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    update_throughput_parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Update RUs per second of an Azure Cosmos DB Cassandra table (typed overload: binary body).

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :param update_throughput_parameters: The RUs per second of the parameters to provide for the
     current Cassandra table, given as a binary stream. Required.
    :type update_throughput_parameters: IO[bytes]
    :keyword content_type: Content type of the binary body. Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
[docs]
@distributed_trace
def begin_update_cassandra_table_throughput(
    self,
    resource_group_name: str,
    account_name: str,
    keyspace_name: str,
    table_name: str,
    update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]],
    **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Update RUs per second of an Azure Cosmos DB Cassandra table.

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :param update_throughput_parameters: The RUs per second of the parameters to provide for the
     current Cassandra table. Either a ThroughputSettingsUpdateParameters instance or an
     IO[bytes] stream. Required.
    :type update_throughput_parameters:
     ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters or IO[bytes]
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_params = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Both header/param entries are always popped; explicit keywords take precedence over them.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    header_content_type = header_params.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    # The initial request is only sent when no continuation token was supplied.
    if cont_token is None:
        initial_response = self._update_cassandra_table_throughput_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            table_name=table_name,
            update_throughput_parameters=update_throughput_parameters,
            api_version=api_version,
            content_type=content_type,
            cls=lambda pipeline_response, _body, _response_headers: pipeline_response,
            headers=header_params,
            params=query_params,
            **kwargs
        )
        initial_response.http_response.read()  # type: ignore
    # error_map is dropped so it is not forwarded to the polling method below.
    kwargs.pop("error_map", None)

    def deserialize_final(pipeline_response):
        # Convert the final response into the model (or hand it to the caller's cls hook).
        result = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
        return cls(pipeline_response, result, {}) if cls else result  # type: ignore

    # polling=False -> no polling, polling=True -> ARM polling, anything else is used as-is.
    if polling is False:
        chosen_polling: PollingMethod = cast(PollingMethod, NoPolling())
    elif polling is True:
        chosen_polling = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    else:
        chosen_polling = polling

    if cont_token:
        return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
            polling_method=chosen_polling,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=deserialize_final,
        )
    return LROPoller[_models.ThroughputSettingsGetResults](
        self._client, initial_response, deserialize_final, chosen_polling  # type: ignore
    )
def _migrate_cassandra_table_to_autoscale_initial(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> Iterator[bytes]:
    """Send the initial migrate-to-autoscale request for a Cassandra table and return the streamed body.

    Status codes other than 200 and 202 raise; on 202 the "azure-AsyncOperation" and "location"
    response headers are collected and handed to ``cls`` when one is supplied.

    :return: Iterator over the response bytes, or the result of cls(response)
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries override these.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    header_params = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always stripped from the params; an explicit api_version keyword wins over it.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

    request = build_migrate_cassandra_table_to_autoscale_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        table_name=table_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=header_params,
        params=query_params,
    )
    request.url = self._client.format_url(request.url)

    # "decompress" must leave kwargs before they are forwarded to the pipeline.
    decompress = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    response_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            response_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress)
    if cls:
        return cls(pipeline_response, body_stream, response_headers)  # type: ignore
    return body_stream  # type: ignore
[docs]
@distributed_trace
def begin_migrate_cassandra_table_to_autoscale(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Migrate an Azure Cosmos DB Cassandra table from manual throughput to autoscale.

    :param resource_group_name: Name of the resource group (case insensitive). Required.
    :type resource_group_name: str
    :param account_name: Name of the Cosmos DB database account. Required.
    :type account_name: str
    :param keyspace_name: Name of the Cosmos DB keyspace. Required.
    :type keyspace_name: str
    :param table_name: Name of the Cosmos DB table. Required.
    :type table_name: str
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    header_params = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # "api-version" is always stripped from the params; an explicit api_version keyword wins over it.
    version_fallback = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    poll_interval = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)

    # The initial request is only sent when no continuation token was supplied.
    if cont_token is None:
        initial_response = self._migrate_cassandra_table_to_autoscale_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            table_name=table_name,
            api_version=api_version,
            cls=lambda pipeline_response, _body, _response_headers: pipeline_response,
            headers=header_params,
            params=query_params,
            **kwargs
        )
        initial_response.http_response.read()  # type: ignore
    # error_map is dropped so it is not forwarded to the polling method below.
    kwargs.pop("error_map", None)

    def deserialize_final(pipeline_response):
        # Convert the final response into the model (or hand it to the caller's cls hook).
        result = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
        return cls(pipeline_response, result, {}) if cls else result  # type: ignore

    # polling=False -> no polling, polling=True -> ARM polling, anything else is used as-is.
    if polling is False:
        chosen_polling: PollingMethod = cast(PollingMethod, NoPolling())
    elif polling is True:
        chosen_polling = cast(PollingMethod, ARMPolling(poll_interval, **kwargs))
    else:
        chosen_polling = polling

    if cont_token:
        return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
            polling_method=chosen_polling,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=deserialize_final,
        )
    return LROPoller[_models.ThroughputSettingsGetResults](
        self._client, initial_response, deserialize_final, chosen_polling  # type: ignore
    )
def _migrate_cassandra_table_to_manual_throughput_initial(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> Iterator[bytes]:
    """Send the first request of the migrate-to-manual-throughput operation.

    The request is built by ``build_migrate_cassandra_table_to_manual_throughput_request`` and run
    through the client pipeline in streaming mode. Status codes 200 and 202 are accepted; any
    other status is converted into an exception.

    :param resource_group_name: Passed to the request builder.
    :type resource_group_name: str
    :param account_name: Passed to the request builder.
    :type account_name: str
    :param keyspace_name: Passed to the request builder.
    :type keyspace_name: str
    :param table_name: Passed to the request builder.
    :type table_name: str
    :keyword error_map: Status-code-to-exception entries merged over the defaults for 401, 404,
     409 and 304.
    :keyword headers: Extra headers handed to the request builder.
    :keyword params: Extra query parameters handed to the request builder; an ``api-version``
     entry is removed and used as the API version unless ``api_version`` is given.
    :keyword api_version: API version; defaults to the ``api-version`` query parameter, then to
     the client configuration.
    :keyword cls: Optional callback invoked as ``cls(pipeline_response, stream, response_headers)``;
     when given, its return value is returned instead of the stream.
    :keyword decompress: Forwarded to ``stream_download``; defaults to ``True``.
    :return: The object returned by ``response.stream_download``, or the result of ``cls``.
    :rtype: Iterator[bytes]
    :raises ~azure.core.exceptions.HttpResponseError: If the status code is neither 200 nor 202,
     unless ``map_error`` raises first.
    """
    # Built-in status-code mapping; caller-provided entries override these.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    request_headers = kwargs.pop("headers", {}) or {}
    query_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Precedence: explicit keyword, then the query parameter, then the client configuration.
    fallback_version = query_params.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", fallback_version)
    on_response: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)

    http_request = build_migrate_cassandra_table_to_manual_throughput_request(
        resource_group_name=resource_group_name,
        account_name=account_name,
        keyspace_name=keyspace_name,
        table_name=table_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=request_headers,
        params=query_params,
    )
    http_request.url = self._client.format_url(http_request.url)

    # Remove ``decompress`` before the remaining kwargs are forwarded to the pipeline.
    decompress_body = kwargs.pop("decompress", True)
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=True, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 202):
        try:
            response.read()  # Load the body in memory and close the socket
        except (StreamConsumedError, StreamClosedError):
            pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    # Only a 202 populates the header dictionary handed to ``cls``.
    extracted_headers = {}
    if response.status_code == 202:
        for header_name in ("azure-AsyncOperation", "location"):
            extracted_headers[header_name] = self._deserialize("str", response.headers.get(header_name))

    body_stream = response.stream_download(self._client._pipeline, decompress=decompress_body)
    if on_response:
        return on_response(pipeline_response, body_stream, extracted_headers)  # type: ignore
    return body_stream  # type: ignore
[docs]
@distributed_trace
def begin_migrate_cassandra_table_to_manual_throughput(  # pylint: disable=name-too-long
    self, resource_group_name: str, account_name: str, keyspace_name: str, table_name: str, **kwargs: Any
) -> LROPoller[_models.ThroughputSettingsGetResults]:
    """Migrate an Azure Cosmos DB Cassandra table from autoscale to manual throughput.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param account_name: Cosmos DB database account name. Required.
    :type account_name: str
    :param keyspace_name: Cosmos DB keyspace name. Required.
    :type keyspace_name: str
    :param table_name: Cosmos DB table name. Required.
    :type table_name: str
    :keyword headers: Extra headers forwarded to the initial request.
    :keyword params: Extra query parameters forwarded to the initial request. An ``api-version``
     entry is removed from it and used as the API version unless ``api_version`` is given.
    :keyword api_version: API version; defaults to the ``api-version`` query parameter, then to
     the client configuration.
    :keyword cls: Optional callback invoked as ``cls(pipeline_response, deserialized, {})`` on the
     final response; its return value becomes the poller result.
    :keyword polling: ``True`` (default) uses ARMPolling, ``False`` uses NoPolling, any other
     value is used as the polling method as-is.
    :keyword polling_interval: Value passed to ARMPolling when ``polling`` is ``True``; defaults
     to the client configuration's ``polling_interval``.
    :keyword continuation_token: Token of a previously started operation. When it is not ``None``
     no initial request is sent and the poller is rebuilt from the token.
    :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the
     result of cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)
    if cont_token is None:
        # No token supplied: start the operation. The initial call invokes ``cls`` with the
        # pipeline response first, so the lambda makes ``raw_result`` that pipeline response.
        raw_result = self._migrate_cassandra_table_to_manual_throughput_initial(
            resource_group_name=resource_group_name,
            account_name=account_name,
            keyspace_name=keyspace_name,
            table_name=table_name,
            api_version=api_version,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
        # The initial request ran in streaming mode; read the body now.
        raw_result.http_response.read()  # type: ignore
    # error_map was only meant for the initial request; drop it so it is not forwarded to
    # ARMPolling through **kwargs below.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):
        # Turn the final HTTP response into the model, optionally routed through ``cls``.
        deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response)
        if cls:
            return cls(pipeline_response, deserialized, {})  # type: ignore
        return deserialized

    if polling is True:
        polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs))
    elif polling is False:
        polling_method = cast(PollingMethod, NoPolling())
    else:
        polling_method = polling
    # Resume whenever a token was supplied, mirroring the ``is None`` check above. Testing
    # truthiness here would let an empty-string token skip both the initial request and the
    # resume path and then fail on the unbound ``raw_result``.
    if cont_token is not None:
        return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return LROPoller[_models.ThroughputSettingsGetResults](
        self._client, raw_result, get_long_running_output, polling_method  # type: ignore
    )