Source code for azure.mgmt.notificationhubs.operations._namespaces_operations

# pylint: disable=too-many-lines,too-many-statements
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Optional, TypeVar, Union, cast, overload
import urllib.parse

from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling

from .. import models as _models
from .._serialization import Serializer
from .._vendor import _convert_request

T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


def build_check_availability_request(subscription_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/providers/Microsoft.NotificationHubs/checkNamespaceAvailability",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_request(
    resource_group_name: str, namespace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_create_or_update_request(
    resource_group_name: str, namespace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_update_request(
    resource_group_name: str, namespace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_request(
    resource_group_name: str, namespace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_all_request(
    subscription_id: str, *, skip_token: Optional[str] = None, top: int = 100, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop("template_url", "/subscriptions/{subscriptionId}/providers/Microsoft.NotificationHubs/namespaces")
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    if skip_token is not None:
        _params["$skipToken"] = _SERIALIZER.query("skip_token", skip_token, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int")
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_request(
    resource_group_name: str, subscription_id: str, *, skip_token: Optional[str] = None, top: int = 100, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    if skip_token is not None:
        _params["$skipToken"] = _SERIALIZER.query("skip_token", skip_token, "str")
    if top is not None:
        _params["$top"] = _SERIALIZER.query("top", top, "int")
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_create_or_update_authorization_rule_request(  # pylint: disable=name-too-long
    resource_group_name: str, namespace_name: str, authorization_rule_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_delete_authorization_rule_request(
    resource_group_name: str, namespace_name: str, authorization_rule_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_authorization_rule_request(
    resource_group_name: str, namespace_name: str, authorization_rule_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_authorization_rules_request(
    resource_group_name: str, namespace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_list_keys_request(
    resource_group_name: str, namespace_name: str, authorization_rule_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}/listKeys",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_regenerate_keys_request(
    resource_group_name: str, namespace_name: str, authorization_rule_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}/regenerateKeys",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_get_pns_credentials_request(
    resource_group_name: str, namespace_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/pnsCredentials",
    )  # pylint: disable=line-too-long
    path_format_arguments = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


[docs]class NamespacesOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.mgmt.notificationhubs.NotificationHubsManagementClient`'s :attr:`namespaces` attribute. """ models = _models def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer") @overload def check_availability( self, parameters: _models.CheckAvailabilityParameters, *, content_type: str = "application/json", **kwargs: Any ) -> _models.CheckAvailabilityResult: """Checks the availability of the given service namespace across all Azure subscriptions. This is useful because the domain name is created based on the service namespace name. Checks the availability of the given service namespace across all Azure subscriptions. This is useful because the domain name is created based on the service namespace name. :param parameters: Request content. Required. :type parameters: ~azure.mgmt.notificationhubs.models.CheckAvailabilityParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: CheckAvailabilityResult or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.CheckAvailabilityResult :raises ~azure.core.exceptions.HttpResponseError: """ @overload def check_availability( self, parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> _models.CheckAvailabilityResult: """Checks the availability of the given service namespace across all Azure subscriptions. This is useful because the domain name is created based on the service namespace name. Checks the availability of the given service namespace across all Azure subscriptions. This is useful because the domain name is created based on the service namespace name. :param parameters: Request content. Required. :type parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: CheckAvailabilityResult or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.CheckAvailabilityResult :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def check_availability( self, parameters: Union[_models.CheckAvailabilityParameters, IO[bytes]], **kwargs: Any ) -> _models.CheckAvailabilityResult: """Checks the availability of the given service namespace across all Azure subscriptions. This is useful because the domain name is created based on the service namespace name. Checks the availability of the given service namespace across all Azure subscriptions. This is useful because the domain name is created based on the service namespace name. :param parameters: Request content. Is either a CheckAvailabilityParameters type or a IO[bytes] type. Required. :type parameters: ~azure.mgmt.notificationhubs.models.CheckAvailabilityParameters or IO[bytes] :return: CheckAvailabilityResult or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.CheckAvailabilityResult :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.CheckAvailabilityResult] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IOBase, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "CheckAvailabilityParameters") _request = build_check_availability_request( subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("CheckAvailabilityResult", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def get(self, resource_group_name: str, namespace_name: str, **kwargs: Any) -> _models.NamespaceResource: """Returns the given namespace. Returns the given namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :return: NamespaceResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None) _request = build_get_request( resource_group_name=resource_group_name, namespace_name=namespace_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("NamespaceResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
def _create_or_update_initial( self, resource_group_name: str, namespace_name: str, parameters: Union[_models.NamespaceResource, IO[bytes]], **kwargs: Any ) -> _models.NamespaceResource: error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IOBase, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "NamespaceResource") _request = build_create_or_update_request( resource_group_name=resource_group_name, namespace_name=namespace_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if response.status_code == 200: deserialized = self._deserialize("NamespaceResource", pipeline_response) if response.status_code == 201: deserialized = self._deserialize("NamespaceResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore @overload def begin_create_or_update( self, resource_group_name: str, namespace_name: str, parameters: _models.NamespaceResource, *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.NamespaceResource]: """Creates / Updates a Notification Hub namespace. This operation is idempotent. Creates / Updates a Notification Hub namespace. This operation is idempotent. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param parameters: Request content. Required. :type parameters: ~azure.mgmt.notificationhubs.models.NamespaceResource :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either NamespaceResource or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.notificationhubs.models.NamespaceResource] :raises ~azure.core.exceptions.HttpResponseError: """ @overload def begin_create_or_update( self, resource_group_name: str, namespace_name: str, parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> LROPoller[_models.NamespaceResource]: """Creates / Updates a Notification Hub namespace. This operation is idempotent. Creates / Updates a Notification Hub namespace. This operation is idempotent. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param parameters: Request content. Required. :type parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: An instance of LROPoller that returns either NamespaceResource or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.notificationhubs.models.NamespaceResource] :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def begin_create_or_update( self, resource_group_name: str, namespace_name: str, parameters: Union[_models.NamespaceResource, IO[bytes]], **kwargs: Any ) -> LROPoller[_models.NamespaceResource]: """Creates / Updates a Notification Hub namespace. This operation is idempotent. Creates / Updates a Notification Hub namespace. This operation is idempotent. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param parameters: Request content. Is either a NamespaceResource type or a IO[bytes] type. Required. :type parameters: ~azure.mgmt.notificationhubs.models.NamespaceResource or IO[bytes] :return: An instance of LROPoller that returns either NamespaceResource or the result of cls(response) :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.notificationhubs.models.NamespaceResource] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None) polling: Union[bool, PollingMethod] = kwargs.pop("polling", True) lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token: Optional[str] = kwargs.pop("continuation_token", None) if cont_token is None: raw_result = self._create_or_update_initial( resource_group_name=resource_group_name, namespace_name=namespace_name, parameters=parameters, api_version=api_version, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): deserialized = self._deserialize("NamespaceResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized if polling is True: polling_method: PollingMethod = cast( PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs) ) elif polling is False: polling_method = cast(PollingMethod, NoPolling()) else: polling_method = polling if cont_token: return LROPoller[_models.NamespaceResource].from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return LROPoller[_models.NamespaceResource]( self._client, raw_result, get_long_running_output, polling_method # type: ignore )
@overload def update( self, resource_group_name: str, namespace_name: str, parameters: _models.NamespacePatchParameters, *, content_type: str = "application/json", **kwargs: Any ) -> _models.NamespaceResource: """Patches the existing namespace. Patches the existing namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param parameters: Request content. Required. :type parameters: ~azure.mgmt.notificationhubs.models.NamespacePatchParameters :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: NamespaceResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource :raises ~azure.core.exceptions.HttpResponseError: """ @overload def update( self, resource_group_name: str, namespace_name: str, parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> _models.NamespaceResource: """Patches the existing namespace. Patches the existing namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param parameters: Request content. Required. :type parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: NamespaceResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def update( self, resource_group_name: str, namespace_name: str, parameters: Union[_models.NamespacePatchParameters, IO[bytes]], **kwargs: Any ) -> _models.NamespaceResource: """Patches the existing namespace. Patches the existing namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param parameters: Request content. Is either a NamespacePatchParameters type or a IO[bytes] type. Required. :type parameters: ~azure.mgmt.notificationhubs.models.NamespacePatchParameters or IO[bytes] :return: NamespaceResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IOBase, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "NamespacePatchParameters") _request = build_update_request( resource_group_name=resource_group_name, namespace_name=namespace_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("NamespaceResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def delete( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, namespace_name: str, **kwargs: Any ) -> None: """Deletes an existing namespace. This operation also removes all associated notificationHubs under the namespace. Deletes an existing namespace. This operation also removes all associated notificationHubs under the namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :return: None or the result of cls(response) :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[None] = kwargs.pop("cls", None) _request = build_delete_request( resource_group_name=resource_group_name, namespace_name=namespace_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) # type: ignore
[docs] @distributed_trace def list_all( self, skip_token: Optional[str] = None, top: int = 100, **kwargs: Any ) -> Iterable["_models.NamespaceResource"]: """Lists all the available namespaces within the subscription. Lists all the available namespaces within the subscription. :param skip_token: Skip token for subsequent requests. Default value is None. :type skip_token: str :param top: Maximum number of results to return. Default value is 100. :type top: int :return: An iterator like instance of either NamespaceResource or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.notificationhubs.models.NamespaceResource] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.NamespaceListResult] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: _request = build_list_all_request( subscription_id=self._config.subscription_id, skip_token=skip_token, top=top, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version _request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _request.method = "GET" return _request def extract_data(pipeline_response): deserialized = self._deserialize("NamespaceListResult", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.next_link or None, iter(list_of_elem) def get_next(next_link=None): _request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
[docs] @distributed_trace def list( self, resource_group_name: str, skip_token: Optional[str] = None, top: int = 100, **kwargs: Any ) -> Iterable["_models.NamespaceResource"]: """Lists the available namespaces within a resource group. Lists the available namespaces within a resource group. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param skip_token: Skip token for subsequent requests. Default value is None. :type skip_token: str :param top: Maximum number of results to return. Default value is 100. :type top: int :return: An iterator like instance of either NamespaceResource or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.notificationhubs.models.NamespaceResource] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.NamespaceListResult] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: _request = build_list_request( resource_group_name=resource_group_name, subscription_id=self._config.subscription_id, skip_token=skip_token, top=top, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version _request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _request.method = "GET" return _request def extract_data(pipeline_response): deserialized = self._deserialize("NamespaceListResult", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.next_link or None, iter(list_of_elem) def get_next(next_link=None): _request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
@overload def create_or_update_authorization_rule( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, parameters: _models.SharedAccessAuthorizationRuleResource, *, content_type: str = "application/json", **kwargs: Any ) -> _models.SharedAccessAuthorizationRuleResource: """Creates an authorization rule for a namespace. Creates an authorization rule for a namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :param parameters: Request content. Required. :type parameters: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: SharedAccessAuthorizationRuleResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource :raises ~azure.core.exceptions.HttpResponseError: """ @overload def create_or_update_authorization_rule( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> _models.SharedAccessAuthorizationRuleResource: """Creates an authorization rule for a namespace. Creates an authorization rule for a namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :param parameters: Request content. Required. :type parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: SharedAccessAuthorizationRuleResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def create_or_update_authorization_rule( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, parameters: Union[_models.SharedAccessAuthorizationRuleResource, IO[bytes]], **kwargs: Any ) -> _models.SharedAccessAuthorizationRuleResource: """Creates an authorization rule for a namespace. Creates an authorization rule for a namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :param parameters: Request content. Is either a SharedAccessAuthorizationRuleResource type or a IO[bytes] type. Required. :type parameters: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource or IO[bytes] :return: SharedAccessAuthorizationRuleResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.SharedAccessAuthorizationRuleResource] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IOBase, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "SharedAccessAuthorizationRuleResource") _request = build_create_or_update_authorization_rule_request( resource_group_name=resource_group_name, namespace_name=namespace_name, authorization_rule_name=authorization_rule_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 201]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if response.status_code == 200: deserialized = self._deserialize("SharedAccessAuthorizationRuleResource", pipeline_response) if response.status_code == 201: deserialized = self._deserialize("SharedAccessAuthorizationRuleResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def delete_authorization_rule( # pylint: disable=inconsistent-return-statements self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, **kwargs: Any ) -> None: """Deletes a namespace authorization rule. Deletes a namespace authorization rule. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :return: None or the result of cls(response) :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[None] = kwargs.pop("cls", None) _request = build_delete_authorization_rule_request( resource_group_name=resource_group_name, namespace_name=namespace_name, authorization_rule_name=authorization_rule_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 204]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) if cls: return cls(pipeline_response, None, {}) # type: ignore
[docs] @distributed_trace def get_authorization_rule( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, **kwargs: Any ) -> _models.SharedAccessAuthorizationRuleResource: """Gets an authorization rule for a namespace by name. Gets an authorization rule for a namespace by name. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :return: SharedAccessAuthorizationRuleResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.SharedAccessAuthorizationRuleResource] = kwargs.pop("cls", None) _request = build_get_authorization_rule_request( resource_group_name=resource_group_name, namespace_name=namespace_name, authorization_rule_name=authorization_rule_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("SharedAccessAuthorizationRuleResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def list_authorization_rules( self, resource_group_name: str, namespace_name: str, **kwargs: Any ) -> Iterable["_models.SharedAccessAuthorizationRuleResource"]: """Gets the authorization rules for a namespace. Gets the authorization rules for a namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :return: An iterator like instance of either SharedAccessAuthorizationRuleResource or the result of cls(response) :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.SharedAccessAuthorizationRuleListResult] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: _request = build_list_authorization_rules_request( resource_group_name=resource_group_name, namespace_name=namespace_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version _request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _request.method = "GET" return _request def extract_data(pipeline_response): deserialized = self._deserialize("SharedAccessAuthorizationRuleListResult", pipeline_response) list_of_elem = deserialized.value if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.next_link or None, iter(list_of_elem) def get_next(next_link=None): _request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) return pipeline_response return ItemPaged(get_next, extract_data)
[docs] @distributed_trace def list_keys( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, **kwargs: Any ) -> _models.ResourceListKeys: """Gets the Primary and Secondary ConnectionStrings to the namespace. Gets the Primary and Secondary ConnectionStrings to the namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :return: ResourceListKeys or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.ResourceListKeys] = kwargs.pop("cls", None) _request = build_list_keys_request( resource_group_name=resource_group_name, namespace_name=namespace_name, authorization_rule_name=authorization_rule_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ResourceListKeys", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
@overload def regenerate_keys( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, parameters: _models.PolicyKeyResource, *, content_type: str = "application/json", **kwargs: Any ) -> _models.ResourceListKeys: """Regenerates the Primary/Secondary Keys to the Namespace Authorization Rule. Regenerates the Primary/Secondary Keys to the Namespace Authorization Rule. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :param parameters: Request content. Required. :type parameters: ~azure.mgmt.notificationhubs.models.PolicyKeyResource :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: ResourceListKeys or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys :raises ~azure.core.exceptions.HttpResponseError: """ @overload def regenerate_keys( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any ) -> _models.ResourceListKeys: """Regenerates the Primary/Secondary Keys to the Namespace Authorization Rule. Regenerates the Primary/Secondary Keys to the Namespace Authorization Rule. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :param parameters: Request content. Required. :type parameters: IO[bytes] :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: ResourceListKeys or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def regenerate_keys( self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, parameters: Union[_models.PolicyKeyResource, IO[bytes]], **kwargs: Any ) -> _models.ResourceListKeys: """Regenerates the Primary/Secondary Keys to the Namespace Authorization Rule. Regenerates the Primary/Secondary Keys to the Namespace Authorization Rule. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :param authorization_rule_name: Authorization Rule Name. Required. :type authorization_rule_name: str :param parameters: Request content. Is either a PolicyKeyResource type or a IO[bytes] type. Required. :type parameters: ~azure.mgmt.notificationhubs.models.PolicyKeyResource or IO[bytes] :return: ResourceListKeys or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[_models.ResourceListKeys] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(parameters, (IOBase, bytes)): _content = parameters else: _json = self._serialize.body(parameters, "PolicyKeyResource") _request = build_regenerate_keys_request( resource_group_name=resource_group_name, namespace_name=namespace_name, authorization_rule_name=authorization_rule_name, subscription_id=self._config.subscription_id, api_version=api_version, content_type=content_type, json=_json, content=_content, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("ResourceListKeys", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
[docs] @distributed_trace def get_pns_credentials( self, resource_group_name: str, namespace_name: str, **kwargs: Any ) -> _models.PnsCredentialsResource: """Lists the PNS credentials associated with a namespace. Lists the PNS credentials associated with a namespace. :param resource_group_name: The name of the resource group. The name is case insensitive. Required. :type resource_group_name: str :param namespace_name: Namespace name. Required. :type namespace_name: str :return: PnsCredentialsResource or the result of cls(response) :rtype: ~azure.mgmt.notificationhubs.models.PnsCredentialsResource :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = case_insensitive_dict(kwargs.pop("params", {}) or {}) api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version)) cls: ClsType[_models.PnsCredentialsResource] = kwargs.pop("cls", None) _request = build_get_pns_credentials_request( resource_group_name=resource_group_name, namespace_name=namespace_name, subscription_id=self._config.subscription_id, api_version=api_version, headers=_headers, params=_params, ) _request = _convert_request(_request) _request.url = self._client.format_url(_request.url) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response) raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) deserialized = self._deserialize("PnsCredentialsResource", pipeline_response) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore