# pylint: disable=too-many-lines,too-many-statements
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from io import IOBase
from typing import Any, Callable, Dict, IO, Iterable, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling
from .. import models as _models
from .._serialization import Serializer
from .._vendor import _convert_request
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_check_availability_request(subscription_id: str, **kwargs: Any) -> HttpRequest:
    """Assemble the ``POST`` request for the ``checkNamespaceAvailability`` endpoint.

    :param subscription_id: Value substituted for ``{subscriptionId}`` in the URL template.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") and
     ``Content-Type`` are read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword content_type: Value for the ``Content-Type`` header; the header is only set when
     this is not None.
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The map entries are always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/providers/Microsoft.NotificationHubs/checkNamespaceAvailability",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_get_request(
    resource_group_name: str,
    namespace_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``GET`` request addressed to a single namespace.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_create_or_update_request(
    resource_group_name: str,
    namespace_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``PUT`` request addressed to a single namespace.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") and
     ``Content-Type`` are read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword content_type: Value for the ``Content-Type`` header; the header is only set when
     this is not None.
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The map entries are always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_update_request(
    resource_group_name: str,
    namespace_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``PATCH`` request addressed to a single namespace.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") and
     ``Content-Type`` are read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword content_type: Value for the ``Content-Type`` header; the header is only set when
     this is not None.
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The map entries are always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PATCH", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_delete_request(
    resource_group_name: str,
    namespace_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``DELETE`` request addressed to a single namespace.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_list_all_request(
    subscription_id: str,
    *,
    skip_token: Optional[str] = None,
    top: int = 100,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``GET`` request for the subscription-level ``namespaces`` collection.

    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword skip_token: Sent as the ``$skipToken`` query parameter when not None.
    :paramtype skip_token: str
    :keyword top: Sent as the ``$top`` query parameter when not None. Default value is 100.
    :paramtype top: int
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url", "/subscriptions/{subscriptionId}/providers/Microsoft.NotificationHubs/namespaces"
    )
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string: optional paging parameters first, then the API version.
    if skip_token is not None:
        query_map["$skipToken"] = _SERIALIZER.query("skip_token", skip_token, "str")
    if top is not None:
        query_map["$top"] = _SERIALIZER.query("top", top, "int")
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_list_request(
    resource_group_name: str,
    subscription_id: str,
    *,
    skip_token: Optional[str] = None,
    top: int = 100,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``GET`` request for the resource-group-level ``namespaces`` collection.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword skip_token: Sent as the ``$skipToken`` query parameter when not None.
    :paramtype skip_token: str
    :keyword top: Sent as the ``$top`` query parameter when not None. Default value is 100.
    :paramtype top: int
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string: optional paging parameters first, then the API version.
    if skip_token is not None:
        query_map["$skipToken"] = _SERIALIZER.query("skip_token", skip_token, "str")
    if top is not None:
        query_map["$top"] = _SERIALIZER.query("top", top, "int")
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_create_or_update_authorization_rule_request(  # pylint: disable=name-too-long
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``PUT`` request addressed to a namespace authorization rule.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param authorization_rule_name: Value substituted for ``{authorizationRuleName}``.
    :type authorization_rule_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") and
     ``Content-Type`` are read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword content_type: Value for the ``Content-Type`` header; the header is only set when
     this is not None.
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The map entries are always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_delete_authorization_rule_request(
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``DELETE`` request addressed to a namespace authorization rule.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param authorization_rule_name: Value substituted for ``{authorizationRuleName}``.
    :type authorization_rule_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_get_authorization_rule_request(
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``GET`` request addressed to a namespace authorization rule.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param authorization_rule_name: Value substituted for ``{authorizationRuleName}``.
    :type authorization_rule_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_list_authorization_rules_request(
    resource_group_name: str,
    namespace_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``GET`` request for a namespace's ``authorizationRules`` collection.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_list_keys_request(
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``POST`` request for an authorization rule's ``listKeys`` action.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param authorization_rule_name: Value substituted for ``{authorizationRuleName}``.
    :type authorization_rule_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}/listKeys",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_regenerate_keys_request(
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``POST`` request for an authorization rule's ``regenerateKeys`` action.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param authorization_rule_name: Value substituted for ``{authorizationRuleName}``.
    :type authorization_rule_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") and
     ``Content-Type`` are read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword content_type: Value for the ``Content-Type`` header; the header is only set when
     this is not None.
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The map entries are always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/authorizationRules/{authorizationRuleName}/regenerateKeys",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
        "authorizationRuleName": _SERIALIZER.url(
            "authorization_rule_name",
            authorization_rule_name,
            "str",
            max_length=256,
            min_length=1,
            pattern=r"^[a-zA-Z0-9!()*-._]+$",
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
def build_get_pns_credentials_request(
    resource_group_name: str,
    namespace_name: str,
    subscription_id: str,
    **kwargs: Any
) -> HttpRequest:
    """Assemble the ``POST`` request for a namespace's ``pnsCredentials`` action.

    :param resource_group_name: Value substituted for ``{resourceGroupName}``.
    :type resource_group_name: str
    :param namespace_name: Value substituted for ``{namespaceName}``.
    :type namespace_name: str
    :param subscription_id: Value substituted for ``{subscriptionId}``.
    :type subscription_id: str
    :keyword headers: Initial headers; ``Accept`` (default "application/json") is read from here.
    :keyword params: Initial query parameters; ``api-version`` is read from here unless
     ``api_version`` is passed.
    :keyword api_version: Value sent as ``api-version``. Default value is "2023-10-01-preview".
    :keyword template_url: Replacement for the default URL template.
    :return: The request; any remaining keyword arguments are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The query-map entry is always removed; an explicit keyword argument takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept = header_map.pop("Accept", "application/json")

    # Resolve the URL template.
    url_template = kwargs.pop(
        "template_url",
        "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.NotificationHubs/namespaces/{namespaceName}/pnsCredentials",
    )  # pylint: disable=line-too-long
    path_values = {
        "subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
        "resourceGroupName": _SERIALIZER.url(
            "resource_group_name", resource_group_name, "str", max_length=90, min_length=1
        ),
        "namespaceName": _SERIALIZER.url(
            "namespace_name", namespace_name, "str", max_length=50, min_length=1, pattern=r"^[a-zA-Z][a-zA-Z0-9-]*$"
        ),
    }
    request_url: str = url_template.format(**path_values)  # type: ignore

    # Query string.
    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Headers.
    header_map["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=request_url, params=query_map, headers=header_map, **kwargs)
[docs]class NamespacesOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.notificationhubs.NotificationHubsManagementClient`'s
:attr:`namespaces` attribute.
"""
models = _models
def __init__(self, *args, **kwargs):
    """Bind the client, configuration, serializer and deserializer to this instance.

    Each of the four values is taken from the next positional argument while any
    remain; once the positional arguments are exhausted it is popped from ``kwargs``
    under ``client``, ``config``, ``serializer`` and ``deserializer`` respectively,
    which raises ``KeyError`` when the keyword is missing.
    """
    pending = list(args)
    bindings = (
        ("_client", "client"),
        ("_config", "config"),
        ("_serialize", "serializer"),
        ("_deserialize", "deserializer"),
    )
    for attribute, keyword in bindings:
        value = pending.pop(0) if pending else kwargs.pop(keyword)
        setattr(self, attribute, value)
@overload
def check_availability(
    self,
    parameters: _models.CheckAvailabilityParameters,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.CheckAvailabilityResult:
    """Check the availability of the given service namespace across all Azure subscriptions.

    This is useful because the domain name is created based on the service namespace name.

    :param parameters: Request content, given as a model instance. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.CheckAvailabilityParameters
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: CheckAvailabilityResult or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.CheckAvailabilityResult
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def check_availability(
    self,
    parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.CheckAvailabilityResult:
    """Check the availability of the given service namespace across all Azure subscriptions.

    This is useful because the domain name is created based on the service namespace name.

    :param parameters: Request content, given as a binary stream. Required.
    :type parameters: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: CheckAvailabilityResult or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.CheckAvailabilityResult
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace
def check_availability(
    self, parameters: Union[_models.CheckAvailabilityParameters, IO[bytes]], **kwargs: Any
) -> _models.CheckAvailabilityResult:
    """Checks the availability of the given service namespace across all Azure subscriptions.

    This is useful because the domain name is created based on the service namespace name.

    :param parameters: Request content. Is either a CheckAvailabilityParameters type or a IO[bytes]
     type. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.CheckAvailabilityParameters or IO[bytes]
    :keyword content_type: Body content type; falls back to the ``Content-Type`` entry of
     ``headers`` and then to "application/json".
    :paramtype content_type: str
    :keyword api_version: Falls back to the ``api-version`` entry of ``params`` and then to the
     client configuration's ``api_version``.
    :paramtype api_version: str
    :keyword error_map: Extra status-code-to-exception entries merged over the defaults.
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``; its
     result is returned instead of the deserialized model.
    :return: CheckAvailabilityResult or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.CheckAvailabilityResult
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied entries override the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[_models.CheckAvailabilityResult] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    # Streams and raw bytes are sent as-is; anything else is serialized as the model.
    if isinstance(parameters, (IOBase, bytes)):
        _content = parameters
    else:
        _json = self._serialize.body(parameters, "CheckAvailabilityParameters")

    _request = build_check_availability_request(
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        # map_error gets the first chance to raise; otherwise a generic HttpResponseError is raised.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    deserialized = self._deserialize("CheckAvailabilityResult", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore
@distributed_trace
def get(self, resource_group_name: str, namespace_name: str, **kwargs: Any) -> _models.NamespaceResource:
    """Returns the given namespace.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :keyword api_version: Falls back to the ``api-version`` entry of ``params`` and then to the
     client configuration's ``api_version``.
    :paramtype api_version: str
    :keyword error_map: Extra status-code-to-exception entries merged over the defaults.
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``; its
     result is returned instead of the deserialized model.
    :return: NamespaceResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied entries override the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None)

    _request = build_get_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        # map_error gets the first chance to raise; otherwise a generic HttpResponseError is raised.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    deserialized = self._deserialize("NamespaceResource", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore
def _create_or_update_initial(
    self,
    resource_group_name: str,
    namespace_name: str,
    parameters: Union[_models.NamespaceResource, IO[bytes]],
    **kwargs: Any
) -> _models.NamespaceResource:
    """Send the initial create-or-update request for a namespace and deserialize the reply.

    The body is sent as raw content when ``parameters`` is a stream or ``bytes``, and is
    otherwise serialized as a ``NamespaceResource``. Status codes 200 and 201 are accepted;
    any other status raises (via ``map_error`` or as ``HttpResponseError``).

    :return: The deserialized ``NamespaceResource``, or the result of the ``cls`` callback
     when one is supplied.
    """
    status_to_exception = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_exception.update(kwargs.pop("error_map", {}) or {})

    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The map entries are always removed; explicit keyword arguments take precedence.
    version_fallback = query_map.pop("api-version", self._config.api_version)
    api_version: str = kwargs.pop("api_version", version_fallback)
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"

    # Exactly one of the two body forms is populated.
    json_body = None
    raw_body = None
    if isinstance(parameters, (IOBase, bytes)):
        raw_body = parameters
    else:
        json_body = self._serialize.body(parameters, "NamespaceResource")

    request = build_create_or_update_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=json_body,
        content=raw_body,
        headers=header_map,
        params=query_map,
    )
    request = _convert_request(request)
    request.url = self._client.format_url(request.url)

    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in [200, 201]:
        map_error(status_code=response.status_code, response=response, error_map=status_to_exception)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    # Both accepted status codes (200 and 201) carry the same payload type.
    deserialized = self._deserialize("NamespaceResource", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore
    return deserialized  # type: ignore
@overload
def begin_create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    parameters: _models.NamespaceResource,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.NamespaceResource]:
    """Create or update a Notification Hub namespace. This operation is idempotent.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param parameters: Request content, given as a model instance. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.NamespaceResource
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either NamespaceResource or the result of
     cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.notificationhubs.models.NamespaceResource]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def begin_create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> LROPoller[_models.NamespaceResource]:
    """Begin creating or updating a Notification Hub namespace from a binary body.

    Typing overload for callers that supply the request body as an
    ``IO[bytes]`` stream. The operation is idempotent.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param parameters: Request content. Required.
    :type parameters: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: An instance of LROPoller that returns either NamespaceResource or the result of
     cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.notificationhubs.models.NamespaceResource]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace
def begin_create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    parameters: Union[_models.NamespaceResource, IO[bytes]],
    **kwargs: Any
) -> LROPoller[_models.NamespaceResource]:
    """Begin creating or updating a Notification Hub namespace as a long-running operation.

    Sends the initial request through ``_create_or_update_initial`` (unless a
    ``continuation_token`` is supplied) and returns a poller whose result is
    the deserialized ``NamespaceResource``. The operation is idempotent.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param parameters: Request content. Is either a NamespaceResource type or a IO[bytes] type.
     Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.NamespaceResource or IO[bytes]
    :keyword polling: ``True`` (default) polls with ARMPolling, ``False`` disables polling
     (NoPolling), and any other value is used directly as the polling method.
    :paramtype polling: bool or ~azure.core.polling.PollingMethod
    :keyword polling_interval: Delay passed to ARMPolling. Defaults to the client configuration's
     ``polling_interval``.
    :keyword continuation_token: When given, the initial request is skipped and the poller is
     rebuilt from this token.
    :paramtype continuation_token: str
    :return: An instance of LROPoller that returns either NamespaceResource or the result of
     cls(response)
    :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.notificationhubs.models.NamespaceResource]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Explicit keyword arguments win over values embedded in ``params`` / ``headers``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None)
    polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
    lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
    cont_token: Optional[str] = kwargs.pop("continuation_token", None)
    if cont_token is None:
        # The pass-through ``cls`` makes the initial call return its first callback
        # argument (the pipeline response) instead of a deserialized model.
        raw_result = self._create_or_update_initial(
            resource_group_name=resource_group_name,
            namespace_name=namespace_name,
            parameters=parameters,
            api_version=api_version,
            content_type=content_type,
            cls=lambda x, y, z: x,
            headers=_headers,
            params=_params,
            **kwargs
        )
    # Discard any caller-supplied error_map before kwargs are forwarded to ARMPolling.
    kwargs.pop("error_map", None)

    def get_long_running_output(pipeline_response):
        """Deserialize the final response, applying the caller's ``cls`` hook if any."""
        deserialized = self._deserialize("NamespaceResource", pipeline_response)
        if cls:
            return cls(pipeline_response, deserialized, {})  # type: ignore
        return deserialized

    if polling is True:
        polling_method: PollingMethod = cast(
            PollingMethod, ARMPolling(lro_delay, lro_options={"final-state-via": "azure-async-operation"}, **kwargs)
        )
    elif polling is False:
        polling_method = cast(PollingMethod, NoPolling())
    else:
        polling_method = polling
    if cont_token:
        return LROPoller[_models.NamespaceResource].from_continuation_token(
            polling_method=polling_method,
            continuation_token=cont_token,
            client=self._client,
            deserialization_callback=get_long_running_output,
        )
    return LROPoller[_models.NamespaceResource](
        self._client, raw_result, get_long_running_output, polling_method  # type: ignore
    )
@overload
def update(
    self,
    resource_group_name: str,
    namespace_name: str,
    parameters: _models.NamespacePatchParameters,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.NamespaceResource:
    """Patch an existing namespace using a model object as the request body.

    Typing overload for callers that supply the patch as a
    ``NamespacePatchParameters`` model.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param parameters: Request content. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.NamespacePatchParameters
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: NamespaceResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def update(
    self,
    resource_group_name: str,
    namespace_name: str,
    parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.NamespaceResource:
    """Patch an existing namespace using a binary stream as the request body.

    Typing overload for callers that supply the patch as an ``IO[bytes]``
    stream.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param parameters: Request content. Required.
    :type parameters: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: NamespaceResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace
def update(
    self,
    resource_group_name: str,
    namespace_name: str,
    parameters: Union[_models.NamespacePatchParameters, IO[bytes]],
    **kwargs: Any
) -> _models.NamespaceResource:
    """Patch an existing namespace and return the resulting resource.

    Builds the request with ``build_update_request``, runs it through the
    client pipeline without streaming, and deserializes a 200 response as
    ``NamespaceResource``. Any other status code is raised as an error.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param parameters: Request content. Is either a NamespacePatchParameters type or a IO[bytes]
     type. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.NamespacePatchParameters or IO[bytes]
    :return: NamespaceResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.NamespaceResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Explicit keyword arguments win over values embedded in ``params`` / ``headers``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[_models.NamespaceResource] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    if isinstance(parameters, (IOBase, bytes)):
        # Streams and raw bytes are sent as-is.
        _content = parameters
    else:
        # Model objects are serialized to a JSON body.
        _json = self._serialize.body(parameters, "NamespacePatchParameters")

    _request = build_update_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        # map_error is given the chance to raise a mapped exception first;
        # otherwise fall back to a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    deserialized = self._deserialize("NamespaceResource", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore
@distributed_trace
def delete(  # pylint: disable=inconsistent-return-statements
    self, resource_group_name: str, namespace_name: str, **kwargs: Any
) -> None:
    """Delete an existing namespace.

    This operation also removes all associated notificationHubs under the
    namespace. Status codes 200 and 204 are treated as success; any other
    status code is raised as an error.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :return: None or the result of cls(response)
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit ``api_version`` keyword wins over one embedded in ``params``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[None] = kwargs.pop("cls", None)

    _request = build_delete_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200, 204]:
        # map_error is given the chance to raise a mapped exception first;
        # otherwise fall back to a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    # No body is deserialized; only a caller-supplied ``cls`` hook produces a return value.
    if cls:
        return cls(pipeline_response, None, {})  # type: ignore
@distributed_trace
def list_all(
    self, skip_token: Optional[str] = None, top: int = 100, **kwargs: Any
) -> Iterable["_models.NamespaceResource"]:
    """List all the available namespaces within the subscription.

    Returns an ``ItemPaged`` that fetches pages on demand. The first page is
    requested with ``build_list_all_request``; later pages follow the
    ``next_link`` returned by the service.

    :param skip_token: Skip token for subsequent requests. Default value is None.
    :type skip_token: str
    :param top: Maximum number of results to return. Default value is 100.
    :type top: int
    :return: An iterator like instance of either NamespaceResource or the result of cls(response)
    :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.notificationhubs.models.NamespaceResource]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit ``api_version`` keyword wins over one embedded in ``params``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.NamespaceListResult] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for ``next_link`` when given."""
        if not next_link:
            _request = build_list_all_request(
                subscription_id=self._config.subscription_id,
                skip_token=skip_token,
                top=top,
                api_version=api_version,
                headers=_headers,
                params=_params,
            )
            _request = _convert_request(_request)
            _request.url = self._client.format_url(_request.url)
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            # Carry over the next link's query parameters, re-quoting each value.
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            # Note: this is the client's configured version, not the per-call ``api_version``.
            _next_request_params["api-version"] = self._config.api_version
            _request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )
            _request = _convert_request(_request)
            _request.url = self._client.format_url(_request.url)
            _request.method = "GET"
        return _request

    def extract_data(pipeline_response):
        """Return ``(next_link or None, iterator over the page's items)``."""
        deserialized = self._deserialize("NamespaceListResult", pipeline_response)
        list_of_elem = deserialized.value
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.next_link or None, iter(list_of_elem)

    def get_next(next_link=None):
        """Fetch one page and raise on any status code other than 200."""
        _request = prepare_request(next_link)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            _request, stream=_stream, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

        return pipeline_response

    return ItemPaged(get_next, extract_data)
@distributed_trace
def list(
    self, resource_group_name: str, skip_token: Optional[str] = None, top: int = 100, **kwargs: Any
) -> Iterable["_models.NamespaceResource"]:
    """List the available namespaces within a resource group.

    Returns an ``ItemPaged`` that fetches pages on demand. The first page is
    requested with ``build_list_request``; later pages follow the
    ``next_link`` returned by the service.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param skip_token: Skip token for subsequent requests. Default value is None.
    :type skip_token: str
    :param top: Maximum number of results to return. Default value is 100.
    :type top: int
    :return: An iterator like instance of either NamespaceResource or the result of cls(response)
    :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.notificationhubs.models.NamespaceResource]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit ``api_version`` keyword wins over one embedded in ``params``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.NamespaceListResult] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for ``next_link`` when given."""
        if not next_link:
            _request = build_list_request(
                resource_group_name=resource_group_name,
                subscription_id=self._config.subscription_id,
                skip_token=skip_token,
                top=top,
                api_version=api_version,
                headers=_headers,
                params=_params,
            )
            _request = _convert_request(_request)
            _request.url = self._client.format_url(_request.url)
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            # Carry over the next link's query parameters, re-quoting each value.
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            # Note: this is the client's configured version, not the per-call ``api_version``.
            _next_request_params["api-version"] = self._config.api_version
            _request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )
            _request = _convert_request(_request)
            _request.url = self._client.format_url(_request.url)
            _request.method = "GET"
        return _request

    def extract_data(pipeline_response):
        """Return ``(next_link or None, iterator over the page's items)``."""
        deserialized = self._deserialize("NamespaceListResult", pipeline_response)
        list_of_elem = deserialized.value
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.next_link or None, iter(list_of_elem)

    def get_next(next_link=None):
        """Fetch one page and raise on any status code other than 200."""
        _request = prepare_request(next_link)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            _request, stream=_stream, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

        return pipeline_response

    return ItemPaged(get_next, extract_data)
@overload
def create_or_update_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    parameters: _models.SharedAccessAuthorizationRuleResource,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.SharedAccessAuthorizationRuleResource:
    """Create an authorization rule for a namespace from a model object.

    Typing overload for callers that supply the request body as a
    ``SharedAccessAuthorizationRuleResource`` model.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :param parameters: Request content. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: SharedAccessAuthorizationRuleResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def create_or_update_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.SharedAccessAuthorizationRuleResource:
    """Create an authorization rule for a namespace from a binary body.

    Typing overload for callers that supply the request body as an
    ``IO[bytes]`` stream.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :param parameters: Request content. Required.
    :type parameters: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: SharedAccessAuthorizationRuleResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace
def create_or_update_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    parameters: Union[_models.SharedAccessAuthorizationRuleResource, IO[bytes]],
    **kwargs: Any
) -> _models.SharedAccessAuthorizationRuleResource:
    """Create an authorization rule for a namespace.

    Builds the request with
    ``build_create_or_update_authorization_rule_request``, runs it through the
    client pipeline without streaming, and deserializes a 200 or 201 response
    as ``SharedAccessAuthorizationRuleResource``. Any other status code is
    raised as an error.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :param parameters: Request content. Is either a SharedAccessAuthorizationRuleResource type or a
     IO[bytes] type. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource or
     IO[bytes]
    :return: SharedAccessAuthorizationRuleResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Explicit keyword arguments win over values embedded in ``params`` / ``headers``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[_models.SharedAccessAuthorizationRuleResource] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    if isinstance(parameters, (IOBase, bytes)):
        # Streams and raw bytes are sent as-is.
        _content = parameters
    else:
        # Model objects are serialized to a JSON body.
        _json = self._serialize.body(parameters, "SharedAccessAuthorizationRuleResource")

    _request = build_create_or_update_authorization_rule_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        authorization_rule_name=authorization_rule_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200, 201]:
        # map_error is given the chance to raise a mapped exception first;
        # otherwise fall back to a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    # Only 200 and 201 reach this point, and both are deserialized as the same
    # type, so a single call replaces the two identical per-status branches.
    deserialized = self._deserialize("SharedAccessAuthorizationRuleResource", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore
@distributed_trace
def delete_authorization_rule(  # pylint: disable=inconsistent-return-statements
    self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, **kwargs: Any
) -> None:
    """Delete a namespace authorization rule.

    Status codes 200 and 204 are treated as success; any other status code is
    raised as an error.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :return: None or the result of cls(response)
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit ``api_version`` keyword wins over one embedded in ``params``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[None] = kwargs.pop("cls", None)

    _request = build_delete_authorization_rule_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        authorization_rule_name=authorization_rule_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200, 204]:
        # map_error is given the chance to raise a mapped exception first;
        # otherwise fall back to a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    # No body is deserialized; only a caller-supplied ``cls`` hook produces a return value.
    if cls:
        return cls(pipeline_response, None, {})  # type: ignore
@distributed_trace
def get_authorization_rule(
    self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, **kwargs: Any
) -> _models.SharedAccessAuthorizationRuleResource:
    """Get an authorization rule for a namespace by name.

    Builds the request with ``build_get_authorization_rule_request``, runs it
    through the client pipeline without streaming, and deserializes a 200
    response as ``SharedAccessAuthorizationRuleResource``. Any other status
    code is raised as an error.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :return: SharedAccessAuthorizationRuleResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit ``api_version`` keyword wins over one embedded in ``params``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.SharedAccessAuthorizationRuleResource] = kwargs.pop("cls", None)

    _request = build_get_authorization_rule_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        authorization_rule_name=authorization_rule_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        # map_error is given the chance to raise a mapped exception first;
        # otherwise fall back to a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    deserialized = self._deserialize("SharedAccessAuthorizationRuleResource", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore
@distributed_trace
def list_authorization_rules(
    self, resource_group_name: str, namespace_name: str, **kwargs: Any
) -> Iterable["_models.SharedAccessAuthorizationRuleResource"]:
    """Get the authorization rules for a namespace.

    Returns an ``ItemPaged`` that fetches pages on demand. The first page is
    requested with ``build_list_authorization_rules_request``; later pages
    follow the ``next_link`` returned by the service.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :return: An iterator like instance of either SharedAccessAuthorizationRuleResource or the
     result of cls(response)
    :rtype:
     ~azure.core.paging.ItemPaged[~azure.mgmt.notificationhubs.models.SharedAccessAuthorizationRuleResource]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit ``api_version`` keyword wins over one embedded in ``params``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.SharedAccessAuthorizationRuleListResult] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for ``next_link`` when given."""
        if not next_link:
            _request = build_list_authorization_rules_request(
                resource_group_name=resource_group_name,
                namespace_name=namespace_name,
                subscription_id=self._config.subscription_id,
                api_version=api_version,
                headers=_headers,
                params=_params,
            )
            _request = _convert_request(_request)
            _request.url = self._client.format_url(_request.url)
        else:
            # make call to next link with the client's api-version
            _parsed_next_link = urllib.parse.urlparse(next_link)
            # Carry over the next link's query parameters, re-quoting each value.
            _next_request_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(v) for v in value]
                    for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                }
            )
            # Note: this is the client's configured version, not the per-call ``api_version``.
            _next_request_params["api-version"] = self._config.api_version
            _request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
            )
            _request = _convert_request(_request)
            _request.url = self._client.format_url(_request.url)
            _request.method = "GET"
        return _request

    def extract_data(pipeline_response):
        """Return ``(next_link or None, iterator over the page's items)``."""
        deserialized = self._deserialize("SharedAccessAuthorizationRuleListResult", pipeline_response)
        list_of_elem = deserialized.value
        if cls:
            list_of_elem = cls(list_of_elem)  # type: ignore
        return deserialized.next_link or None, iter(list_of_elem)

    def get_next(next_link=None):
        """Fetch one page and raise on any status code other than 200."""
        _request = prepare_request(next_link)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            _request, stream=_stream, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

        return pipeline_response

    return ItemPaged(get_next, extract_data)
@distributed_trace
def list_keys(
    self, resource_group_name: str, namespace_name: str, authorization_rule_name: str, **kwargs: Any
) -> _models.ResourceListKeys:
    """Get the Primary and Secondary ConnectionStrings to the namespace.

    Builds the request with ``build_list_keys_request``, runs it through the
    client pipeline without streaming, and deserializes a 200 response as
    ``ResourceListKeys``. Any other status code is raised as an error.

    :param resource_group_name: The name of the resource group. The name is case insensitive.
     Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :return: ResourceListKeys or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller entries in ``error_map`` override it.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit ``api_version`` keyword wins over one embedded in ``params``.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.ResourceListKeys] = kwargs.pop("cls", None)

    _request = build_list_keys_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        authorization_rule_name=authorization_rule_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        # map_error is given the chance to raise a mapped exception first;
        # otherwise fall back to a generic HttpResponseError.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    deserialized = self._deserialize("ResourceListKeys", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore
@overload
def regenerate_keys(
    self,
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    parameters: _models.PolicyKeyResource,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.ResourceListKeys:
    """Regenerate the Primary/Secondary Keys of a Namespace Authorization Rule.

    Typing overload for callers that pass the request body as a
    ``PolicyKeyResource`` model.

    :param resource_group_name: The name of the resource group. The name is case
     insensitive. Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :param parameters: Request content. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.PolicyKeyResource
    :keyword content_type: Body Parameter content-type. Content type parameter for
     JSON body. Default value is "application/json".
    :paramtype content_type: str
    :return: ResourceListKeys or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@overload
def regenerate_keys(
    self,
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    parameters: IO[bytes],
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.ResourceListKeys:
    """Regenerate the Primary/Secondary Keys of a Namespace Authorization Rule.

    Typing overload for callers that pass the request body as a binary
    stream (``IO[bytes]``).

    :param resource_group_name: The name of the resource group. The name is case
     insensitive. Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :param parameters: Request content. Required.
    :type parameters: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for
     binary body. Default value is "application/json".
    :paramtype content_type: str
    :return: ResourceListKeys or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace
def regenerate_keys(
    self,
    resource_group_name: str,
    namespace_name: str,
    authorization_rule_name: str,
    parameters: Union[_models.PolicyKeyResource, IO[bytes]],
    **kwargs: Any
) -> _models.ResourceListKeys:
    """Regenerates the Primary/Secondary Keys to the Namespace Authorization Rule.

    :param resource_group_name: The name of the resource group. The name is case
     insensitive. Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :param authorization_rule_name: Authorization Rule Name. Required.
    :type authorization_rule_name: str
    :param parameters: Request content. Is either a PolicyKeyResource type or a
     IO[bytes] type. Required.
    :type parameters: ~azure.mgmt.notificationhubs.models.PolicyKeyResource or IO[bytes]
    :keyword content_type: Body Parameter content-type. When omitted, a
     ``Content-Type`` entry in the ``headers`` keyword is used, and otherwise
     "application/json".
    :paramtype content_type: str
    :return: ResourceListKeys or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.ResourceListKeys
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code
     is not 200. Status codes present in ``error_map`` are passed through
     ``map_error`` first.
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings are applied last, so they override the defaults.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Explicit keywords win over values embedded in params/headers; the
    # client configuration supplies the final api-version fallback.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[_models.ResourceListKeys] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    # Streams and raw bytes are forwarded untouched; anything else is
    # serialized as a PolicyKeyResource JSON body.
    if isinstance(parameters, (IOBase, bytes)):
        _content = parameters
    else:
        _json = self._serialize.body(parameters, "PolicyKeyResource")

    _request = build_regenerate_keys_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        authorization_rule_name=authorization_rule_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        content_type=content_type,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    # 200 is the only status treated as success.
    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    deserialized = self._deserialize("ResourceListKeys", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore
@distributed_trace
def get_pns_credentials(
    self, resource_group_name: str, namespace_name: str, **kwargs: Any
) -> _models.PnsCredentialsResource:
    """Lists the PNS credentials associated with a namespace.

    :param resource_group_name: The name of the resource group. The name is case
     insensitive. Required.
    :type resource_group_name: str
    :param namespace_name: Namespace name. Required.
    :type namespace_name: str
    :return: PnsCredentialsResource or the result of cls(response)
    :rtype: ~azure.mgmt.notificationhubs.models.PnsCredentialsResource
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code
     is not 200. Status codes present in ``error_map`` are passed through
     ``map_error`` first.
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied mappings are applied last, so they override the defaults.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # An explicit api_version keyword wins over an "api-version" entry in
    # params; the client configuration supplies the final fallback.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", self._config.api_version))
    cls: ClsType[_models.PnsCredentialsResource] = kwargs.pop("cls", None)

    _request = build_get_pns_credentials_request(
        resource_group_name=resource_group_name,
        namespace_name=namespace_name,
        subscription_id=self._config.subscription_id,
        api_version=api_version,
        headers=_headers,
        params=_params,
    )
    _request = _convert_request(_request)
    _request.url = self._client.format_url(_request.url)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        _request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    # 200 is the only status treated as success.
    if response.status_code not in [200]:
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, pipeline_response)
        raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)

    deserialized = self._deserialize("PnsCredentialsResource", pipeline_response)

    if cls:
        return cls(pipeline_response, deserialized, {})  # type: ignore

    return deserialized  # type: ignore