Source code for azure.mgmt.cosmosdb.operations._mongo_db_resources_operations

# pylint: disable=line-too-long,useless-suppression,too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from collections.abc import MutableMapping
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Iterator, Optional, TypeVar, Union, cast, overload
import urllib.parse

from azure.core import PipelineClient
from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    StreamClosedError,
    StreamConsumedError,
    map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling

from .. import models as _models
from .._configuration import CosmosDBManagementClientConfiguration
from .._utils.serialization import Deserializer, Serializer

T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


def build_list_mongo_db_databases_request(
    resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_mongo_db_database_request(
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_create_update_mongo_db_database_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_mongo_db_database_request(
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)


def build_get_mongo_db_database_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/throughputSettings/default",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_update_mongo_db_database_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/throughputSettings/default",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_migrate_mongo_db_database_to_autoscale_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/throughputSettings/default/migrateToAutoscale",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_migrate_mongo_db_database_to_manual_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/throughputSettings/default/migrateToManualThroughput",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_mongo_db_collections_request(
    resource_group_name: str, account_name: str, database_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_mongo_db_collection_request(
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_create_update_mongo_db_collection_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_mongo_db_collection_request(
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, **kwargs)


def build_get_mongo_db_collection_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}/throughputSettings/default",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_update_mongo_db_collection_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}/throughputSettings/default",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_migrate_mongo_db_collection_to_autoscale_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}/throughputSettings/default/migrateToAutoscale",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_migrate_mongo_db_collection_to_manual_throughput_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}/throughputSettings/default/migrateToManualThroughput",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_mongo_role_definition_request(
    mongo_role_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbRoleDefinitions/{mongoRoleDefinitionId}",
    )
    path_format_arguments = {
        "mongoRoleDefinitionId": _SERIALIZER.url("mongo_role_definition_id", mongo_role_definition_id, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_create_update_mongo_role_definition_request(  # pylint: disable=name-too-long
    mongo_role_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbRoleDefinitions/{mongoRoleDefinitionId}",
    )
    path_format_arguments = {
        "mongoRoleDefinitionId": _SERIALIZER.url("mongo_role_definition_id", mongo_role_definition_id, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_mongo_role_definition_request(  # pylint: disable=name-too-long
    mongo_role_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbRoleDefinitions/{mongoRoleDefinitionId}",
    )
    path_format_arguments = {
        "mongoRoleDefinitionId": _SERIALIZER.url("mongo_role_definition_id", mongo_role_definition_id, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_mongo_role_definitions_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbRoleDefinitions",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_mongo_user_definition_request(
    mongo_user_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbUserDefinitions/{mongoUserDefinitionId}",
    )
    path_format_arguments = {
        "mongoUserDefinitionId": _SERIALIZER.url("mongo_user_definition_id", mongo_user_definition_id, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_create_update_mongo_user_definition_request(  # pylint: disable=name-too-long
    mongo_user_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbUserDefinitions/{mongoUserDefinitionId}",
    )
    path_format_arguments = {
        "mongoUserDefinitionId": _SERIALIZER.url("mongo_user_definition_id", mongo_user_definition_id, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_mongo_user_definition_request(  # pylint: disable=name-too-long
    mongo_user_definition_id: str, resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbUserDefinitions/{mongoUserDefinitionId}",
    )
    path_format_arguments = {
        "mongoUserDefinitionId": _SERIALIZER.url("mongo_user_definition_id", mongo_user_definition_id, "str"),
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_mongo_user_definitions_request(  # pylint: disable=name-too-long
    resource_group_name: str, account_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbUserDefinitions",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_retrieve_continuous_backup_information_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    account_name: str,
    database_name: str,
    collection_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2025-04-15"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DocumentDB/databaseAccounts/{accountName}/mongodbDatabases/{databaseName}/collections/{collectionName}/retrieveContinuousBackupInformation",
    )
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str", min_length=1),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "accountName": _SERIALIZER.url(
            "account_name", account_name, "str", max_length=50, min_length=3, pattern=r"^[a-z0-9]+(-[a-z0-9]+)*"
        ),
        "databaseName": _SERIALIZER.url("database_name", database_name, "str"),
        "collectionName": _SERIALIZER.url("collection_name", collection_name, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


[docs] class MongoDBResourcesOperations: # pylint: disable=too-many-public-methods """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.mgmt.cosmosdb.CosmosDBManagementClient`'s :attr:`mongo_db_resources` attribute. """ models = _models def __init__(self, *args, **kwargs): input_args = list(args) self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client") self._config: CosmosDBManagementClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list_mongo_db_databases( self, resource_group_name: str, account_name: str, **kwargs: Any ) -> Iterable["_models.MongoDBDatabaseGetResults"]: """Lists the MongoDB databases under an existing Azure Cosmos DB database account. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :return: An iterator like instance of either MongoDBDatabaseGetResults or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.MongoDBDatabaseGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoDBDatabaseListResult] = kwargs.pop("cls", None) error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: _request = build_list_mongo_db_databases_request( resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version _request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) _request.url = self._client.format_url(_request.url) _request.method = "GET" return _request def extract_data(pipeline_response): deserialized = self._deserialize("MongoDBDatabaseListResult", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return None, iter(list_of_elem) def get_next(next_link=None): _request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
[docs] @distributed_trace def get_mongo_db_database( self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> _models.MongoDBDatabaseGetResults: """Gets the MongoDB databases under an existing Azure Cosmos DB database account with the provided name. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :return: MongoDBDatabaseGetResults or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.MongoDBDatabaseGetResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoDBDatabaseGetResults] = kwargs.pop("cls", None) _request = build_get_mongo_db_database_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize("MongoDBDatabaseGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
def _create_update_mongo_db_database_initial( self, resource_group_name: str, account_name: str, database_name: str, create_update_mongo_db_database_parameters: Union[_models.MongoDBDatabaseCreateUpdateParameters, IO[bytes]], **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(create_update_mongo_db_database_parameters, (IOBase, bytes)): _content = create_update_mongo_db_database_parameters else: _json = self._serialize.body( create_update_mongo_db_database_parameters, "MongoDBDatabaseCreateUpdateParameters" ) _request = build_create_update_mongo_db_database_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore @overload def begin_create_update_mongo_db_database( self, resource_group_name: str, account_name: str, database_name: str, create_update_mongo_db_database_parameters: _models.MongoDBDatabaseCreateUpdateParameters, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoDBDatabaseGetResults]: """Create or updates Azure Cosmos DB MongoDB database. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param create_update_mongo_db_database_parameters: The parameters to provide for the current MongoDB database. Required. :type create_update_mongo_db_database_parameters: ~azure.mgmt.cosmosdb.models.MongoDBDatabaseCreateUpdateParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoDBDatabaseGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoDBDatabaseGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_create_update_mongo_db_database( self, resource_group_name: str, account_name: str, database_name: str, create_update_mongo_db_database_parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoDBDatabaseGetResults]: """Create or updates Azure Cosmos DB MongoDB database. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param create_update_mongo_db_database_parameters: The parameters to provide for the current MongoDB database. Required. :type create_update_mongo_db_database_parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoDBDatabaseGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoDBDatabaseGetResults] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_create_update_mongo_db_database( self, resource_group_name: str, account_name: str, database_name: str, create_update_mongo_db_database_parameters: Union[_models.MongoDBDatabaseCreateUpdateParameters, IO[bytes]], **kwargs: Any ) -> LROPoller[_models.MongoDBDatabaseGetResults]: """Create or updates Azure Cosmos DB MongoDB database. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param create_update_mongo_db_database_parameters: The parameters to provide for the current MongoDB database. Is either a MongoDBDatabaseCreateUpdateParameters type or a IO[bytes] type. Required. :type create_update_mongo_db_database_parameters: ~azure.mgmt.cosmosdb.models.MongoDBDatabaseCreateUpdateParameters or IO[bytes] :return: An instance of LROPoller that returns either MongoDBDatabaseGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoDBDatabaseGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.MongoDBDatabaseGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._create_update_mongo_db_database_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, create_update_mongo_db_database_parameters=create_update_mongo_db_database_parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("MongoDBDatabaseGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.MongoDBDatabaseGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.MongoDBDatabaseGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _delete_mongo_db_database_initial( self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_delete_mongo_db_database_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202, 204]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_delete_mongo_db_database( self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> LROPoller[None]: """Deletes an existing Azure Cosmos DB MongoDB database. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[None] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._delete_mongo_db_database_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) # type: ignore if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[None].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs] @distributed_trace def get_mongo_db_database_throughput( self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> _models.ThroughputSettingsGetResults: """Gets the RUs per second of the MongoDB database under an existing Azure Cosmos DB database account with the provided name. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :return: ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) _request = build_get_mongo_db_database_throughput_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
def _update_mongo_db_database_throughput_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]], **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(update_throughput_parameters, (IOBase, bytes)): _content = update_throughput_parameters else: _json = self._serialize.body(update_throughput_parameters, "ThroughputSettingsUpdateParameters") _request = build_update_mongo_db_database_throughput_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore @overload def begin_update_mongo_db_database_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, update_throughput_parameters: _models.ThroughputSettingsUpdateParameters, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Update RUs per second of the an Azure Cosmos DB MongoDB database. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param update_throughput_parameters: The RUs per second of the parameters to provide for the current MongoDB database. Required. :type update_throughput_parameters: ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_update_mongo_db_database_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, update_throughput_parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Update RUs per second of the an Azure Cosmos DB MongoDB database. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param update_throughput_parameters: The RUs per second of the parameters to provide for the current MongoDB database. Required. :type update_throughput_parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_update_mongo_db_database_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]], **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Update RUs per second of the an Azure Cosmos DB MongoDB database. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param update_throughput_parameters: The RUs per second of the parameters to provide for the current MongoDB database. Is either a ThroughputSettingsUpdateParameters type or a IO[bytes] type. Required. :type update_throughput_parameters: ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters or IO[bytes] :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._update_mongo_db_database_throughput_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, update_throughput_parameters=update_throughput_parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.ThroughputSettingsGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _migrate_mongo_db_database_to_autoscale_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_migrate_mongo_db_database_to_autoscale_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_migrate_mongo_db_database_to_autoscale( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Migrate an Azure Cosmos DB MongoDB database from manual throughput to autoscale. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._migrate_mongo_db_database_to_autoscale_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.ThroughputSettingsGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _migrate_mongo_db_database_to_manual_throughput_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_migrate_mongo_db_database_to_manual_throughput_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_migrate_mongo_db_database_to_manual_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Migrate an Azure Cosmos DB MongoDB database from autoscale to manual throughput. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._migrate_mongo_db_database_to_manual_throughput_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.ThroughputSettingsGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
[docs] @distributed_trace def list_mongo_db_collections( self, resource_group_name: str, account_name: str, database_name: str, **kwargs: Any ) -> Iterable["_models.MongoDBCollectionGetResults"]: """Lists the MongoDB collection under an existing Azure Cosmos DB database account. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :return: An iterator like instance of either MongoDBCollectionGetResults or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.MongoDBCollectionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoDBCollectionListResult] = kwargs.pop("cls", None) error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: _request = build_list_mongo_db_collections_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version _request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) _request.url = self._client.format_url(_request.url) _request.method = "GET" return _request def extract_data(pipeline_response): deserialized = self._deserialize("MongoDBCollectionListResult", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return None, iter(list_of_elem) def get_next(next_link=None): _request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
[docs] @distributed_trace def get_mongo_db_collection( self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> _models.MongoDBCollectionGetResults: """Gets the MongoDB collection under an existing Azure Cosmos DB database account. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :return: MongoDBCollectionGetResults or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.MongoDBCollectionGetResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoDBCollectionGetResults] = kwargs.pop("cls", None) _request = build_get_mongo_db_collection_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize("MongoDBCollectionGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
def _create_update_mongo_db_collection_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, create_update_mongo_db_collection_parameters: Union[_models.MongoDBCollectionCreateUpdateParameters, IO[bytes]], **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(create_update_mongo_db_collection_parameters, (IOBase, bytes)): _content = create_update_mongo_db_collection_parameters else: _json = self._serialize.body( create_update_mongo_db_collection_parameters, "MongoDBCollectionCreateUpdateParameters" ) _request = build_create_update_mongo_db_collection_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore @overload def begin_create_update_mongo_db_collection( self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, create_update_mongo_db_collection_parameters: _models.MongoDBCollectionCreateUpdateParameters, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoDBCollectionGetResults]: """Create or update an Azure Cosmos DB MongoDB Collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param create_update_mongo_db_collection_parameters: The parameters to provide for the current MongoDB Collection. Required. :type create_update_mongo_db_collection_parameters: ~azure.mgmt.cosmosdb.models.MongoDBCollectionCreateUpdateParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoDBCollectionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoDBCollectionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_create_update_mongo_db_collection( self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, create_update_mongo_db_collection_parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoDBCollectionGetResults]: """Create or update an Azure Cosmos DB MongoDB Collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param create_update_mongo_db_collection_parameters: The parameters to provide for the current MongoDB Collection. Required. :type create_update_mongo_db_collection_parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoDBCollectionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoDBCollectionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_create_update_mongo_db_collection( self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, create_update_mongo_db_collection_parameters: Union[_models.MongoDBCollectionCreateUpdateParameters, IO[bytes]], **kwargs: Any ) -> LROPoller[_models.MongoDBCollectionGetResults]: """Create or update an Azure Cosmos DB MongoDB Collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param create_update_mongo_db_collection_parameters: The parameters to provide for the current MongoDB Collection. Is either a MongoDBCollectionCreateUpdateParameters type or a IO[bytes] type. Required. :type create_update_mongo_db_collection_parameters: ~azure.mgmt.cosmosdb.models.MongoDBCollectionCreateUpdateParameters or IO[bytes] :return: An instance of LROPoller that returns either MongoDBCollectionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoDBCollectionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.MongoDBCollectionGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._create_update_mongo_db_collection_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, create_update_mongo_db_collection_parameters=create_update_mongo_db_collection_parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("MongoDBCollectionGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.MongoDBCollectionGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.MongoDBCollectionGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _delete_mongo_db_collection_initial( self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_delete_mongo_db_collection_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202, 204]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_delete_mongo_db_collection( self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> LROPoller[None]: """Deletes an existing Azure Cosmos DB MongoDB Collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[None] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._delete_mongo_db_collection_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) # type: ignore if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[None].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs] @distributed_trace def get_mongo_db_collection_throughput( self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> _models.ThroughputSettingsGetResults: """Gets the RUs per second of the MongoDB collection under an existing Azure Cosmos DB database account with the provided name. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :return: ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) _request = build_get_mongo_db_collection_throughput_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
def _update_mongo_db_collection_throughput_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]], **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(update_throughput_parameters, (IOBase, bytes)): _content = update_throughput_parameters else: _json = self._serialize.body(update_throughput_parameters, "ThroughputSettingsUpdateParameters") _request = build_update_mongo_db_collection_throughput_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore @overload def begin_update_mongo_db_collection_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, update_throughput_parameters: _models.ThroughputSettingsUpdateParameters, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Update the RUs per second of an Azure Cosmos DB MongoDB collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param update_throughput_parameters: The RUs per second of the parameters to provide for the current MongoDB collection. Required. :type update_throughput_parameters: ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_update_mongo_db_collection_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, update_throughput_parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Update the RUs per second of an Azure Cosmos DB MongoDB collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param update_throughput_parameters: The RUs per second of the parameters to provide for the current MongoDB collection. Required. :type update_throughput_parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_update_mongo_db_collection_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, update_throughput_parameters: Union[_models.ThroughputSettingsUpdateParameters, IO[bytes]], **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Update the RUs per second of an Azure Cosmos DB MongoDB collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param update_throughput_parameters: The RUs per second of the parameters to provide for the current MongoDB collection. Is either a ThroughputSettingsUpdateParameters type or a IO[bytes] type. Required. :type update_throughput_parameters: ~azure.mgmt.cosmosdb.models.ThroughputSettingsUpdateParameters or IO[bytes] :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._update_mongo_db_collection_throughput_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, update_throughput_parameters=update_throughput_parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.ThroughputSettingsGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _migrate_mongo_db_collection_to_autoscale_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_migrate_mongo_db_collection_to_autoscale_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_migrate_mongo_db_collection_to_autoscale( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Migrate an Azure Cosmos DB MongoDB collection from manual throughput to autoscale. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._migrate_mongo_db_collection_to_autoscale_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.ThroughputSettingsGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _migrate_mongo_db_collection_to_manual_throughput_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_migrate_mongo_db_collection_to_manual_throughput_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) response_headers = {} if response.status_code == 202: response_headers["azure-AsyncOperation"] = self._deserialize( "str", response.headers.get("azure-AsyncOperation") ) response_headers["location"] = self._deserialize("str", response.headers.get("location")) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, response_headers) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_migrate_mongo_db_collection_to_manual_throughput( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, **kwargs: Any ) -> LROPoller[_models.ThroughputSettingsGetResults]: """Migrate an Azure Cosmos DB MongoDB collection from autoscale to manual throughput. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :return: An instance of LROPoller that returns either ThroughputSettingsGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.ThroughputSettingsGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.ThroughputSettingsGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._migrate_mongo_db_collection_to_manual_throughput_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("ThroughputSettingsGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.ThroughputSettingsGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.ThroughputSettingsGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
[docs] @distributed_trace def get_mongo_role_definition( self, mongo_role_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any ) -> _models.MongoRoleDefinitionGetResults: """Retrieves the properties of an existing Azure Cosmos DB Mongo Role Definition with the given Id. :param mongo_role_definition_id: The ID for the Role Definition {dbName.roleName}. Required. :type mongo_role_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :return: MongoRoleDefinitionGetResults or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.MongoRoleDefinitionGetResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoRoleDefinitionGetResults] = kwargs.pop("cls", None) _request = build_get_mongo_role_definition_request( mongo_role_definition_id=mongo_role_definition_id, resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize("MongoRoleDefinitionGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
def _create_update_mongo_role_definition_initial( # pylint: disable=name-too-long self, mongo_role_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_role_definition_parameters: Union[ _models.MongoRoleDefinitionCreateUpdateParameters, IO[bytes] ], **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(create_update_mongo_role_definition_parameters, (IOBase, bytes)): _content = create_update_mongo_role_definition_parameters else: _json = self._serialize.body( create_update_mongo_role_definition_parameters, "MongoRoleDefinitionCreateUpdateParameters" ) _request = build_create_update_mongo_role_definition_request( mongo_role_definition_id=mongo_role_definition_id, resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore @overload def begin_create_update_mongo_role_definition( # pylint: disable=name-too-long self, mongo_role_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_role_definition_parameters: _models.MongoRoleDefinitionCreateUpdateParameters, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoRoleDefinitionGetResults]: """Creates or updates an Azure Cosmos DB Mongo Role Definition. :param mongo_role_definition_id: The ID for the Role Definition {dbName.roleName}. Required. :type mongo_role_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param create_update_mongo_role_definition_parameters: The properties required to create or update a Role Definition. Required. :type create_update_mongo_role_definition_parameters: ~azure.mgmt.cosmosdb.models.MongoRoleDefinitionCreateUpdateParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoRoleDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoRoleDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_create_update_mongo_role_definition( # pylint: disable=name-too-long self, mongo_role_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_role_definition_parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoRoleDefinitionGetResults]: """Creates or updates an Azure Cosmos DB Mongo Role Definition. :param mongo_role_definition_id: The ID for the Role Definition {dbName.roleName}. Required. :type mongo_role_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param create_update_mongo_role_definition_parameters: The properties required to create or update a Role Definition. Required. :type create_update_mongo_role_definition_parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoRoleDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoRoleDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_create_update_mongo_role_definition( # pylint: disable=name-too-long self, mongo_role_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_role_definition_parameters: Union[ _models.MongoRoleDefinitionCreateUpdateParameters, IO[bytes] ], **kwargs: Any ) -> LROPoller[_models.MongoRoleDefinitionGetResults]: """Creates or updates an Azure Cosmos DB Mongo Role Definition. :param mongo_role_definition_id: The ID for the Role Definition {dbName.roleName}. Required. :type mongo_role_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param create_update_mongo_role_definition_parameters: The properties required to create or update a Role Definition. Is either a MongoRoleDefinitionCreateUpdateParameters type or a IO[bytes] type. Required. :type create_update_mongo_role_definition_parameters: ~azure.mgmt.cosmosdb.models.MongoRoleDefinitionCreateUpdateParameters or IO[bytes] :return: An instance of LROPoller that returns either MongoRoleDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoRoleDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.MongoRoleDefinitionGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._create_update_mongo_role_definition_initial( mongo_role_definition_id=mongo_role_definition_id, resource_group_name=resource_group_name, account_name=account_name, create_update_mongo_role_definition_parameters=create_update_mongo_role_definition_parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("MongoRoleDefinitionGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.MongoRoleDefinitionGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.MongoRoleDefinitionGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _delete_mongo_role_definition_initial( self, mongo_role_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_delete_mongo_role_definition_request( mongo_role_definition_id=mongo_role_definition_id, resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202, 204]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_delete_mongo_role_definition( self, mongo_role_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any ) -> LROPoller[None]: """Deletes an existing Azure Cosmos DB Mongo Role Definition. :param mongo_role_definition_id: The ID for the Role Definition {dbName.roleName}. Required. :type mongo_role_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[None] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._delete_mongo_role_definition_initial( mongo_role_definition_id=mongo_role_definition_id, resource_group_name=resource_group_name, account_name=account_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) # type: ignore if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[None].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs] @distributed_trace def list_mongo_role_definitions( self, resource_group_name: str, account_name: str, **kwargs: Any ) -> Iterable["_models.MongoRoleDefinitionGetResults"]: """Retrieves the list of all Azure Cosmos DB Mongo Role Definitions. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :return: An iterator like instance of either MongoRoleDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.MongoRoleDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoRoleDefinitionListResult] = kwargs.pop("cls", None) error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: _request = build_list_mongo_role_definitions_request( resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version _request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) _request.url = self._client.format_url(_request.url) _request.method = "GET" return _request def extract_data(pipeline_response): deserialized = self._deserialize("MongoRoleDefinitionListResult", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return None, iter(list_of_elem) def get_next(next_link=None): _request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
[docs] @distributed_trace def get_mongo_user_definition( self, mongo_user_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any ) -> _models.MongoUserDefinitionGetResults: """Retrieves the properties of an existing Azure Cosmos DB Mongo User Definition with the given Id. :param mongo_user_definition_id: The ID for the User Definition {dbName.userName}. Required. :type mongo_user_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :return: MongoUserDefinitionGetResults or the result of cls(response) :rtype: ~azure.mgmt.cosmosdb.models.MongoUserDefinitionGetResults :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoUserDefinitionGetResults] = kwargs.pop("cls", None) _request = build_get_mongo_user_definition_request( mongo_user_definition_id=mongo_user_definition_id, resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = self._deserialize("MongoUserDefinitionGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
def _create_update_mongo_user_definition_initial( # pylint: disable=name-too-long self, mongo_user_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_user_definition_parameters: Union[ _models.MongoUserDefinitionCreateUpdateParameters, IO[bytes] ], **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(create_update_mongo_user_definition_parameters, (IOBase, bytes)): _content = create_update_mongo_user_definition_parameters else: _json = self._serialize.body( create_update_mongo_user_definition_parameters, "MongoUserDefinitionCreateUpdateParameters" ) _request = build_create_update_mongo_user_definition_request( mongo_user_definition_id=mongo_user_definition_id, resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore @overload def begin_create_update_mongo_user_definition( # pylint: disable=name-too-long self, mongo_user_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_user_definition_parameters: _models.MongoUserDefinitionCreateUpdateParameters, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoUserDefinitionGetResults]: """Creates or updates an Azure Cosmos DB Mongo User Definition. :param mongo_user_definition_id: The ID for the User Definition {dbName.userName}. Required. :type mongo_user_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param create_update_mongo_user_definition_parameters: The properties required to create or update a User Definition. Required. :type create_update_mongo_user_definition_parameters: ~azure.mgmt.cosmosdb.models.MongoUserDefinitionCreateUpdateParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoUserDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoUserDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_create_update_mongo_user_definition( # pylint: disable=name-too-long self, mongo_user_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_user_definition_parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.MongoUserDefinitionGetResults]: """Creates or updates an Azure Cosmos DB Mongo User Definition. :param mongo_user_definition_id: The ID for the User Definition {dbName.userName}. Required. :type mongo_user_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param create_update_mongo_user_definition_parameters: The properties required to create or update a User Definition. Required. :type create_update_mongo_user_definition_parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either MongoUserDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoUserDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_create_update_mongo_user_definition( # pylint: disable=name-too-long self, mongo_user_definition_id: str, resource_group_name: str, account_name: str, create_update_mongo_user_definition_parameters: Union[ _models.MongoUserDefinitionCreateUpdateParameters, IO[bytes] ], **kwargs: Any ) -> LROPoller[_models.MongoUserDefinitionGetResults]: """Creates or updates an Azure Cosmos DB Mongo User Definition. :param mongo_user_definition_id: The ID for the User Definition {dbName.userName}. Required. :type mongo_user_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param create_update_mongo_user_definition_parameters: The properties required to create or update a User Definition. Is either a MongoUserDefinitionCreateUpdateParameters type or a IO[bytes] type. Required. :type create_update_mongo_user_definition_parameters: ~azure.mgmt.cosmosdb.models.MongoUserDefinitionCreateUpdateParameters or IO[bytes] :return: An instance of LROPoller that returns either MongoUserDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.MongoUserDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.MongoUserDefinitionGetResults] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._create_update_mongo_user_definition_initial( mongo_user_definition_id=mongo_user_definition_id, resource_group_name=resource_group_name, account_name=account_name, create_update_mongo_user_definition_parameters=create_update_mongo_user_definition_parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("MongoUserDefinitionGetResults", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.MongoUserDefinitionGetResults].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.MongoUserDefinitionGetResults]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
def _delete_mongo_user_definition_initial( self, mongo_user_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) _request = build_delete_mongo_user_definition_request( mongo_user_definition_id=mongo_user_definition_id, resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202, 204]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def begin_delete_mongo_user_definition( self, mongo_user_definition_id: str, resource_group_name: str, account_name: str, **kwargs: Any ) -> LROPoller[None]: """Deletes an existing Azure Cosmos DB Mongo User Definition. :param mongo_user_definition_id: The ID for the User Definition {dbName.userName}. Required. :type mongo_user_definition_id: str :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :return: An instance of LROPoller that returns either None or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[None] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._delete_mongo_user_definition_initial( mongo_user_definition_id=mongo_user_definition_id, resource_group_name=resource_group_name, account_name=account_name, api_version=api_version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) # type: ignore if polling is True: polling_method: PollingMethod = cast(PollingMethod, ARMPolling(lro_delay, **kwargs)) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[None].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs] @distributed_trace def list_mongo_user_definitions( self, resource_group_name: str, account_name: str, **kwargs: Any ) -> Iterable["_models.MongoUserDefinitionGetResults"]: """Retrieves the list of all Azure Cosmos DB Mongo User Definition. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :return: An iterator like instance of either MongoUserDefinitionGetResults or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.cosmosdb.models.MongoUserDefinitionGetResults] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.MongoUserDefinitionListResult] = kwargs.pop("cls", None) error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: _request = build_list_mongo_user_definitions_request( resource_group_name=resource_group_name, account_name=account_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version _request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) _request.url = self._client.format_url(_request.url) _request.method = "GET" return _request def extract_data(pipeline_response): deserialized = self._deserialize("MongoUserDefinitionListResult", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return None, iter(list_of_elem) def get_next(next_link=None): _request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
def _retrieve_continuous_backup_information_initial( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, location: Union[_models.ContinuousBackupRestoreLocation, IO[bytes]], **kwargs: Any ) -> Iterator[bytes]: error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(location, (IOBase, bytes)): _content = location else: _json = self._serialize.body(location, "ContinuousBackupRestoreLocation") _request = build_retrieve_continuous_backup_information_request( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request.url = self._client.format_url(_request.url) _decompress = kwargs.pop("decompress", True) _stream = True pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: try: response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response, error_format=ARMErrorFormat) deserialized = response.stream_download(self._client._pipeline, decompress=_decompress) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore @overload def begin_retrieve_continuous_backup_information( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, location: _models.ContinuousBackupRestoreLocation, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.BackupInformation]: """Retrieves continuous backup information for a Mongodb collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param location: The name of the continuous backup restore location. Required. :type location: ~azure.mgmt.cosmosdb.models.ContinuousBackupRestoreLocation :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either BackupInformation or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.BackupInformation] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_retrieve_continuous_backup_information( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, location: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.BackupInformation]: """Retrieves continuous backup information for a Mongodb collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param location: The name of the continuous backup restore location. Required. :type location: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either BackupInformation or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.BackupInformation] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_retrieve_continuous_backup_information( # pylint: disable=name-too-long self, resource_group_name: str, account_name: str, database_name: str, collection_name: str, location: Union[_models.ContinuousBackupRestoreLocation, IO[bytes]], **kwargs: Any ) -> LROPoller[_models.BackupInformation]: """Retrieves continuous backup information for a Mongodb collection. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param account_name: Cosmos DB database account name. Required. :type account_name: str :param database_name: Cosmos DB database name. Required. :type database_name: str :param collection_name: Cosmos DB collection name. Required. :type collection_name: str :param location: The name of the continuous backup restore location. Is either a ContinuousBackupRestoreLocation type or a IO[bytes] type. Required. :type location: ~azure.mgmt.cosmosdb.models.ContinuousBackupRestoreLocation or IO[bytes] :return: An instance of LROPoller that returns either BackupInformation or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.cosmosdb.models.BackupInformation] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.BackupInformation] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._retrieve_continuous_backup_information_initial( resource_group_name=resource_group_name, account_name=account_name, database_name=database_name, collection_name=collection_name, location=location, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) raw_result.http_response.read() # type: ignore kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("BackupInformation", pipeline_response.http_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast( PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "location"}, **kwargs) ) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.BackupInformation].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.BackupInformation]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )