# pylint: disable=line-too-long,useless-suppression,too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from collections.abc import MutableMapping
from io import IOBase
import json
from typing import Any, Callable, Dict, IO, Iterator, List, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core import PipelineClient
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
StreamClosedError,
StreamConsumedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling
from .. import models as _models
from .._configuration import StorageActionsMgmtClientConfiguration
from .._utils.model_base import SdkJSONEncoder, _deserialize, _failsafe_deserialize
from .._utils.serialization import Deserializer, Serializer
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
JSON = MutableMapping[str, Any]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_operations_list_request(**kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/providers/Microsoft.StorageActions/operations"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_get_request(
resource_group_name: str, storage_task_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.StorageActions/storageTasks/{storageTaskName}"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
"storageTaskName": _SERIALIZER.url("storage_task_name", storage_task_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_create_request(
resource_group_name: str, storage_task_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.StorageActions/storageTasks/{storageTaskName}"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
"storageTaskName": _SERIALIZER.url("storage_task_name", storage_task_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_update_request(
resource_group_name: str, storage_task_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.StorageActions/storageTasks/{storageTaskName}"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
"storageTaskName": _SERIALIZER.url("storage_task_name", storage_task_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_delete_request(
resource_group_name: str, storage_task_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.StorageActions/storageTasks/{storageTaskName}"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
"storageTaskName": _SERIALIZER.url("storage_task_name", storage_task_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_list_by_resource_group_request( # pylint: disable=name-too-long
resource_group_name: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.StorageActions/storageTasks"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_list_by_subscription_request( # pylint: disable=name-too-long
subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/providers/Microsoft.StorageActions/storageTasks"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_preview_actions_request( # pylint: disable=name-too-long
location: str, subscription_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/providers/Microsoft.StorageActions/locations/{location}/previewActions"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"location": _SERIALIZER.url("location", location, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_tasks_report_list_request(
resource_group_name: str,
storage_task_name: str,
subscription_id: str,
*,
maxpagesize: Optional[int] = None,
filter: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.StorageActions/storageTasks/{storageTaskName}/reports"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
"storageTaskName": _SERIALIZER.url("storage_task_name", storage_task_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if maxpagesize is not None:
_params["$maxpagesize"] = _SERIALIZER.query("maxpagesize", maxpagesize, "int")
if filter is not None:
_params["$filter"] = _SERIALIZER.query("filter", filter, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_storage_task_assignment_list_request( # pylint: disable=name-too-long
resource_group_name: str,
storage_task_name: str,
subscription_id: str,
*,
maxpagesize: Optional[int] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-01-01"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.StorageActions/storageTasks/{storageTaskName}/storageTaskAssignments"
path_format_arguments = {
"subscriptionId": _SERIALIZER.url("subscription_id", subscription_id, "str"),
"resourceGroupName": _SERIALIZER.url("resource_group_name", resource_group_name, "str"),
"storageTaskName": _SERIALIZER.url("storage_task_name", storage_task_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if maxpagesize is not None:
_params["$maxpagesize"] = _SERIALIZER.query("maxpagesize", maxpagesize, "int")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
[docs]
class Operations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.storageactions.StorageActionsMgmtClient`'s
:attr:`operations` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: StorageActionsMgmtClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def list(self, **kwargs: Any) -> ItemPaged["_models.Operation"]:
"""Lists all of the available Storage Actions Rest API operations.
:return: An iterator like instance of Operation
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.storageactions.models.Operation]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[List[_models.Operation]] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_operations_list_request(
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
return _request
def extract_data(pipeline_response):
deserialized = pipeline_response.http_response.json()
list_of_elem = _deserialize(List[_models.Operation], deserialized.get("value", []))
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return deserialized.get("nextLink") or None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
class StorageTasksOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.storageactions.StorageActionsMgmtClient`'s
:attr:`storage_tasks` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: StorageActionsMgmtClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def get(self, resource_group_name: str, storage_task_name: str, **kwargs: Any) -> _models.StorageTask:
"""Get the storage task properties.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:return: StorageTask. The StorageTask is compatible with MutableMapping
:rtype: ~azure.mgmt.storageactions.models.StorageTask
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.StorageTask] = kwargs.pop("cls", None)
_request = build_storage_tasks_get_request(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
subscription_id=self._config.subscription_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.StorageTask, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
def _create_initial(
self,
resource_group_name: str,
storage_task_name: str,
parameters: Union[_models.StorageTask, JSON, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_content = json.dumps(parameters, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_storage_tasks_create_request(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
subscription_id=self._config.subscription_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 201, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 201:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Retry-After"] = self._deserialize("int", response.headers.get("Retry-After"))
if response.status_code == 202:
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Retry-After"] = self._deserialize("int", response.headers.get("Retry-After"))
deserialized = response.iter_bytes()
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_create(
self,
resource_group_name: str,
storage_task_name: str,
parameters: _models.StorageTask,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Asynchronously creates a new storage task resource with the specified parameters. If a storage
task is already created and a subsequent create request is issued with different properties,
the storage task properties will be updated. If a storage task is already created and a
subsequent create or update request is issued with the exact same set of properties, the
request will succeed.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to create a Storage Task. Required.
:type parameters: ~azure.mgmt.storageactions.models.StorageTask
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create(
self,
resource_group_name: str,
storage_task_name: str,
parameters: JSON,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Asynchronously creates a new storage task resource with the specified parameters. If a storage
task is already created and a subsequent create request is issued with different properties,
the storage task properties will be updated. If a storage task is already created and a
subsequent create or update request is issued with the exact same set of properties, the
request will succeed.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to create a Storage Task. Required.
:type parameters: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_create(
self,
resource_group_name: str,
storage_task_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Asynchronously creates a new storage task resource with the specified parameters. If a storage
task is already created and a subsequent create request is issued with different properties,
the storage task properties will be updated. If a storage task is already created and a
subsequent create or update request is issued with the exact same set of properties, the
request will succeed.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to create a Storage Task. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_create(
self,
resource_group_name: str,
storage_task_name: str,
parameters: Union[_models.StorageTask, JSON, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Asynchronously creates a new storage task resource with the specified parameters. If a storage
task is already created and a subsequent create request is issued with different properties,
the storage task properties will be updated. If a storage task is already created and a
subsequent create or update request is issued with the exact same set of properties, the
request will succeed.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to create a Storage Task. Is one of the following types:
StorageTask, JSON, IO[bytes] Required.
:type parameters: ~azure.mgmt.storageactions.models.StorageTask or JSON or IO[bytes]
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.StorageTask] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._create_initial(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
parameters=parameters,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
response = pipeline_response.http_response
deserialized = _deserialize(_models.StorageTask, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.StorageTask].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.StorageTask](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _update_initial(
self,
resource_group_name: str,
storage_task_name: str,
parameters: Union[_models.StorageTaskUpdateParameters, JSON, IO[bytes]],
**kwargs: Any
) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_content = json.dumps(parameters, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_storage_tasks_update_request(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
subscription_id=self._config.subscription_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 202]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Retry-After"] = self._deserialize("int", response.headers.get("Retry-After"))
deserialized = response.iter_bytes()
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
@overload
def begin_update(
self,
resource_group_name: str,
storage_task_name: str,
parameters: _models.StorageTaskUpdateParameters,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Update storage task properties.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to provide to update the storage task resource. Required.
:type parameters: ~azure.mgmt.storageactions.models.StorageTaskUpdateParameters
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_update(
self,
resource_group_name: str,
storage_task_name: str,
parameters: JSON,
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Update storage task properties.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to provide to update the storage task resource. Required.
:type parameters: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def begin_update(
self,
resource_group_name: str,
storage_task_name: str,
parameters: IO[bytes],
*,
content_type: str = "application/json",
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Update storage task properties.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to provide to update the storage task resource. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def begin_update(
self,
resource_group_name: str,
storage_task_name: str,
parameters: Union[_models.StorageTaskUpdateParameters, JSON, IO[bytes]],
**kwargs: Any
) -> LROPoller[_models.StorageTask]:
"""Update storage task properties.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:param parameters: The parameters to provide to update the storage task resource. Is one of the
following types: StorageTaskUpdateParameters, JSON, IO[bytes] Required.
:type parameters: ~azure.mgmt.storageactions.models.StorageTaskUpdateParameters or JSON or
IO[bytes]
:return: An instance of LROPoller that returns StorageTask. The StorageTask is compatible with
MutableMapping
:rtype: ~azure.core.polling.LROPoller[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.StorageTask] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._update_initial(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
parameters=parameters,
content_type=content_type,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response):
response = pipeline_response.http_response
deserialized = _deserialize(_models.StorageTask, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[_models.StorageTask].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[_models.StorageTask](
self._client, raw_result, get_long_running_output, polling_method # type: ignore
)
def _delete_initial(self, resource_group_name: str, storage_task_name: str, **kwargs: Any) -> Iterator[bytes]:
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[Iterator[bytes]] = kwargs.pop("cls", None)
_request = build_storage_tasks_delete_request(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
subscription_id=self._config.subscription_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = True
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [202, 204]:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
response_headers = {}
if response.status_code == 202:
response_headers["Azure-AsyncOperation"] = self._deserialize(
"str", response.headers.get("Azure-AsyncOperation")
)
response_headers["Location"] = self._deserialize("str", response.headers.get("Location"))
response_headers["Retry-After"] = self._deserialize("int", response.headers.get("Retry-After"))
deserialized = response.iter_bytes()
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
[docs]
@distributed_trace
def begin_delete(self, resource_group_name: str, storage_task_name: str, **kwargs: Any) -> LROPoller[None]:
"""Delete the storage task resource.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:return: An instance of LROPoller that returns None
:rtype: ~azure.core.polling.LROPoller[None]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop("cls", None)
polling: Union[bool, PollingMethod] = kwargs.pop("polling", True)
lro_delay = kwargs.pop("polling_interval", self._config.polling_interval)
cont_token: Optional[str] = kwargs.pop("continuation_token", None)
if cont_token is None:
raw_result = self._delete_initial(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
cls=lambda x, y, z: x,
headers=_headers,
params=_params,
**kwargs
)
raw_result.http_response.read() # type: ignore
kwargs.pop("error_map", None)
def get_long_running_output(pipeline_response): # pylint: disable=inconsistent-return-statements
if cls:
return cls(pipeline_response, None, {}) # type: ignore
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
if polling is True:
polling_method: PollingMethod = cast(
PollingMethod, ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
)
elif polling is False:
polling_method = cast(PollingMethod, NoPolling())
else:
polling_method = polling
if cont_token:
return LROPoller[None].from_continuation_token(
polling_method=polling_method,
continuation_token=cont_token,
client=self._client,
deserialization_callback=get_long_running_output,
)
return LROPoller[None](self._client, raw_result, get_long_running_output, polling_method) # type: ignore
[docs]
@distributed_trace
def list_by_resource_group(self, resource_group_name: str, **kwargs: Any) -> ItemPaged["_models.StorageTask"]:
"""Lists all the storage tasks available under the given resource group.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:return: An iterator like instance of StorageTask
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[List[_models.StorageTask]] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_storage_tasks_list_by_resource_group_request(
resource_group_name=resource_group_name,
subscription_id=self._config.subscription_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
return _request
def extract_data(pipeline_response):
deserialized = pipeline_response.http_response.json()
list_of_elem = _deserialize(List[_models.StorageTask], deserialized.get("value", []))
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return deserialized.get("nextLink") or None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
@distributed_trace
def list_by_subscription(self, **kwargs: Any) -> ItemPaged["_models.StorageTask"]:
"""Lists all the storage tasks available under the subscription.
:return: An iterator like instance of StorageTask
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.storageactions.models.StorageTask]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[List[_models.StorageTask]] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_storage_tasks_list_by_subscription_request(
subscription_id=self._config.subscription_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
return _request
def extract_data(pipeline_response):
deserialized = pipeline_response.http_response.json()
list_of_elem = _deserialize(List[_models.StorageTask], deserialized.get("value", []))
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return deserialized.get("nextLink") or None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
@overload
def preview_actions(
self,
location: str,
parameters: _models.StorageTaskPreviewAction,
*,
content_type: str = "application/json",
**kwargs: Any
) -> _models.StorageTaskPreviewAction:
"""Runs the input conditions against input object metadata properties and designates matched
objects in response.
:param location: Required.
:type location: str
:param parameters: The parameters to preview action condition. Required.
:type parameters: ~azure.mgmt.storageactions.models.StorageTaskPreviewAction
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: StorageTaskPreviewAction. The StorageTaskPreviewAction is compatible with
MutableMapping
:rtype: ~azure.mgmt.storageactions.models.StorageTaskPreviewAction
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def preview_actions(
self, location: str, parameters: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.StorageTaskPreviewAction:
"""Runs the input conditions against input object metadata properties and designates matched
objects in response.
:param location: Required.
:type location: str
:param parameters: The parameters to preview action condition. Required.
:type parameters: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: StorageTaskPreviewAction. The StorageTaskPreviewAction is compatible with
MutableMapping
:rtype: ~azure.mgmt.storageactions.models.StorageTaskPreviewAction
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def preview_actions(
self, location: str, parameters: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.StorageTaskPreviewAction:
"""Runs the input conditions against input object metadata properties and designates matched
objects in response.
:param location: Required.
:type location: str
:param parameters: The parameters to preview action condition. Required.
:type parameters: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: StorageTaskPreviewAction. The StorageTaskPreviewAction is compatible with
MutableMapping
:rtype: ~azure.mgmt.storageactions.models.StorageTaskPreviewAction
:raises ~azure.core.exceptions.HttpResponseError:
"""
[docs]
@distributed_trace
def preview_actions(
self, location: str, parameters: Union[_models.StorageTaskPreviewAction, JSON, IO[bytes]], **kwargs: Any
) -> _models.StorageTaskPreviewAction:
"""Runs the input conditions against input object metadata properties and designates matched
objects in response.
:param location: Required.
:type location: str
:param parameters: The parameters to preview action condition. Is one of the following types:
StorageTaskPreviewAction, JSON, IO[bytes] Required.
:type parameters: ~azure.mgmt.storageactions.models.StorageTaskPreviewAction or JSON or
IO[bytes]
:return: StorageTaskPreviewAction. The StorageTaskPreviewAction is compatible with
MutableMapping
:rtype: ~azure.mgmt.storageactions.models.StorageTaskPreviewAction
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.StorageTaskPreviewAction] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_content = None
if isinstance(parameters, (IOBase, bytes)):
_content = parameters
else:
_content = json.dumps(parameters, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_storage_tasks_preview_actions_request(
location=location,
subscription_id=self._config.subscription_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.StorageTaskPreviewAction, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
[docs]
class StorageTasksReportOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.storageactions.StorageActionsMgmtClient`'s
:attr:`storage_tasks_report` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: StorageActionsMgmtClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def list(
self, resource_group_name: str, storage_task_name: str, *, filter: Optional[str] = None, **kwargs: Any
) -> ItemPaged["_models.StorageTaskReportInstance"]:
"""Fetch the storage tasks run report summary for each assignment.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:keyword filter: Optional. When specified, it can be used to query using reporting properties.
Default value is None.
:paramtype filter: str
:return: An iterator like instance of StorageTaskReportInstance
:rtype:
~azure.core.paging.ItemPaged[~azure.mgmt.storageactions.models.StorageTaskReportInstance]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
maxpagesize = kwargs.pop("maxpagesize", None)
cls: ClsType[List[_models.StorageTaskReportInstance]] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_storage_tasks_report_list_request(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
subscription_id=self._config.subscription_id,
maxpagesize=maxpagesize,
filter=filter,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
return _request
def extract_data(pipeline_response):
deserialized = pipeline_response.http_response.json()
list_of_elem = _deserialize(List[_models.StorageTaskReportInstance], deserialized.get("value", []))
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return deserialized.get("nextLink") or None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)
[docs]
class StorageTaskAssignmentOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.mgmt.storageactions.StorageActionsMgmtClient`'s
:attr:`storage_task_assignment` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: StorageActionsMgmtClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs]
@distributed_trace
def list(
self, resource_group_name: str, storage_task_name: str, **kwargs: Any
) -> ItemPaged["_models.StorageTaskAssignment"]:
"""Lists Resource IDs of the Storage Task Assignments associated with this Storage Task.
:param resource_group_name: The name of the resource group. The name is case insensitive.
Required.
:type resource_group_name: str
:param storage_task_name: The name of the storage task within the specified resource group.
Storage task names must be between 3 and 18 characters in length and use numbers and lower-case
letters only. Required.
:type storage_task_name: str
:return: An iterator like instance of StorageTaskAssignment
:rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.storageactions.models.StorageTaskAssignment]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
maxpagesize = kwargs.pop("maxpagesize", None)
cls: ClsType[List[_models.StorageTaskAssignment]] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_storage_task_assignment_list_request(
resource_group_name=resource_group_name,
storage_task_name=storage_task_name,
subscription_id=self._config.subscription_id,
maxpagesize=maxpagesize,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
path_format_arguments = {
"endpoint": self._serialize.url(
"self._config.base_url", self._config.base_url, "str", skip_quote=True
),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
return _request
def extract_data(pipeline_response):
deserialized = pipeline_response.http_response.json()
list_of_elem = _deserialize(List[_models.StorageTaskAssignment], deserialized.get("value", []))
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return deserialized.get("nextLink") or None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = _failsafe_deserialize(_models.ErrorResponse, response.json())
raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
return pipeline_response
return ItemPaged(get_next, extract_data)