Source code for azure.iot.deviceupdate.aio.operations._operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import sys
from typing import Any, AsyncIterable, Callable, Dict, IO, List, Optional, TypeVar, Union, cast, overload
from urllib.parse import parse_qs, urljoin, urlparse

from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    map_error,
)
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse
from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
from azure.core.polling.async_base_polling import AsyncLROBasePolling
from azure.core.rest import HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.utils import case_insensitive_dict

from ...operations._operations import (
    build_device_management_create_or_update_deployment_request,
    build_device_management_delete_deployment_for_device_class_subgroup_request,
    build_device_management_delete_deployment_request,
    build_device_management_delete_device_class_request,
    build_device_management_delete_device_class_subgroup_request,
    build_device_management_delete_group_request,
    build_device_management_get_best_updates_for_device_class_subgroup_request,
    build_device_management_get_deployment_for_device_class_subgroup_request,
    build_device_management_get_deployment_request,
    build_device_management_get_deployment_status_request,
    build_device_management_get_device_class_request,
    build_device_management_get_device_class_subgroup_deployment_status_request,
    build_device_management_get_device_class_subgroup_request,
    build_device_management_get_device_class_subgroup_update_compliance_request,
    build_device_management_get_device_module_request,
    build_device_management_get_device_request,
    build_device_management_get_group_request,
    build_device_management_get_log_collection_detailed_status_request,
    build_device_management_get_log_collection_request,
    build_device_management_get_operation_status_request,
    build_device_management_get_update_compliance_for_group_request,
    build_device_management_get_update_compliance_request,
    build_device_management_import_devices_request,
    build_device_management_list_best_updates_for_group_request,
    build_device_management_list_deployments_for_device_class_subgroup_request,
    build_device_management_list_deployments_for_group_request,
    build_device_management_list_device_class_subgroups_for_group_request,
    build_device_management_list_device_classes_request,
    build_device_management_list_device_states_for_device_class_subgroup_deployment_request,
    build_device_management_list_devices_request,
    build_device_management_list_groups_request,
    build_device_management_list_health_of_devices_request,
    build_device_management_list_installable_updates_for_device_class_request,
    build_device_management_list_log_collections_request,
    build_device_management_list_operation_statuses_request,
    build_device_management_retry_deployment_request,
    build_device_management_start_log_collection_request,
    build_device_management_stop_deployment_request,
    build_device_management_update_device_class_request,
    build_device_update_delete_update_request,
    build_device_update_get_file_request,
    build_device_update_get_operation_status_request,
    build_device_update_get_update_request,
    build_device_update_import_update_request,
    build_device_update_list_files_request,
    build_device_update_list_names_request,
    build_device_update_list_operation_statuses_request,
    build_device_update_list_providers_request,
    build_device_update_list_updates_request,
    build_device_update_list_versions_request,
)

if sys.version_info >= (3, 9):
    from collections.abc import MutableMapping
else:
    from typing import MutableMapping  # type: ignore  # pylint: disable=ungrouped-imports
JSON = MutableMapping[str, Any]  # pylint: disable=unsubscriptable-object
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]


[docs]class DeviceUpdateOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.iot.deviceupdate.aio.DeviceUpdateClient`'s :attr:`device_update` attribute. """ def __init__(self, *args, **kwargs) -> None: input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list_updates( self, *, search: Optional[str] = None, filter: Optional[str] = None, **kwargs: Any ) -> AsyncIterable[JSON]: """Get a list of all updates that have been imported to Device Update for IoT Hub. :keyword search: Request updates matching a free-text search expression. Default value is None. :paramtype search: str :keyword filter: Optional to filter updates by isDeployable property. Default value is None. :paramtype filter: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "compatibility": [ { "str": "str" # List of update compatibility information. Required. } ], "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was created. Required. "importedDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was imported. Required. "manifestVersion": "str", # Schema version of manifest used to import the update. Required. "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description specified by creator. "etag": "str", # Optional. Update ETag. "friendlyName": "str", # Optional. Friendly update name specified by importer. "installedCriteria": "str", # Optional. String interpreted by Device Update client to determine if the update is installed on the device. Deprecated in latest import manifest schema. "instructions": { "steps": [ { "description": "str", # Optional. Step description. "files": [ "str" # Optional. Collection of file names to be passed to handler during execution. Required if step type is inline. ], "handler": "str", # Optional. Identity of handler that will execute this step. Required if step type is inline. "handlerProperties": {}, # Optional. Parameters to be passed to handler during execution. "type": "inline", # Optional. Default value is "inline". Step type. Known values are: "Inline" and "Reference". "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } } ] }, "isDeployable": True, # Optional. Default value is True. Whether the update can be deployed to a device on its own. "referencedBy": [ { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } ], "scanResult": "str", # Optional. Update aggregate scan result (calculated from payload file scan results). "updateType": "str" # Optional. Update type. Deprecated in latest import manifest schema. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_update_list_updates_request( instance_id=self._config.instance_id, search=search, filter=filter, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
async def _import_update_initial(self, update_to_import: Union[List[JSON], IO], **kwargs: Any) -> Optional[JSON]: error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None)) # type: Optional[str] cls = kwargs.pop("cls", None) # type: ClsType[Optional[JSON]] content_type = content_type or "application/json" _json = None _content = None if isinstance(update_to_import, (IO, bytes)): _content = update_to_import else: _json = update_to_import request = build_device_update_import_update_request( instance_id=self._config.instance_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200, 202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) deserialized = None response_headers = {} if response.status_code == 200: if response.content: deserialized = response.json() else: deserialized = None if response.status_code == 202: response_headers["Operation-Location"] = self._deserialize( "str", response.headers.get("Operation-Location") ) if cls: return cls(pipeline_response, deserialized, response_headers) return deserialized @overload async def begin_import_update( self, update_to_import: List[JSON], *, content_type: str = "application/json", **kwargs: Any ) -> AsyncLROPoller[JSON]: """Import new update version. This is a long-running-operation; use Operation-Location response header value to check for operation status. :param update_to_import: The update to be imported (see schema https://json.schemastore.org/azure-deviceupdate-import-manifest-5.0.json for details). Required. :type update_to_import: list[JSON] :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns JSON object :rtype: ~azure.core.polling.AsyncLROPoller[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. update_to_import = [ { "importManifest": { "hashes": { "str": "str" # A JSON object containing the hash(es) of the file. At least SHA256 hash is required. This object can be thought of as a set of key-value pairs where the key is the hash algorithm, and the value is the hash of the file calculated using that algorithm. Required. }, "sizeInBytes": 0, # File size in number of bytes. Required. "url": "str" # Azure Blob location from which the import manifest can be downloaded by Device Update for IoT Hub. This is typically a read-only SAS-protected blob URL with an expiration set to at least 4 hours. Required. }, "files": [ { "filename": "str", # Update file name as specified inside import manifest. Required. "url": "str" # Azure Blob location from which the update file can be downloaded by Device Update for IoT Hub. This is typically a read-only SAS-protected blob URL with an expiration set to at least 4 hours. Required. } ], "friendlyName": "str" # Optional. Friendly update name. } ] # response body for status code(s): 200 response == { "compatibility": [ { "str": "str" # List of update compatibility information. Required. } ], "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was created. Required. "importedDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was imported. Required. "manifestVersion": "str", # Schema version of manifest used to import the update. Required. "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description specified by creator. "etag": "str", # Optional. Update ETag. "friendlyName": "str", # Optional. Friendly update name specified by importer. "installedCriteria": "str", # Optional. String interpreted by Device Update client to determine if the update is installed on the device. Deprecated in latest import manifest schema. "instructions": { "steps": [ { "description": "str", # Optional. Step description. "files": [ "str" # Optional. Collection of file names to be passed to handler during execution. Required if step type is inline. ], "handler": "str", # Optional. Identity of handler that will execute this step. Required if step type is inline. "handlerProperties": {}, # Optional. Parameters to be passed to handler during execution. "type": "inline", # Optional. Default value is "inline". Step type. Known values are: "Inline" and "Reference". "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } } ] }, "isDeployable": True, # Optional. Default value is True. Whether the update can be deployed to a device on its own. "referencedBy": [ { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } ], "scanResult": "str", # Optional. Update aggregate scan result (calculated from payload file scan results). "updateType": "str" # Optional. Update type. Deprecated in latest import manifest schema. } """ @overload async def begin_import_update( self, update_to_import: IO, *, content_type: str = "application/json", **kwargs: Any ) -> AsyncLROPoller[JSON]: """Import new update version. This is a long-running-operation; use Operation-Location response header value to check for operation status. :param update_to_import: The update to be imported (see schema https://json.schemastore.org/azure-deviceupdate-import-manifest-5.0.json for details). Required. :type update_to_import: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns JSON object :rtype: ~azure.core.polling.AsyncLROPoller[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "compatibility": [ { "str": "str" # List of update compatibility information. Required. } ], "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was created. Required. "importedDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was imported. Required. "manifestVersion": "str", # Schema version of manifest used to import the update. Required. "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description specified by creator. "etag": "str", # Optional. Update ETag. "friendlyName": "str", # Optional. Friendly update name specified by importer. "installedCriteria": "str", # Optional. String interpreted by Device Update client to determine if the update is installed on the device. Deprecated in latest import manifest schema. "instructions": { "steps": [ { "description": "str", # Optional. Step description. "files": [ "str" # Optional. Collection of file names to be passed to handler during execution. Required if step type is inline. ], "handler": "str", # Optional. Identity of handler that will execute this step. Required if step type is inline. "handlerProperties": {}, # Optional. Parameters to be passed to handler during execution. "type": "inline", # Optional. Default value is "inline". Step type. Known values are: "Inline" and "Reference". "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } } ] }, "isDeployable": True, # Optional. Default value is True. Whether the update can be deployed to a device on its own. "referencedBy": [ { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } ], "scanResult": "str", # Optional. Update aggregate scan result (calculated from payload file scan results). "updateType": "str" # Optional. Update type. Deprecated in latest import manifest schema. } """
[docs] @distributed_trace_async async def begin_import_update(self, update_to_import: Union[List[JSON], IO], **kwargs: Any) -> AsyncLROPoller[JSON]: """Import new update version. This is a long-running-operation; use Operation-Location response header value to check for operation status. :param update_to_import: The update to be imported (see schema https://json.schemastore.org/azure-deviceupdate-import-manifest-5.0.json for details). Is either a list type or a IO type. Required. :type update_to_import: list[JSON] or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns JSON object :rtype: ~azure.core.polling.AsyncLROPoller[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "compatibility": [ { "str": "str" # List of update compatibility information. Required. } ], "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was created. Required. "importedDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was imported. Required. "manifestVersion": "str", # Schema version of manifest used to import the update. Required. "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description specified by creator. "etag": "str", # Optional. Update ETag. "friendlyName": "str", # Optional. Friendly update name specified by importer. "installedCriteria": "str", # Optional. String interpreted by Device Update client to determine if the update is installed on the device. Deprecated in latest import manifest schema. "instructions": { "steps": [ { "description": "str", # Optional. Step description. "files": [ "str" # Optional. Collection of file names to be passed to handler during execution. Required if step type is inline. ], "handler": "str", # Optional. Identity of handler that will execute this step. Required if step type is inline. "handlerProperties": {}, # Optional. Parameters to be passed to handler during execution. "type": "inline", # Optional. Default value is "inline". Step type. Known values are: "Inline" and "Reference". "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } } ] }, "isDeployable": True, # Optional. Default value is True. Whether the update can be deployed to a device on its own. "referencedBy": [ { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } ], "scanResult": "str", # Optional. Update aggregate scan result (calculated from payload file scan results). "updateType": "str" # Optional. Update type. Deprecated in latest import manifest schema. } """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None)) # type: Optional[str] cls = kwargs.pop("cls", None) # type: ClsType[JSON] polling = kwargs.pop("polling", True) # type: Union[bool, AsyncPollingMethod] lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token = kwargs.pop("continuation_token", None) # type: Optional[str] if cont_token is None: raw_result = await self._import_update_initial( # type: ignore update_to_import=update_to_import, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): response = pipeline_response.http_response if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, deserialized, {}) return deserialized path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } if polling is True: polling_method = cast( AsyncPollingMethod, AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs), ) # type: AsyncPollingMethod elif polling is False: polling_method = cast(AsyncPollingMethod, AsyncNoPolling()) else: polling_method = polling if cont_token: return AsyncLROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
[docs] @distributed_trace_async async def get_update( self, provider: str, name: str, version: str, *, if_none_match: Optional[str] = None, **kwargs: Any ) -> JSON: """Get a specific update version. :param provider: Update provider. Required. :type provider: str :param name: Update name. Required. :type name: str :param version: Update version. Required. :type version: str :keyword if_none_match: Defines the If-None-Match condition. The operation will be performed only if the ETag on the server does not match this value. Default value is None. :paramtype if_none_match: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "compatibility": [ { "str": "str" # List of update compatibility information. Required. } ], "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was created. Required. "importedDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the update was imported. Required. "manifestVersion": "str", # Schema version of manifest used to import the update. Required. "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description specified by creator. "etag": "str", # Optional. Update ETag. "friendlyName": "str", # Optional. Friendly update name specified by importer. "installedCriteria": "str", # Optional. String interpreted by Device Update client to determine if the update is installed on the device. Deprecated in latest import manifest schema. "instructions": { "steps": [ { "description": "str", # Optional. Step description. "files": [ "str" # Optional. Collection of file names to be passed to handler during execution. Required if step type is inline. ], "handler": "str", # Optional. Identity of handler that will execute this step. Required if step type is inline. "handlerProperties": {}, # Optional. Parameters to be passed to handler during execution. "type": "inline", # Optional. Default value is "inline". Step type. Known values are: "Inline" and "Reference". "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } } ] }, "isDeployable": True, # Optional. Default value is True. Whether the update can be deployed to a device on its own. "referencedBy": [ { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. } ], "scanResult": "str", # Optional. Update aggregate scan result (calculated from payload file scan results). "updateType": "str" # Optional. Update type. Deprecated in latest import manifest schema. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_update_get_update_request( provider=provider, name=name, version=version, instance_id=self._config.instance_id, if_none_match=if_none_match, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
async def _delete_update_initial( # pylint: disable=inconsistent-return-statements self, provider: str, name: str, version: str, **kwargs: Any ) -> None: error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[None] request = build_device_update_delete_update_request( provider=provider, name=name, version=version, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) response_headers = {} response_headers["Operation-Location"] = self._deserialize("str", response.headers.get("Operation-Location")) if cls: return cls(pipeline_response, None, response_headers)
[docs] @distributed_trace_async async def begin_delete_update(self, provider: str, name: str, version: str, **kwargs: Any) -> AsyncLROPoller[None]: """Delete a specific update version. This is a long-running-operation; use Operation-Location response header value to check for operation status. :param provider: Update provider. Required. :type provider: str :param name: Update name. Required. :type name: str :param version: Update version. Required. :type version: str :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns None :rtype: ~azure.core.polling.AsyncLROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[None] polling = kwargs.pop("polling", True) # type: Union[bool, AsyncPollingMethod] lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token = kwargs.pop("continuation_token", None) # type: Optional[str] if cont_token is None: raw_result = await self._delete_update_initial( # type: ignore provider=provider, name=name, version=version, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } if polling is True: polling_method = cast( AsyncPollingMethod, AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs), ) # type: AsyncPollingMethod elif polling is False: polling_method = cast(AsyncPollingMethod, AsyncNoPolling()) else: polling_method = polling if cont_token: return AsyncLROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
[docs] @distributed_trace def list_providers(self, **kwargs: Any) -> AsyncIterable[str]: """Get a list of all update providers that have been imported to Device Update for IoT Hub. :return: An iterator like instance of str :rtype: ~azure.core.async_paging.AsyncItemPaged[str] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == "str" # Optional. """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_update_list_providers_request( instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace def list_names(self, provider: str, **kwargs: Any) -> AsyncIterable[str]: """Get a list of all update names that match the specified provider. :param provider: Update provider. Required. :type provider: str :return: An iterator like instance of str :rtype: ~azure.core.async_paging.AsyncItemPaged[str] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == "str" # Optional. """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_update_list_names_request( provider=provider, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace def list_versions( self, provider: str, name: str, *, filter: Optional[str] = None, **kwargs: Any ) -> AsyncIterable[str]: """Get a list of all update versions that match the specified provider and name. :param provider: Update provider. Required. :type provider: str :param name: Update name. Required. :type name: str :keyword filter: Optional to filter updates by isDeployable property. Default value is None. :paramtype filter: str :return: An iterator like instance of str :rtype: ~azure.core.async_paging.AsyncItemPaged[str] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == "str" # Optional. """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_update_list_versions_request( provider=provider, name=name, instance_id=self._config.instance_id, filter=filter, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace def list_files(self, provider: str, name: str, version: str, **kwargs: Any) -> AsyncIterable[str]: """Get a list of all update file identifiers for the specified version. :param provider: Update provider. Required. :type provider: str :param name: Update name. Required. :type name: str :param version: Update version. Required. :type version: str :return: An iterator like instance of str :rtype: ~azure.core.async_paging.AsyncItemPaged[str] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == "str" # Optional. """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_update_list_files_request( provider=provider, name=name, version=version, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_file( self, provider: str, name: str, version: str, file_id: str, *, if_none_match: Optional[str] = None, **kwargs: Any ) -> JSON: """Get a specific update file from the version. :param provider: Update provider. Required. :type provider: str :param name: Update name. Required. :type name: str :param version: Update version. Required. :type version: str :param file_id: File identifier. Required. :type file_id: str :keyword if_none_match: Defines the If-None-Match condition. The operation will be performed only if the ETag on the server does not match this value. Default value is None. :paramtype if_none_match: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "fileId": "str", # File identity, generated by server at import time. Required. "fileName": "str", # File name. Required. "hashes": { "str": "str" # Mapping of hashing algorithm to base64 encoded hash values. Required. }, "sizeInBytes": 0, # File size in number of bytes. Required. "downloadHandler": { "id": "str" # Download handler identifier. Required. }, "etag": "str", # Optional. File ETag. "mimeType": "str", # Optional. File MIME type. "properties": { "str": "str" # Optional. Optional file properties (not consumed by service but pass-through to device). }, "relatedFiles": [ { "fileName": "str", # File name. Required. "hashes": { "str": "str" # Mapping of hashing algorithm to base64 encoded hash values. Required. }, "sizeInBytes": 0, # File size in number of bytes. Required. "mimeType": "str", # Optional. File MIME type. "properties": { "str": "str" # Optional. Optional file properties (not consumed by service but pass-through to device). }, "scanDetails": "str", # Optional. Anti-malware scan details. "scanResult": "str" # Optional. Anti-malware scan result. } ], "scanDetails": "str", # Optional. Anti-malware scan details. "scanResult": "str" # Optional. Anti-malware scan result. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_update_get_file_request( provider=provider, name=name, version=version, file_id=file_id, instance_id=self._config.instance_id, if_none_match=if_none_match, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_operation_statuses( self, *, filter: Optional[str] = None, top: Optional[int] = None, **kwargs: Any ) -> AsyncIterable[JSON]: """Get a list of all import update operations. Completed operations are kept for 7 days before auto-deleted. Delete operations are not returned by this API version. :keyword filter: Optional to filter operations by status property. Only one specific filter is supported: "status eq 'NotStarted' or status eq 'Running'". Default value is None. :paramtype filter: str :keyword top: Specifies a non-negative integer n that limits the number of items returned from a collection. The service returns the number of available items up to but not greater than the specified value n. Default value is None. :paramtype top: int :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation was created. Required. "lastActionDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation status was last updated. Required. "operationId": "str", # Operation Id. Required. "status": "str", # Operation status. Required. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". "error": { "code": "str", # Server defined error code. Required. "message": "str", # A human-readable representation of the error. Required. "details": [ ... ], "innererror": { "code": "str", # A more specific error code than what was provided by the containing error. Required. "errorDetail": "str", # Optional. The internal error or exception message. "innerError": ..., "message": "str" # Optional. A human-readable representation of the error. }, "occurredDateTime": "2020-02-20 00:00:00", # Optional. Date and time in UTC when the error occurred. "target": "str" # Optional. The target of the error. }, "etag": "str", # Optional. Operation ETag. "resourceLocation": "str", # Optional. Location of the imported update when operation is successful. "traceId": "str", # Optional. Operation correlation identity that can used by Microsoft Support for troubleshooting. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_update_list_operation_statuses_request( instance_id=self._config.instance_id, filter=filter, top=top, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_operation_status( self, operation_id: str, *, if_none_match: Optional[str] = None, **kwargs: Any ) -> JSON: """Retrieve operation status. :param operation_id: Operation identifier. Required. :type operation_id: str :keyword if_none_match: Defines the If-None-Match condition. The operation will be performed only if the ETag on the server does not match this value. Default value is None. :paramtype if_none_match: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation was created. Required. "lastActionDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation status was last updated. Required. "operationId": "str", # Operation Id. Required. "status": "str", # Operation status. Required. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". "error": { "code": "str", # Server defined error code. Required. "message": "str", # A human-readable representation of the error. Required. "details": [ ... ], "innererror": { "code": "str", # A more specific error code than what was provided by the containing error. Required. "errorDetail": "str", # Optional. The internal error or exception message. "innerError": ..., "message": "str" # Optional. A human-readable representation of the error. }, "occurredDateTime": "2020-02-20 00:00:00", # Optional. Date and time in UTC when the error occurred. "target": "str" # Optional. The target of the error. }, "etag": "str", # Optional. Operation ETag. "resourceLocation": "str", # Optional. Location of the imported update when operation is successful. "traceId": "str", # Optional. Operation correlation identity that can used by Microsoft Support for troubleshooting. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_update_get_operation_status_request( operation_id=operation_id, instance_id=self._config.instance_id, if_none_match=if_none_match, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) response_headers = {} response_headers["Retry-After"] = self._deserialize("str", response.headers.get("Retry-After")) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), response_headers) return cast(JSON, deserialized)
[docs]class DeviceManagementOperations: # pylint: disable=too-many-public-methods """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.iot.deviceupdate.aio.DeviceUpdateClient`'s :attr:`device_management` attribute. """ def __init__(self, *args, **kwargs) -> None: input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list_device_classes(self, *, filter: Optional[str] = None, **kwargs: Any) -> AsyncIterable[JSON]: """Gets a list of all device classes (sets of devices compatible with the same updates based on the model Id and compat properties reported in the Device Update PnP interface in IoT Hub) for all devices connected to Device Update for IoT Hub. :keyword filter: Restricts the set of device classes returned. You can filter on friendly name. Default value is None. :paramtype filter: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # The device class identifier. This is generated from the model Id and the compat properties reported by the device update agent in the Device Update PnP interface in IoT Hub. It is a hex-encoded SHA1 hash. Required. "deviceClassProperties": { "compatProperties": { "str": "str" # The compat properties of the device class. This object can be thought of as a set of key-value pairs where the key is the name of the compatibility property and the value is the value of the compatibility property. There will always be at least 1 compat property. Required. }, "contractModel": { "id": "str", # The Device Update agent contract model Id of the device class. This is also used to calculate the device class Id. Required. "name": "str" # The Device Update agent contract model name of the device class. Intended to be a more readable form of the contract model Id. Required. } }, "bestCompatibleUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "friendlyName": "str" # Optional. The device class friendly name. This can be updated by callers after the device class has been automatically created. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_device_classes_request( instance_id=self._config.instance_id, filter=filter, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_device_class(self, device_class_id: str, **kwargs: Any) -> JSON: """Gets the properties of a device class. :param device_class_id: Device class identifier. Required. :type device_class_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # The device class identifier. This is generated from the model Id and the compat properties reported by the device update agent in the Device Update PnP interface in IoT Hub. It is a hex-encoded SHA1 hash. Required. "deviceClassProperties": { "compatProperties": { "str": "str" # The compat properties of the device class. This object can be thought of as a set of key-value pairs where the key is the name of the compatibility property and the value is the value of the compatibility property. There will always be at least 1 compat property. Required. }, "contractModel": { "id": "str", # The Device Update agent contract model Id of the device class. This is also used to calculate the device class Id. Required. "name": "str" # The Device Update agent contract model name of the device class. Intended to be a more readable form of the contract model Id. Required. } }, "bestCompatibleUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "friendlyName": "str" # Optional. The device class friendly name. This can be updated by callers after the device class has been automatically created. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_device_class_request( device_class_id=device_class_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
@overload async def update_device_class( self, device_class_id: str, device_class_patch: JSON, *, content_type: str = "application/merge-patch+json", **kwargs: Any ) -> JSON: """Update device class details. :param device_class_id: Device class identifier. Required. :type device_class_id: str :param device_class_patch: The device class json merge patch body. Currently only supports patching friendlyName. Required. :type device_class_patch: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/merge-patch+json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. device_class_patch = { "friendlyName": "str" # The device class friendly name. Friendly name can be 1-100 characters, alphanumeric, dot, and dash. Required. } # response body for status code(s): 200 response == { "deviceClassId": "str", # The device class identifier. This is generated from the model Id and the compat properties reported by the device update agent in the Device Update PnP interface in IoT Hub. It is a hex-encoded SHA1 hash. Required. "deviceClassProperties": { "compatProperties": { "str": "str" # The compat properties of the device class. This object can be thought of as a set of key-value pairs where the key is the name of the compatibility property and the value is the value of the compatibility property. There will always be at least 1 compat property. Required. }, "contractModel": { "id": "str", # The Device Update agent contract model Id of the device class. This is also used to calculate the device class Id. Required. "name": "str" # The Device Update agent contract model name of the device class. Intended to be a more readable form of the contract model Id. Required. } }, "bestCompatibleUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "friendlyName": "str" # Optional. The device class friendly name. This can be updated by callers after the device class has been automatically created. } """ @overload async def update_device_class( self, device_class_id: str, device_class_patch: IO, *, content_type: str = "application/merge-patch+json", **kwargs: Any ) -> JSON: """Update device class details. :param device_class_id: Device class identifier. Required. :type device_class_id: str :param device_class_patch: The device class json merge patch body. Currently only supports patching friendlyName. Required. :type device_class_patch: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/merge-patch+json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # The device class identifier. This is generated from the model Id and the compat properties reported by the device update agent in the Device Update PnP interface in IoT Hub. It is a hex-encoded SHA1 hash. Required. "deviceClassProperties": { "compatProperties": { "str": "str" # The compat properties of the device class. This object can be thought of as a set of key-value pairs where the key is the name of the compatibility property and the value is the value of the compatibility property. There will always be at least 1 compat property. Required. }, "contractModel": { "id": "str", # The Device Update agent contract model Id of the device class. This is also used to calculate the device class Id. Required. "name": "str" # The Device Update agent contract model name of the device class. Intended to be a more readable form of the contract model Id. Required. } }, "bestCompatibleUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "friendlyName": "str" # Optional. The device class friendly name. This can be updated by callers after the device class has been automatically created. } """
[docs] @distributed_trace_async async def update_device_class( self, device_class_id: str, device_class_patch: Union[JSON, IO], **kwargs: Any ) -> JSON: """Update device class details. :param device_class_id: Device class identifier. Required. :type device_class_id: str :param device_class_patch: The device class json merge patch body. Currently only supports patching friendlyName. Is either a model type or a IO type. Required. :type device_class_patch: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/merge-patch+json'. Default value is None. :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # The device class identifier. This is generated from the model Id and the compat properties reported by the device update agent in the Device Update PnP interface in IoT Hub. It is a hex-encoded SHA1 hash. Required. "deviceClassProperties": { "compatProperties": { "str": "str" # The compat properties of the device class. This object can be thought of as a set of key-value pairs where the key is the name of the compatibility property and the value is the value of the compatibility property. There will always be at least 1 compat property. Required. }, "contractModel": { "id": "str", # The Device Update agent contract model Id of the device class. This is also used to calculate the device class Id. Required. "name": "str" # The Device Update agent contract model name of the device class. Intended to be a more readable form of the contract model Id. Required. } }, "bestCompatibleUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "friendlyName": "str" # Optional. The device class friendly name. This can be updated by callers after the device class has been automatically created. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None)) # type: Optional[str] cls = kwargs.pop("cls", None) # type: ClsType[JSON] content_type = content_type or "application/merge-patch+json" _json = None _content = None if isinstance(device_class_patch, (IO, bytes)): _content = device_class_patch else: _json = device_class_patch request = build_device_management_update_device_class_request( device_class_id=device_class_id, instance_id=self._config.instance_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def delete_device_class( # pylint: disable=inconsistent-return-statements self, device_class_id: str, **kwargs: Any ) -> None: """Deletes a device class. Device classes are created automatically when Device Update-enabled devices are connected to the hub but are not automatically cleaned up since they are referenced by DeviceClassSubgroups. If the user has deleted all DeviceClassSubgroups for a device class they can also delete the device class to remove the records from the system and to stop checking the compatibility of this device class with new updates. If a device is ever reconnected for this device class it will be re-created. :param device_class_id: Device class identifier. Required. :type device_class_id: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[None] request = build_device_management_delete_device_class_request( device_class_id=device_class_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs] @distributed_trace def list_installable_updates_for_device_class(self, device_class_id: str, **kwargs: Any) -> AsyncIterable[JSON]: """Gets a list of installable updates for a device class. :param device_class_id: Device class identifier. Required. :type device_class_id: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_installable_updates_for_device_class_request( device_class_id=device_class_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace def list_devices(self, *, filter: Optional[str] = None, **kwargs: Any) -> AsyncIterable[JSON]: """Gets a list of devices connected to Device Update for IoT Hub. :keyword filter: Restricts the set of devices returned. You can filter on GroupId, DeviceClassId, or GroupId and DeploymentStatus. Use DeploymentStatus eq null to query for devices with no deployment status (that have never been deployed to). Default value is None. :paramtype filter: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # Device class identity. Required. "deviceId": "str", # Device identity. Required. "onLatestUpdate": bool, # Boolean flag indicating whether the latest update (the best compatible update for the device's device class and group) is installed on the device. Required. "deploymentStatus": "str", # Optional. State of the device in its last deployment. Known values are: "Succeeded", "InProgress", "Canceled", and "Failed". "groupId": "str", # Optional. Device group identity. "installedUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "lastAttemptedUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "lastDeploymentId": "str", # Optional. The deployment identifier for the last deployment to the device. "lastInstallResult": { "extendedResultCode": 0, # Install extended result code. Required. "resultCode": 0, # Install result code. Required. "resultDetails": "str", # Optional. A string containing further details about the install result. "stepResults": [ { "extendedResultCode": 0, # Install extended result code. Required. "resultCode": 0, # Install result code. Required. "description": "str", # Optional. Step description. "resultDetails": "str", # Optional. A string containing further details about the install result. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } ] }, "moduleId": "str" # Optional. Device module identity. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_devices_request( instance_id=self._config.instance_id, filter=filter, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
async def _import_devices_initial( # pylint: disable=inconsistent-return-statements self, import_type: str, **kwargs: Any ) -> None: error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type = kwargs.pop("content_type", _headers.pop("Content-Type", "application/json")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[None] _json = import_type request = build_device_management_import_devices_request( instance_id=self._config.instance_id, content_type=content_type, api_version=self._config.api_version, json=_json, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [202]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) response_headers = {} response_headers["Operation-Location"] = self._deserialize("str", response.headers.get("Operation-Location")) if cls: return cls(pipeline_response, None, response_headers)
[docs] @distributed_trace_async async def begin_import_devices(self, import_type: str, **kwargs: Any) -> AsyncLROPoller[None]: """Import existing devices from IoT Hub. This is a long-running-operation; use Operation-Location response header value to check for operation status. :param import_type: The types of devices to import. Known values are: "Devices", "Modules", and "All". Required. :type import_type: str :keyword str continuation_token: A continuation token to restart a poller from a saved state. :keyword polling: By default, your polling method will be AsyncLROBasePolling. Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. :return: An instance of AsyncLROPoller that returns None :rtype: ~azure.core.polling.AsyncLROPoller[None] :raises ~azure.core.exceptions.HttpResponseError: """ _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type = kwargs.pop("content_type", _headers.pop("Content-Type", "application/json")) # type: str cls = kwargs.pop("cls", None) # type: ClsType[None] polling = kwargs.pop("polling", True) # type: Union[bool, AsyncPollingMethod] lro_delay = kwargs.pop("polling_interval", self._config.polling_interval) cont_token = kwargs.pop("continuation_token", None) # type: Optional[str] if cont_token is None: raw_result = await self._import_devices_initial( # type: ignore import_type=import_type, content_type=content_type, cls=lambda x, y, z: x, headers=_headers, params=_params, **kwargs ) kwargs.pop("error_map", None) def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements if cls: return cls(pipeline_response, None, {}) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } if polling is True: polling_method = cast( AsyncPollingMethod, AsyncLROBasePolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs), ) # type: AsyncPollingMethod elif polling is False: polling_method = cast(AsyncPollingMethod, AsyncNoPolling()) else: polling_method = polling if cont_token: return AsyncLROPoller.from_continuation_token( polling_method=polling_method, continuation_token=cont_token, client=self._client, deserialization_callback=get_long_running_output, ) return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
[docs] @distributed_trace_async async def get_device(self, device_id: str, **kwargs: Any) -> JSON: """Gets the device properties and latest deployment status for a device connected to Device Update for IoT Hub. :param device_id: Device identifier in Azure IoT Hub. Required. :type device_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # Device class identity. Required. "deviceId": "str", # Device identity. Required. "onLatestUpdate": bool, # Boolean flag indicating whether the latest update (the best compatible update for the device's device class and group) is installed on the device. Required. "deploymentStatus": "str", # Optional. State of the device in its last deployment. Known values are: "Succeeded", "InProgress", "Canceled", and "Failed". "groupId": "str", # Optional. Device group identity. "installedUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "lastAttemptedUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "lastDeploymentId": "str", # Optional. The deployment identifier for the last deployment to the device. "lastInstallResult": { "extendedResultCode": 0, # Install extended result code. Required. "resultCode": 0, # Install result code. Required. "resultDetails": "str", # Optional. A string containing further details about the install result. "stepResults": [ { "extendedResultCode": 0, # Install extended result code. Required. "resultCode": 0, # Install result code. Required. "description": "str", # Optional. Step description. "resultDetails": "str", # Optional. A string containing further details about the install result. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } ] }, "moduleId": "str" # Optional. Device module identity. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_device_request( device_id=device_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def get_device_module(self, device_id: str, module_id: str, **kwargs: Any) -> JSON: """Gets the device module properties and latest deployment status for a device module connected to Device Update for IoT Hub. :param device_id: Device identifier in Azure IoT Hub. Required. :type device_id: str :param module_id: Device module identifier in Azure IoT Hub. Required. :type module_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # Device class identity. Required. "deviceId": "str", # Device identity. Required. "onLatestUpdate": bool, # Boolean flag indicating whether the latest update (the best compatible update for the device's device class and group) is installed on the device. Required. "deploymentStatus": "str", # Optional. State of the device in its last deployment. Known values are: "Succeeded", "InProgress", "Canceled", and "Failed". "groupId": "str", # Optional. Device group identity. "installedUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "lastAttemptedUpdate": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "lastDeploymentId": "str", # Optional. The deployment identifier for the last deployment to the device. "lastInstallResult": { "extendedResultCode": 0, # Install extended result code. Required. "resultCode": 0, # Install result code. Required. "resultDetails": "str", # Optional. A string containing further details about the install result. "stepResults": [ { "extendedResultCode": 0, # Install extended result code. Required. "resultCode": 0, # Install result code. Required. "description": "str", # Optional. Step description. "resultDetails": "str", # Optional. A string containing further details about the install result. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } ] }, "moduleId": "str" # Optional. Device module identity. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_device_module_request( device_id=device_id, module_id=module_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def get_update_compliance(self, **kwargs: Any) -> JSON: """Gets the breakdown of how many devices are on their latest update, have new updates available, or are in progress receiving new updates. :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "newUpdatesAvailableDeviceCount": 0, # Number of devices with a newer update available. Required. "onLatestUpdateDeviceCount": 0, # Number of devices on the latest update. Required. "totalDeviceCount": 0, # Total number of devices. Required. "updatesInProgressDeviceCount": 0 # Number of devices with update in-progress. Required. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_update_compliance_request( instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_groups(self, *, order_by: Optional[str] = None, **kwargs: Any) -> AsyncIterable[JSON]: """Gets a list of all device groups. The $default group will always be returned first. :keyword order_by: Orders the set of groups returned. You can order by groupId, deviceCount, createdDate, subgroupsWithNewUpdatesAvailableCount, subgroupsWithUpdatesInProgressCount, or subgroupsOnLatestUpdateCount. Default value is None. :paramtype order_by: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "str", # Date and time when the update was created. Required. "groupId": "str", # Group identity. This is created from the value of the ADUGroup tag in the Iot Hub's device/module twin or $default for devices with no tag. Required. "groupType": "str", # Group type. Required. Known values are: "IoTHubTag" and "DefaultNoTag". "deployments": [ "str" # Optional. The active deployment Ids for the group. ], "deviceCount": 0, # Optional. The number of devices in the group. "subgroupsWithNewUpdatesAvailableCount": 0, # Optional. The count of subgroups with new updates available. "subgroupsWithOnLatestUpdateCount": 0, # Optional. The count of subgroups with devices on the latest update. "subgroupsWithUpdatesInProgressCount": 0 # Optional. The count of subgroups with updates in progress. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_groups_request( instance_id=self._config.instance_id, order_by=order_by, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_group(self, group_id: str, **kwargs: Any) -> JSON: """Gets the device group properties. :param group_id: Group identifier. Required. :type group_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "str", # Date and time when the update was created. Required. "groupId": "str", # Group identity. This is created from the value of the ADUGroup tag in the Iot Hub's device/module twin or $default for devices with no tag. Required. "groupType": "str", # Group type. Required. Known values are: "IoTHubTag" and "DefaultNoTag". "deployments": [ "str" # Optional. The active deployment Ids for the group. ], "deviceCount": 0, # Optional. The number of devices in the group. "subgroupsWithNewUpdatesAvailableCount": 0, # Optional. The count of subgroups with new updates available. "subgroupsWithOnLatestUpdateCount": 0, # Optional. The count of subgroups with devices on the latest update. "subgroupsWithUpdatesInProgressCount": 0 # Optional. The count of subgroups with updates in progress. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_group_request( group_id=group_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def delete_group( # pylint: disable=inconsistent-return-statements self, group_id: str, **kwargs: Any ) -> None: """Deletes a device group. This group is automatically created when a Device Update-enabled device is connected to the hub and reports its properties. Groups, subgroups, and deployments are not automatically cleaned up but are retained for history purposes. Users can call this method to delete a group if they do not need to retain any of the history of the group and no longer need it. If a device is ever connected again for this group after the group was deleted it will be automatically re-created but there will be no history. :param group_id: Group identifier. Required. :type group_id: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[None] request = build_device_management_delete_group_request( group_id=group_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs] @distributed_trace_async async def get_update_compliance_for_group(self, group_id: str, **kwargs: Any) -> JSON: """Get device group update compliance information such as how many devices are on their latest update, how many need new updates, and how many are in progress on receiving a new update. :param group_id: Group identifier. Required. :type group_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "newUpdatesAvailableDeviceCount": 0, # Number of devices with a newer update available. Required. "onLatestUpdateDeviceCount": 0, # Number of devices on the latest update. Required. "totalDeviceCount": 0, # Total number of devices. Required. "updatesInProgressDeviceCount": 0 # Number of devices with update in-progress. Required. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_update_compliance_for_group_request( group_id=group_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_best_updates_for_group(self, group_id: str, **kwargs: Any) -> AsyncIterable[JSON]: """Get the best available updates for a device group and a count of how many devices need each update. :param group_id: Group identifier. Required. :type group_id: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # The device class subgroup's device class Id. Required. "deviceCount": 0, # Total number of devices for which the update is applicable. Required. "groupId": "str", # The group Id. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_best_updates_for_group_request( group_id=group_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace def list_deployments_for_group( self, group_id: str, *, order_by: Optional[str] = None, **kwargs: Any ) -> AsyncIterable[JSON]: """Gets a list of deployments for a device group. :param group_id: Group identifier. Required. :type group_id: str :keyword order_by: Orders the set of deployments returned. You can order by start date. Default value is None. :paramtype order_by: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_deployments_for_group_request( group_id=group_id, instance_id=self._config.instance_id, order_by=order_by, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_deployment(self, group_id: str, deployment_id: str, **kwargs: Any) -> JSON: """Gets the deployment properties. :param group_id: Group identifier. Required. :type group_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_deployment_request( group_id=group_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
@overload async def create_or_update_deployment( self, group_id: str, deployment_id: str, deployment: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Creates or updates a deployment. :param group_id: Group identifier. Required. :type group_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :param deployment: The deployment properties. Required. :type deployment: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. deployment = { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ @overload async def create_or_update_deployment( self, group_id: str, deployment_id: str, deployment: IO, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Creates or updates a deployment. :param group_id: Group identifier. Required. :type group_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :param deployment: The deployment properties. Required. :type deployment: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """
[docs] @distributed_trace_async async def create_or_update_deployment( self, group_id: str, deployment_id: str, deployment: Union[JSON, IO], **kwargs: Any ) -> JSON: """Creates or updates a deployment. :param group_id: Group identifier. Required. :type group_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :param deployment: The deployment properties. Is either a model type or a IO type. Required. :type deployment: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None)) # type: Optional[str] cls = kwargs.pop("cls", None) # type: ClsType[JSON] content_type = content_type or "application/json" _json = None _content = None if isinstance(deployment, (IO, bytes)): _content = deployment else: _json = deployment request = build_device_management_create_or_update_deployment_request( group_id=group_id, deployment_id=deployment_id, instance_id=self._config.instance_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def delete_deployment( # pylint: disable=inconsistent-return-statements self, group_id: str, deployment_id: str, **kwargs: Any ) -> None: """Deletes a deployment. :param group_id: Group identifier. Required. :type group_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[None] request = build_device_management_delete_deployment_request( group_id=group_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs] @distributed_trace_async async def get_deployment_status(self, group_id: str, deployment_id: str, **kwargs: Any) -> JSON: """Gets the status of a deployment including a breakdown of how many devices in the deployment are in progress, completed, or failed. :param group_id: Group identifier. Required. :type group_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentState": "str", # The state of the deployment. Required. Known values are: "Active", "ActiveWithSubgroupFailures", "Failed", "Inactive", and "Canceled". "groupId": "str", # The group identity. Required. "subgroupStatus": [ { "deploymentState": "str", # The state of the subgroup deployment. Required. Known values are: "Active", "Failed", "Inactive", and "Canceled". "deviceClassId": "str", # The device class subgroup identity. Required. "groupId": "str", # The group identity. Required. "devicesCanceledCount": 0, # Optional. The number of devices which have had their deployment canceled. "devicesCompletedFailedCount": 0, # Optional. The number of devices that have completed deployment with a failure. "devicesCompletedSucceededCount": 0, # Optional. The number of devices which have successfully completed deployment. "devicesInProgressCount": 0, # Optional. The number of devices that are currently in deployment. "error": { "code": "str", # Server defined error code. Required. "message": "str", # A human-readable representation of the error. Required. "details": [ ... ], "innererror": { "code": "str", # A more specific error code than what was provided by the containing error. Required. "errorDetail": "str", # Optional. The internal error or exception message. "innerError": ..., "message": "str" # Optional. A human-readable representation of the error. }, "occurredDateTime": "2020-02-20 00:00:00", # Optional. Date and time in UTC when the error occurred. "target": "str" # Optional. The target of the error. }, "totalDevices": 0 # Optional. The total number of devices in the deployment. } ], "error": { "code": "str", # Server defined error code. Required. "message": "str", # A human-readable representation of the error. Required. "details": [ ... ], "innererror": { "code": "str", # A more specific error code than what was provided by the containing error. Required. "errorDetail": "str", # Optional. The internal error or exception message. "innerError": ..., "message": "str" # Optional. A human-readable representation of the error. }, "occurredDateTime": "2020-02-20 00:00:00", # Optional. Date and time in UTC when the error occurred. "target": "str" # Optional. The target of the error. } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_deployment_status_request( group_id=group_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_device_class_subgroups_for_group( self, group_id: str, *, filter: Optional[str] = None, **kwargs: Any ) -> AsyncIterable[JSON]: """Get the device class subgroups for the group. A device class subgroup is the set of devices within the group that share the same device class. All devices within the same device class are compatible with the same updates. :param group_id: Group identifier. Required. :type group_id: str :keyword filter: Restricts the set of device class subgroups returned. You can filter on compat properties by name and value. (i.e. filter=compatProperties/propertyName1 eq 'value1' and compatProperties/propertyName2 eq 'value2'). Default value is None. :paramtype filter: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "str", # Date and time when the device class subgroup was created. Required. "deviceClassId": "str", # Device class subgroup identity. This is generated from the model Id and the compat properties reported by the device update agent in the Device Update PnP interface in IoT Hub. It is a hex-encoded SHA1 hash. Required. "groupId": "str", # Group identity. Required. "deploymentId": "str", # Optional. The active deployment Id for the device class subgroup. "deviceCount": 0 # Optional. The number of devices in the device class subgroup. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_device_class_subgroups_for_group_request( group_id=group_id, instance_id=self._config.instance_id, filter=filter, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_device_class_subgroup(self, group_id: str, device_class_id: str, **kwargs: Any) -> JSON: """Gets device class subgroup details. A device class subgroup is the set of devices within the group that share the same device class. All devices within the same device class are compatible with the same updates. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "str", # Date and time when the device class subgroup was created. Required. "deviceClassId": "str", # Device class subgroup identity. This is generated from the model Id and the compat properties reported by the device update agent in the Device Update PnP interface in IoT Hub. It is a hex-encoded SHA1 hash. Required. "groupId": "str", # Group identity. Required. "deploymentId": "str", # Optional. The active deployment Id for the device class subgroup. "deviceCount": 0 # Optional. The number of devices in the device class subgroup. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_device_class_subgroup_request( group_id=group_id, device_class_id=device_class_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def delete_device_class_subgroup( # pylint: disable=inconsistent-return-statements self, group_id: str, device_class_id: str, **kwargs: Any ) -> None: """Deletes a device class subgroup. This subgroup is automatically created when a Device Update-enabled device is connected to the hub and reports its properties. Groups, subgroups, and deployments are not automatically cleaned up but are retained for history purposes. Users can call this method to delete a subgroup if they do not need to retain any of the history of the subgroup and no longer need it. If a device is ever connected again for this subgroup after the subgroup was deleted it will be automatically re-created but there will be no history. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[None] request = build_device_management_delete_device_class_subgroup_request( group_id=group_id, device_class_id=device_class_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs] @distributed_trace_async async def get_device_class_subgroup_update_compliance( self, group_id: str, device_class_id: str, **kwargs: Any ) -> JSON: """Get device class subgroup update compliance information such as how many devices are on their latest update, how many need new updates, and how many are in progress on receiving a new update. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "newUpdatesAvailableDeviceCount": 0, # Number of devices with a newer update available. Required. "onLatestUpdateDeviceCount": 0, # Number of devices on the latest update. Required. "totalDeviceCount": 0, # Total number of devices. Required. "updatesInProgressDeviceCount": 0 # Number of devices with update in-progress. Required. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_device_class_subgroup_update_compliance_request( group_id=group_id, device_class_id=device_class_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def get_best_updates_for_device_class_subgroup( self, group_id: str, device_class_id: str, **kwargs: Any ) -> JSON: """Get the best available update for a device class subgroup and a count of how many devices need this update. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceClassId": "str", # The device class subgroup's device class Id. Required. "deviceCount": 0, # Total number of devices for which the update is applicable. Required. "groupId": "str", # The group Id. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_best_updates_for_device_class_subgroup_request( group_id=group_id, device_class_id=device_class_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_deployments_for_device_class_subgroup( self, group_id: str, device_class_id: str, *, order_by: Optional[str] = None, **kwargs: Any ) -> AsyncIterable[JSON]: """Gets a list of deployments for a device class subgroup. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :keyword order_by: Orders the set of deployments returned. You can order by start date. Default value is None. :paramtype order_by: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_deployments_for_device_class_subgroup_request( group_id=group_id, device_class_id=device_class_id, instance_id=self._config.instance_id, order_by=order_by, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_deployment_for_device_class_subgroup( self, group_id: str, device_class_id: str, deployment_id: str, **kwargs: Any ) -> JSON: """Gets the deployment properties. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_deployment_for_device_class_subgroup_request( group_id=group_id, device_class_id=device_class_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def delete_deployment_for_device_class_subgroup( # pylint: disable=inconsistent-return-statements self, group_id: str, device_class_id: str, deployment_id: str, **kwargs: Any ) -> None: """Deletes a device class subgroup deployment. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[None] request = build_device_management_delete_deployment_for_device_class_subgroup_request( group_id=group_id, device_class_id=device_class_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [204]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs] @distributed_trace_async async def stop_deployment(self, group_id: str, device_class_id: str, deployment_id: str, **kwargs: Any) -> JSON: """Stops a deployment. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_stop_deployment_request( group_id=group_id, device_class_id=device_class_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def retry_deployment(self, group_id: str, device_class_id: str, deployment_id: str, **kwargs: Any) -> JSON: """Retries a deployment with failed devices. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentId": "str", # The caller-provided deployment identifier. This cannot be longer than 73 characters, must be all lower-case, and cannot contain '&', '^', '[', ']', '{', '}', '|', '<', '>', forward slash, backslash, or double quote. The Updates view in the Azure Portal IoT Hub resource generates a GUID for deploymentId when you create a deployment. Required. "groupId": "str", # The group identity for the devices the deployment is intended to update. Required. "startDateTime": "2020-02-20 00:00:00", # The deployment start datetime. Required. "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. }, "deviceClassSubgroups": [ "str" # Optional. The device class subgroups the deployment is compatible with and subgroup deployments have been created for. This is not provided by the caller during CreateOrUpdateDeployment but is automatically determined by Device Update. ], "isCanceled": bool, # Optional. Boolean flag indicating whether the deployment was canceled. "isCloudInitiatedRollback": bool, # Optional. Boolean flag indicating whether the deployment is a rollback deployment. "isRetried": bool, # Optional. Boolean flag indicating whether the deployment has been retried. "rollbackPolicy": { "failure": { "devicesFailedCount": 0, # Number of devices that failed. Required. "devicesFailedPercentage": 0 # Percentage of devices that failed. Required. }, "update": { "updateId": { "name": "str", # Update name. Required. "provider": "str", # Update provider. Required. "version": "str" # Update version. Required. }, "description": "str", # Optional. Update description. "friendlyName": "str" # Optional. Friendly update name. } } } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_retry_deployment_request( group_id=group_id, device_class_id=device_class_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def get_device_class_subgroup_deployment_status( self, group_id: str, device_class_id: str, deployment_id: str, **kwargs: Any ) -> JSON: """Gets the status of a deployment including a breakdown of how many devices in the deployment are in progress, completed, or failed. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deploymentState": "str", # The state of the subgroup deployment. Required. Known values are: "Active", "Failed", "Inactive", and "Canceled". "deviceClassId": "str", # The device class subgroup identity. Required. "groupId": "str", # The group identity. Required. "devicesCanceledCount": 0, # Optional. The number of devices which have had their deployment canceled. "devicesCompletedFailedCount": 0, # Optional. The number of devices that have completed deployment with a failure. "devicesCompletedSucceededCount": 0, # Optional. The number of devices which have successfully completed deployment. "devicesInProgressCount": 0, # Optional. The number of devices that are currently in deployment. "error": { "code": "str", # Server defined error code. Required. "message": "str", # A human-readable representation of the error. Required. "details": [ ... ], "innererror": { "code": "str", # A more specific error code than what was provided by the containing error. Required. "errorDetail": "str", # Optional. The internal error or exception message. "innerError": ..., "message": "str" # Optional. A human-readable representation of the error. }, "occurredDateTime": "2020-02-20 00:00:00", # Optional. Date and time in UTC when the error occurred. "target": "str" # Optional. The target of the error. }, "totalDevices": 0 # Optional. The total number of devices in the deployment. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_device_class_subgroup_deployment_status_request( group_id=group_id, device_class_id=device_class_id, deployment_id=deployment_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_device_states_for_device_class_subgroup_deployment( self, group_id: str, device_class_id: str, deployment_id: str, *, filter: Optional[str] = None, **kwargs: Any ) -> AsyncIterable[JSON]: """Gets a list of devices in a deployment along with their state. Useful for getting a list of failed devices. :param group_id: Group identifier. Required. :type group_id: str :param device_class_id: Device class identifier. Required. :type device_class_id: str :param deployment_id: Deployment identifier. Required. :type deployment_id: str :keyword filter: Restricts the set of deployment device states returned. You can filter on deviceId and moduleId and/or deviceState. Default value is None. :paramtype filter: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceId": "str", # Device identity. Required. "deviceState": "str", # Deployment device state. Required. Known values are: "Succeeded", "InProgress", "Canceled", and "Failed". "movedOnToNewDeployment": bool, # Boolean flag indicating whether this device is in a newer deployment and can no longer retry this deployment. Required. "retryCount": 0, # The number of times this deployment has been retried on this device. Required. "moduleId": "str" # Optional. Device module identity. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_device_states_for_device_class_subgroup_deployment_request( group_id=group_id, device_class_id=device_class_id, deployment_id=deployment_id, instance_id=self._config.instance_id, filter=filter, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_operation_status( self, operation_id: str, *, if_none_match: Optional[str] = None, **kwargs: Any ) -> JSON: """Retrieve operation status. :param operation_id: Operation identifier. Required. :type operation_id: str :keyword if_none_match: Defines the If-None-Match condition. The operation will be performed only if the ETag on the server does not match this value. Default value is None. :paramtype if_none_match: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation was created. Required. "lastActionDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation status was last updated. Required. "operationId": "str", # Operation Id. Required. "status": "str", # Operation status. Required. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". "error": { "code": "str", # Server defined error code. Required. "message": "str", # A human-readable representation of the error. Required. "details": [ ... ], "innererror": { "code": "str", # A more specific error code than what was provided by the containing error. Required. "errorDetail": "str", # Optional. The internal error or exception message. "innerError": ..., "message": "str" # Optional. A human-readable representation of the error. }, "occurredDateTime": "2020-02-20 00:00:00", # Optional. Date and time in UTC when the error occurred. "target": "str" # Optional. The target of the error. }, "etag": "str", # Optional. Operation ETag. "traceId": "str" # Optional. Operation correlation identity that can used by Microsoft Support for troubleshooting. } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_operation_status_request( operation_id=operation_id, instance_id=self._config.instance_id, if_none_match=if_none_match, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) response_headers = {} response_headers["Retry-After"] = self._deserialize("str", response.headers.get("Retry-After")) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), response_headers) return cast(JSON, deserialized)
[docs] @distributed_trace def list_operation_statuses( self, *, filter: Optional[str] = None, top: Optional[int] = None, **kwargs: Any ) -> AsyncIterable[JSON]: """Get a list of all device import operations. Completed operations are kept for 7 days before auto-deleted. :keyword filter: Restricts the set of operations returned. Only one specific filter is supported: "status eq 'NotStarted' or status eq 'Running'". Default value is None. :paramtype filter: str :keyword top: Specifies a non-negative integer n that limits the number of items returned from a collection. The service returns the number of available items up to but not greater than the specified value n. Default value is None. :paramtype top: int :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation was created. Required. "lastActionDateTime": "2020-02-20 00:00:00", # Date and time in UTC when the operation status was last updated. Required. "operationId": "str", # Operation Id. Required. "status": "str", # Operation status. Required. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". "error": { "code": "str", # Server defined error code. Required. "message": "str", # A human-readable representation of the error. Required. "details": [ ... ], "innererror": { "code": "str", # A more specific error code than what was provided by the containing error. Required. "errorDetail": "str", # Optional. The internal error or exception message. "innerError": ..., "message": "str" # Optional. A human-readable representation of the error. }, "occurredDateTime": "2020-02-20 00:00:00", # Optional. Date and time in UTC when the error occurred. "target": "str" # Optional. The target of the error. }, "etag": "str", # Optional. Operation ETag. "traceId": "str" # Optional. Operation correlation identity that can used by Microsoft Support for troubleshooting. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_operation_statuses_request( instance_id=self._config.instance_id, filter=filter, top=top, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
@overload async def start_log_collection( self, log_collection_id: str, log_collection: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Start the device diagnostics log collection on specified devices. :param log_collection_id: Log collection identifier. Required. :type log_collection_id: str :param log_collection: The log collection properties. Required. :type log_collection: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. log_collection = { "deviceList": [ { "deviceId": "str", # Device Id. Required. "moduleId": "str" # Optional. Module Id. } ], "createdDateTime": "str", # Optional. The timestamp when the operation was created. "description": "str", # Optional. Description of the diagnostics operation. "lastActionDateTime": "str", # Optional. A timestamp for when the current state was entered. "operationId": "str", # Optional. The log collection id. "status": "str" # Optional. Operation status. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". } # response body for status code(s): 201 response == { "deviceList": [ { "deviceId": "str", # Device Id. Required. "moduleId": "str" # Optional. Module Id. } ], "createdDateTime": "str", # Optional. The timestamp when the operation was created. "description": "str", # Optional. Description of the diagnostics operation. "lastActionDateTime": "str", # Optional. A timestamp for when the current state was entered. "operationId": "str", # Optional. The log collection id. "status": "str" # Optional. Operation status. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". } """ @overload async def start_log_collection( self, log_collection_id: str, log_collection: IO, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Start the device diagnostics log collection on specified devices. :param log_collection_id: Log collection identifier. Required. :type log_collection_id: str :param log_collection: The log collection properties. Required. :type log_collection: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 201 response == { "deviceList": [ { "deviceId": "str", # Device Id. Required. "moduleId": "str" # Optional. Module Id. } ], "createdDateTime": "str", # Optional. The timestamp when the operation was created. "description": "str", # Optional. Description of the diagnostics operation. "lastActionDateTime": "str", # Optional. A timestamp for when the current state was entered. "operationId": "str", # Optional. The log collection id. "status": "str" # Optional. Operation status. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". } """
[docs] @distributed_trace_async async def start_log_collection( self, log_collection_id: str, log_collection: Union[JSON, IO], **kwargs: Any ) -> JSON: """Start the device diagnostics log collection on specified devices. :param log_collection_id: Log collection identifier. Required. :type log_collection_id: str :param log_collection: The log collection properties. Is either a model type or a IO type. Required. :type log_collection: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 201 response == { "deviceList": [ { "deviceId": "str", # Device Id. Required. "moduleId": "str" # Optional. Module Id. } ], "createdDateTime": "str", # Optional. The timestamp when the operation was created. "description": "str", # Optional. Description of the diagnostics operation. "lastActionDateTime": "str", # Optional. A timestamp for when the current state was entered. "operationId": "str", # Optional. The log collection id. "status": "str" # Optional. Operation status. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type = kwargs.pop("content_type", _headers.pop("Content-Type", None)) # type: Optional[str] cls = kwargs.pop("cls", None) # type: ClsType[JSON] content_type = content_type or "application/json" _json = None _content = None if isinstance(log_collection, (IO, bytes)): _content = log_collection else: _json = log_collection request = build_device_management_start_log_collection_request( log_collection_id=log_collection_id, instance_id=self._config.instance_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [201]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace_async async def get_log_collection(self, log_collection_id: str, **kwargs: Any) -> JSON: """Get the device diagnostics log collection. :param log_collection_id: Log collection identifier. Required. :type log_collection_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceList": [ { "deviceId": "str", # Device Id. Required. "moduleId": "str" # Optional. Module Id. } ], "createdDateTime": "str", # Optional. The timestamp when the operation was created. "description": "str", # Optional. Description of the diagnostics operation. "lastActionDateTime": "str", # Optional. A timestamp for when the current state was entered. "operationId": "str", # Optional. The log collection id. "status": "str" # Optional. Operation status. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_log_collection_request( log_collection_id=log_collection_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_log_collections(self, **kwargs: Any) -> AsyncIterable[JSON]: """Get all device diagnostics log collections. :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceList": [ { "deviceId": "str", # Device Id. Required. "moduleId": "str" # Optional. Module Id. } ], "createdDateTime": "str", # Optional. The timestamp when the operation was created. "description": "str", # Optional. Description of the diagnostics operation. "lastActionDateTime": "str", # Optional. A timestamp for when the current state was entered. "operationId": "str", # Optional. The log collection id. "status": "str" # Optional. Operation status. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_log_collections_request( instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)
[docs] @distributed_trace_async async def get_log_collection_detailed_status(self, log_collection_id: str, **kwargs: Any) -> JSON: """Get log collection with detailed status. :param log_collection_id: Log collection identifier. Required. :type log_collection_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "createdDateTime": "str", # Optional. The timestamp when the operation was created. "description": "str", # Optional. Device diagnostics operation description. "deviceStatus": [ { "deviceId": "str", # Device id. Required. "status": "str", # Log upload status. Required. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". "extendedResultCode": "str", # Optional. Log upload extended result code. "logLocation": "str", # Optional. Log upload location. "moduleId": "str", # Optional. Module id. "resultCode": "str" # Optional. Log upload result code. } ], "lastActionDateTime": "str", # Optional. A timestamp for when the current state was entered. "operationId": "str", # Optional. The device diagnostics operation id. "status": "str" # Optional. Operation status. Known values are: "NotStarted", "Running", "Succeeded", and "Failed". } """ error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] request = build_device_management_get_log_collection_detailed_status_request( log_collection_id=log_collection_id, instance_id=self._config.instance_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def list_health_of_devices(self, *, filter: str, **kwargs: Any) -> AsyncIterable[JSON]: """Get list of device health. :keyword filter: Restricts the set of devices for which device health is returned. You can filter on status, device id and module id. Required. :paramtype filter: str :return: An iterator like instance of JSON object :rtype: ~azure.core.async_paging.AsyncItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "deviceId": "str", # Device id. Required. "healthChecks": [ { "name": "str", # Optional. Health check name. "result": "str" # Optional. Health check result. Known values are: "success" and "userError". } ], "state": "str", # Aggregate device health state. Required. Known values are: "healthy" and "unhealthy". "digitalTwinModelId": "str", # Optional. Digital twin model Id. "moduleId": "str" # Optional. Module id. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls = kwargs.pop("cls", None) # type: ClsType[JSON] error_map = {401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError} error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_device_management_list_health_of_devices_request( instance_id=self._config.instance_id, filter=filter, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore else: # make call to next link with the client's api-version _parsed_next_link = urlparse(next_link) _next_request_params = case_insensitive_dict(parse_qs(_parsed_next_link.query)) _next_request_params["api-version"] = self._config.api_version request = HttpRequest("GET", urljoin(next_link, _parsed_next_link.path), params=_next_request_params) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) # type: ignore return request async def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) return deserialized.get("nextLink", None), AsyncList(list_of_elem) async def get_next(next_link=None): request = prepare_request(next_link) pipeline_response = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access request, stream=False, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return AsyncItemPaged(get_next, extract_data)