# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from io import IOBase
import sys
from typing import Any, Callable, Dict, IO, Iterable, List, Optional, TypeVar, Union, cast, overload
import urllib.parse
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from .._serialization import Serializer
if sys.version_info >= (3, 9):
from collections.abc import MutableMapping
else:
from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports
JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_workflows_list_request(**kwargs: Any) -> HttpRequest:
    """Build the ``GET /workflows`` request.

    :keyword headers: Extra headers. An ``Accept`` entry replaces the default "application/json".
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The entry in ``params`` is always consumed, even when the keyword takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="GET", url="/workflows", params=query_map, headers=header_map, **kwargs)
def build_workflow_get_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``GET /workflows/{workflowId}`` request.

    :param workflow_id: Value serialized into the ``{workflowId}`` path segment. Required.
    :type workflow_id: str
    :keyword headers: Extra headers. An ``Accept`` entry replaces the default "application/json".
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The entry in ``params`` is always consumed, even when the keyword takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflows/{workflowId}".format(
        workflowId=_SERIALIZER.url("workflow_id", workflow_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="GET", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_workflow_create_or_replace_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``PUT /workflows/{workflowId}`` request.

    :param workflow_id: Value serialized into the ``{workflowId}`` path segment. Required.
    :type workflow_id: str
    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only. Remaining keywords (for example ``json`` or
     ``content``) are forwarded to ``HttpRequest``.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflows/{workflowId}".format(
        workflowId=_SERIALIZER.url("workflow_id", workflow_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="PUT", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_workflow_delete_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``DELETE /workflows/{workflowId}`` request.

    :param workflow_id: Value serialized into the ``{workflowId}`` path segment. Required.
    :type workflow_id: str
    :keyword headers: Extra headers. An ``Accept`` entry replaces the default "application/json".
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The entry in ``params`` is always consumed, even when the keyword takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflows/{workflowId}".format(
        workflowId=_SERIALIZER.url("workflow_id", workflow_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="DELETE", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_workflow_validate_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``POST /workflows/{workflowId}/validate`` request.

    :param workflow_id: Value serialized into the ``{workflowId}`` path segment. Required.
    :type workflow_id: str
    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflows/{workflowId}/validate".format(
        workflowId=_SERIALIZER.url("workflow_id", workflow_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="POST", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_user_requests_submit_request(**kwargs: Any) -> HttpRequest:
    """Build the ``POST /userrequests`` request.

    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="POST", url="/userrequests", params=query_map, headers=header_map, **kwargs)
def build_workflow_runs_list_request(
    *,
    view_mode: Optional[str] = None,
    time_window: Optional[str] = None,
    orderby: Optional[str] = None,
    run_statuses: Optional[List[str]] = None,
    workflow_ids: Optional[List[str]] = None,
    requestors: Optional[List[str]] = None,
    maxpagesize: Optional[int] = None,
    **kwargs: Any
) -> HttpRequest:
    """Build the ``GET /workflowruns`` request.

    Each filter keyword that is not None is serialized into the query string under its wire name
    (``viewMode``, ``timeWindow``, ``orderby``, ``runStatuses``, ``workflowIds``, ``requestors``,
    ``maxpagesize``). List values are joined with ``,``.

    :keyword headers: Extra headers. An ``Accept`` entry replaces the default "application/json".
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The entry in ``params`` is always consumed, even when the keyword takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # (wire name, python name, value, serializer type, extra serializer options), in wire order.
    optional_queries = (
        ("viewMode", "view_mode", view_mode, "str", {}),
        ("timeWindow", "time_window", time_window, "str", {}),
        ("orderby", "orderby", orderby, "str", {}),
        ("runStatuses", "run_statuses", run_statuses, "[str]", {"div": ","}),
        ("workflowIds", "workflow_ids", workflow_ids, "[str]", {"div": ","}),
        ("requestors", "requestors", requestors, "[str]", {"div": ","}),
        ("maxpagesize", "maxpagesize", maxpagesize, "int", {}),
    )
    for wire_name, python_name, value, data_type, options in optional_queries:
        if value is not None:
            query_map[wire_name] = _SERIALIZER.query(python_name, value, data_type, **options)

    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="GET", url="/workflowruns", params=query_map, headers=header_map, **kwargs)
def build_workflow_run_get_request(workflow_run_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``GET /workflowruns/{workflowRunId}`` request.

    :param workflow_run_id: Value serialized into the ``{workflowRunId}`` path segment. Required.
    :type workflow_run_id: str
    :keyword headers: Extra headers. An ``Accept`` entry replaces the default "application/json".
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The entry in ``params`` is always consumed, even when the keyword takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflowruns/{workflowRunId}".format(
        workflowRunId=_SERIALIZER.url("workflow_run_id", workflow_run_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="GET", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_workflow_run_cancel_request(workflow_run_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``POST /workflowruns/{workflowRunId}/cancel`` request.

    :param workflow_run_id: Value serialized into the ``{workflowRunId}`` path segment. Required.
    :type workflow_run_id: str
    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflowruns/{workflowRunId}/cancel".format(
        workflowRunId=_SERIALIZER.url("workflow_run_id", workflow_run_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="POST", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_workflow_tasks_list_request(
    *,
    view_mode: Optional[str] = None,
    workflow_ids: Optional[List[str]] = None,
    time_window: Optional[str] = None,
    maxpagesize: Optional[int] = None,
    orderby: Optional[str] = None,
    task_types: Optional[List[str]] = None,
    task_statuses: Optional[List[str]] = None,
    requestors: Optional[List[str]] = None,
    assignees: Optional[List[str]] = None,
    workflow_name_keyword: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    """Build the ``GET /workflowtasks`` request.

    Each filter keyword that is not None is serialized into the query string under its wire name
    (``viewMode``, ``workflowIds``, ``timeWindow``, ``maxpagesize``, ``orderby``, ``taskTypes``,
    ``taskStatuses``, ``requestors``, ``assignees``, ``workflowNameKeyword``). List values are
    joined with ``,``.

    :keyword headers: Extra headers. An ``Accept`` entry replaces the default "application/json".
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # The entry in ``params`` is always consumed, even when the keyword takes precedence.
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # (wire name, python name, value, serializer type, extra serializer options), in wire order.
    optional_queries = (
        ("viewMode", "view_mode", view_mode, "str", {}),
        ("workflowIds", "workflow_ids", workflow_ids, "[str]", {"div": ","}),
        ("timeWindow", "time_window", time_window, "str", {}),
        ("maxpagesize", "maxpagesize", maxpagesize, "int", {}),
        ("orderby", "orderby", orderby, "str", {}),
        ("taskTypes", "task_types", task_types, "[str]", {"div": ","}),
        ("taskStatuses", "task_statuses", task_statuses, "[str]", {"div": ","}),
        ("requestors", "requestors", requestors, "[str]", {"div": ","}),
        ("assignees", "assignees", assignees, "[str]", {"div": ","}),
        ("workflowNameKeyword", "workflow_name_keyword", workflow_name_keyword, "str", {}),
    )
    for wire_name, python_name, value, data_type, options in optional_queries:
        if value is not None:
            query_map[wire_name] = _SERIALIZER.query(python_name, value, data_type, **options)

    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="GET", url="/workflowtasks", params=query_map, headers=header_map, **kwargs)
def build_workflow_task_get_request(task_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``GET /workflowtasks/{taskId}`` request.

    Like every other request builder in this module, the request carries an ``api-version`` query
    parameter, and the ``params`` / ``api_version`` keywords are consumed here instead of being
    forwarded verbatim to ``HttpRequest``.

    :param task_id: Value serialized into the ``{taskId}`` path segment. Required.
    :type task_id: str
    :keyword headers: Extra headers. An ``Accept`` entry replaces the default "application/json".
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Consume api_version here so it never reaches HttpRequest(**kwargs) as an unknown keyword.
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowtasks/{taskId}"
    path_format_arguments = {
        "taskId": _SERIALIZER.url("task_id", task_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_workflow_task_reassign_request(task_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``POST /workflowtasks/{taskId}/reassign`` request.

    :param task_id: Value serialized into the ``{taskId}`` path segment. Required.
    :type task_id: str
    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflowtasks/{taskId}/reassign".format(
        taskId=_SERIALIZER.url("task_id", task_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="POST", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_approval_approve_request(task_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``POST /workflowtasks/{taskId}/approve-approval`` request.

    :param task_id: Value serialized into the ``{taskId}`` path segment. Required.
    :type task_id: str
    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflowtasks/{taskId}/approve-approval".format(
        taskId=_SERIALIZER.url("task_id", task_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="POST", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_approval_reject_request(task_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``POST /workflowtasks/{taskId}/reject-approval`` request.

    :param task_id: Value serialized into the ``{taskId}`` path segment. Required.
    :type task_id: str
    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflowtasks/{taskId}/reject-approval".format(
        taskId=_SERIALIZER.url("task_id", task_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="POST", url=request_path, params=query_map, headers=header_map, **kwargs)
def build_task_status_update_request(task_id: str, **kwargs: Any) -> HttpRequest:
    """Build the ``POST /workflowtasks/{taskId}/change-task-status`` request.

    :param task_id: Value serialized into the ``{taskId}`` path segment. Required.
    :type task_id: str
    :keyword headers: Extra headers. ``Content-Type`` and ``Accept`` entries act as fallbacks for
     the values below.
    :keyword params: Extra query parameters. An ``api-version`` entry is used only when the
     ``api_version`` keyword is absent.
    :keyword content_type: ``Content-Type`` header value; the header is omitted when this resolves
     to None.
    :keyword api_version: Value for the ``api-version`` query parameter. Default value is
     "2023-10-01-preview".
    :return: The request; its URL is the path only.
    :rtype: ~azure.core.rest.HttpRequest
    """
    header_map = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_map = case_insensitive_dict(kwargs.pop("params", {}) or {})

    # Fallback entries are always consumed from the maps, even when a keyword takes precedence.
    content_type_fallback = header_map.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", content_type_fallback)
    version_fallback = query_map.pop("api-version", "2023-10-01-preview")
    api_version: str = kwargs.pop("api_version", version_fallback)
    accept_value = header_map.pop("Accept", "application/json")

    request_path = "/workflowtasks/{taskId}/change-task-status".format(
        taskId=_SERIALIZER.url("task_id", task_id, "str"),
    )

    query_map["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    if content_type is not None:
        header_map["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    header_map["Accept"] = _SERIALIZER.header("accept", accept_value, "str")

    return HttpRequest(method="POST", url=request_path, params=query_map, headers=header_map, **kwargs)
class WorkflowsOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.purview.workflow.PurviewWorkflowClient`'s
        :attr:`workflows` attribute.
    """

    def __init__(self, *args, **kwargs):
        # Collaborators are accepted positionally, in this order, or by keyword:
        # client, config, serializer, deserializer.
        input_args = list(args)
        self._client = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @distributed_trace
    def list(self, **kwargs: Any) -> Iterable[JSON]:
        """List all workflows.

        :keyword cls: Optional callable applied to each page's list of elements before iteration.
        :keyword error_map: Optional mapping of status code to exception type, merged over the
         defaults (401, 404, 409, 304).
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[JSON]
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    "description": "str",  # Description of a workflow. Required.
                    "id": "str",  # The id of workflow. Required.
                    "isEnabled": bool,  # Whether the workflow is enabled or not. Required.
                    "name": "str",  # The name of a workflow. Required.
                    "triggers": [
                        {
                            "type": "str",  # Required. Known values are:
                              "when_term_creation_is_requested", "when_term_deletion_is_requested",
                              "when_term_update_is_requested", "when_terms_import_is_requested",
                              "when_data_access_grant_is_requested", and
                              "when_asset_update_is_requested".
                            "underCollection": "str",  # Optional. The collection name.
                            "underGlossary": "str",  # Optional. The glossary guid.
                            "underGlossaryHierarchy": "str"  # Optional. Glossary term
                              hierarchy path.
                        }
                    ],
                    "createdBy": "str",  # Optional. The person who created the workflow.
                    "createdTime": "2020-02-20 00:00:00",  # Optional. The created time of
                      workflow.
                    "lastUpdateTime": "2020-02-20 00:00:00",  # Optional. The last update time.
                    "updatedBy": "str"  # Optional. The person who updated the workflow.
                }
        """
        _headers = kwargs.pop("headers", {}) or {}
        _params = kwargs.pop("params", {}) or {}

        cls: ClsType[JSON] = kwargs.pop("cls", None)

        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        error_map.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            if not next_link:
                # First page: build the request from the operation definition.
                request = build_workflows_list_request(
                    api_version=self._config.api_version,
                    headers=_headers,
                    params=_params,
                )
            else:
                # make call to next link with the client's api-version
                _parsed_next_link = urllib.parse.urlparse(next_link)
                _next_request_params = case_insensitive_dict(
                    {
                        key: [urllib.parse.quote(v) for v in value]
                        for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                    }
                )
                _next_request_params["api-version"] = self._config.api_version
                request = HttpRequest(
                    "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
                )

            # Both kinds of request get the configured endpoint substituted into their URL.
            path_format_arguments = {
                "endpoint": self._serialize.url(
                    "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
                ),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            return request

        def extract_data(pipeline_response):
            # Returns (continuation link or None, iterator over this page's items).
            deserialized = pipeline_response.http_response.json()
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)  # type: ignore
            return deserialized.get("nextLink") or None, iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            _stream = False
            pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
                request, stream=_stream, **kwargs
            )
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                if _stream:
                    response.read()  # Load the body in memory and close the socket
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(get_next, extract_data)
[docs]class WorkflowOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.purview.workflow.PurviewWorkflowClient`'s
:attr:`workflow` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace
def get(self, workflow_id: str, **kwargs: Any) -> JSON:
    """Get a specific workflow.

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``; its
     result is returned instead of the JSON body.
    :keyword error_map: Optional mapping of status code to exception type, merged over the
     defaults (401, 404, 409, 304).
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "actionDag": {},  # The action DAG(Directed Acyclic Graph), it defines steps
                  to be executed in a workflow run and their order. Required.
                "description": "str",  # Description of a workflow. Required.
                "id": "str",  # The id of workflow. Required.
                "isEnabled": bool,  # Whether the workflow is enabled or not. Required.
                "name": "str",  # The name of a workflow. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "createdBy": "str",  # Optional. The person who created the workflow.
                "createdTime": "2020-02-20 00:00:00",  # Optional. The created time of
                  workflow.
                "lastUpdateTime": "2020-02-20 00:00:00",  # Optional. The last update time.
                "updatedBy": "str"  # Optional. The person who updated the workflow.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[JSON] = kwargs.pop("cls", None)

    request = build_workflow_get_request(
        workflow_id=workflow_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    # Substitute the configured endpoint into the request URL.
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        if _stream:
            response.read()  # Load the body in memory and close the socket
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body deserializes to None rather than raising from json().
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})

    return cast(JSON, deserialized)
@overload
def create_or_replace(
    self,
    workflow_id: str,
    workflow_create_or_update_command: JSON,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> JSON:
    """Create or replace a workflow (JSON body overload).

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :param workflow_create_or_update_command: Create or update workflow payload. Required.
    :type workflow_create_or_update_command: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            workflow_create_or_update_command = {
                "description": "str",  # Description of a workflow. Required.
                "isEnabled": bool,  # Whether the workflow enabled or not. Required.
                "name": "str",  # The workflow name. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "actionDag": {}  # Optional. The action DAG(Directed Acyclic Graph), it
                  defines actual flow.
            }

            # response body for status code(s): 200
            response == {
                "actionDag": {},  # The action DAG(Directed Acyclic Graph), it defines steps
                  to be executed in a workflow run and their order. Required.
                "description": "str",  # Description of a workflow. Required.
                "id": "str",  # The id of workflow. Required.
                "isEnabled": bool,  # Whether the workflow is enabled or not. Required.
                "name": "str",  # The name of a workflow. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "createdBy": "str",  # Optional. The person who created the workflow.
                "createdTime": "2020-02-20 00:00:00",  # Optional. The created time of
                  workflow.
                "lastUpdateTime": "2020-02-20 00:00:00",  # Optional. The last update time.
                "updatedBy": "str"  # Optional. The person who updated the workflow.
            }
    """
@overload
def create_or_replace(
    self,
    workflow_id: str,
    workflow_create_or_update_command: IO,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> JSON:
    """Create or replace a workflow (binary/stream body overload).

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :param workflow_create_or_update_command: Create or update workflow payload. Required.
    :type workflow_create_or_update_command: IO
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "actionDag": {},  # The action DAG(Directed Acyclic Graph), it defines steps
                  to be executed in a workflow run and their order. Required.
                "description": "str",  # Description of a workflow. Required.
                "id": "str",  # The id of workflow. Required.
                "isEnabled": bool,  # Whether the workflow is enabled or not. Required.
                "name": "str",  # The name of a workflow. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "createdBy": "str",  # Optional. The person who created the workflow.
                "createdTime": "2020-02-20 00:00:00",  # Optional. The created time of
                  workflow.
                "lastUpdateTime": "2020-02-20 00:00:00",  # Optional. The last update time.
                "updatedBy": "str"  # Optional. The person who updated the workflow.
            }
    """
@distributed_trace
def create_or_replace(
    self, workflow_id: str, workflow_create_or_update_command: Union[JSON, IO], **kwargs: Any
) -> JSON:
    """Create or replace a workflow.

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :param workflow_create_or_update_command: Create or update workflow payload. Is either a JSON
     type or a IO type. Required.
    :type workflow_create_or_update_command: JSON or IO
    :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
     Default value is None.
    :paramtype content_type: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, deserialized, {})``; its
     result is returned instead of the JSON body.
    :keyword error_map: Optional mapping of status code to exception type, merged over the
     defaults (401, 404, 409, 304).
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            workflow_create_or_update_command = {
                "description": "str",  # Description of a workflow. Required.
                "isEnabled": bool,  # Whether the workflow enabled or not. Required.
                "name": "str",  # The workflow name. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "actionDag": {}  # Optional. The action DAG(Directed Acyclic Graph), it
                  defines actual flow.
            }

            # response body for status code(s): 200
            response == {
                "actionDag": {},  # The action DAG(Directed Acyclic Graph), it defines steps
                  to be executed in a workflow run and their order. Required.
                "description": "str",  # Description of a workflow. Required.
                "id": "str",  # The id of workflow. Required.
                "isEnabled": bool,  # Whether the workflow is enabled or not. Required.
                "name": "str",  # The name of a workflow. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "createdBy": "str",  # Optional. The person who created the workflow.
                "createdTime": "2020-02-20 00:00:00",  # Optional. The created time of
                  workflow.
                "lastUpdateTime": "2020-02-20 00:00:00",  # Optional. The last update time.
                "updatedBy": "str"  # Optional. The person who updated the workflow.
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    # With no explicit content type the body is sent as JSON.
    content_type = content_type or "application/json"
    _json = None
    _content = None
    # Streams and raw bytes are sent as-is; anything else is passed as the JSON body.
    if isinstance(workflow_create_or_update_command, (IOBase, bytes)):
        _content = workflow_create_or_update_command
    else:
        _json = workflow_create_or_update_command

    request = build_workflow_create_or_replace_request(
        workflow_id=workflow_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    # Substitute the configured endpoint into the request URL.
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        if _stream:
            response.read()  # Load the body in memory and close the socket
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body deserializes to None rather than raising from json().
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})

    return cast(JSON, deserialized)
@distributed_trace
def delete(self, workflow_id: str, **kwargs: Any) -> None:  # pylint: disable=inconsistent-return-statements
    """Delete a workflow.

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, None, {})``; when given,
     its result is returned.
    :keyword error_map: Optional mapping of status code to exception type, merged over the
     defaults (401, 404, 409, 304).
    :return: None
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}

    cls: ClsType[None] = kwargs.pop("cls", None)

    request = build_workflow_delete_request(
        workflow_id=workflow_id,
        api_version=self._config.api_version,
        headers=_headers,
        params=_params,
    )
    # Substitute the configured endpoint into the request URL.
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    # 204 is the only status treated as success for this operation.
    if response.status_code not in [204]:
        if _stream:
            response.read()  # Load the body in memory and close the socket
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if cls:
        return cls(pipeline_response, None, {})
@overload
def validate(
    self, workflow_id: str, workflow_validate_query: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> JSON:
    """Validate a workflow.

    Typing overload for a JSON (mapping) request body.

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :param workflow_validate_query: Check workflow payload. Required.
    :type workflow_validate_query: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            workflow_validate_query = {
                "description": "str",  # Description of a workflow. Required.
                "isEnabled": bool,  # Whether the workflow enabled or not. Required.
                "name": "str",  # The workflow name. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "actionDag": {}  # Optional. The action DAG(Directed Acyclic Graph), it
                  defines actual flow.
            }

            # response body for status code(s): 200
            response == {
                "value": [
                    {
                        "location": {
                            "type": "str",  # The validation violation location
                              type. Required. Known values are: "workflow", "action", and
                              "actionParameter".
                            "actionName": "str",  # Optional. The name of the
                              action where the violation happens.
                            "parameterKey": "str"  # Optional. The key of the
                              action parameter where the violation happens.
                        },
                        "message": "str",  # The detail about how the validation rule
                          is violated. Required.
                        "severity": "str"  # The severity of the validation rule.
                          Required. Known values are: "error" and "warning".
                    }
                ]
            }
    """
@overload
def validate(
    self, workflow_id: str, workflow_validate_query: IO, *, content_type: str = "application/json", **kwargs: Any
) -> JSON:
    """Validate a workflow.

    Typing overload for a binary (IO) request body.

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :param workflow_validate_query: Check workflow payload. Required.
    :type workflow_validate_query: IO
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # response body for status code(s): 200
            response == {
                "value": [
                    {
                        "location": {
                            "type": "str",  # The validation violation location
                              type. Required. Known values are: "workflow", "action", and
                              "actionParameter".
                            "actionName": "str",  # Optional. The name of the
                              action where the violation happens.
                            "parameterKey": "str"  # Optional. The key of the
                              action parameter where the violation happens.
                        },
                        "message": "str",  # The detail about how the validation rule
                          is violated. Required.
                        "severity": "str"  # The severity of the validation rule.
                          Required. Known values are: "error" and "warning".
                    }
                ]
            }
    """
@distributed_trace
def validate(self, workflow_id: str, workflow_validate_query: Union[JSON, IO], **kwargs: Any) -> JSON:
    """Validate a workflow.

    :param workflow_id: The workflow id. Required.
    :type workflow_id: str
    :param workflow_validate_query: Check workflow payload. Is either a JSON type or an IO type.
     Required.
    :type workflow_validate_query: JSON or IO
    :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
     Default value is None, in which case a "Content-Type" entry in ``headers`` is used if
     present, otherwise "application/json".
    :paramtype content_type: str
    :return: JSON object
    :rtype: JSON
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            workflow_validate_query = {
                "description": "str",  # Description of a workflow. Required.
                "isEnabled": bool,  # Whether the workflow enabled or not. Required.
                "name": "str",  # The workflow name. Required.
                "triggers": [
                    {
                        "type": "str",  # Required. Known values are:
                          "when_term_creation_is_requested", "when_term_deletion_is_requested",
                          "when_term_update_is_requested", "when_terms_import_is_requested",
                          "when_data_access_grant_is_requested", and
                          "when_asset_update_is_requested".
                        "underCollection": "str",  # Optional. The collection name.
                        "underGlossary": "str",  # Optional. The glossary guid.
                        "underGlossaryHierarchy": "str"  # Optional. Glossary term
                          hierarchy path.
                    }
                ],
                "actionDag": {}  # Optional. The action DAG(Directed Acyclic Graph), it
                  defines actual flow.
            }

            # response body for status code(s): 200
            response == {
                "value": [
                    {
                        "location": {
                            "type": "str",  # The validation violation location
                              type. Required. Known values are: "workflow", "action", and
                              "actionParameter".
                            "actionName": "str",  # Optional. The name of the
                              action where the violation happens.
                            "parameterKey": "str"  # Optional. The key of the
                              action parameter where the violation happens.
                        },
                        "message": "str",  # The detail about how the validation rule
                          is violated. Required.
                        "severity": "str"  # The severity of the validation rule.
                          Required. Known values are: "error" and "warning".
                    }
                ]
            }
    """
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    # Caller-supplied entries override the defaults above.
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    # An explicit content_type keyword wins over a Content-Type header passed via ``headers``.
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[JSON] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"
    _json = None
    _content = None
    # Streams and raw bytes are sent as-is; anything else is handed over as a JSON body.
    if isinstance(workflow_validate_query, (IOBase, bytes)):
        _content = workflow_validate_query
    else:
        _json = workflow_validate_query

    request = build_workflow_validate_request(
        workflow_id=workflow_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        if _stream:
            response.read()  # Load the body in memory and close the socket
        # Give error_map the first chance to raise; otherwise fall back to the generic error.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    # An empty body yields None rather than a JSON decode attempt.
    if response.content:
        deserialized = response.json()
    else:
        deserialized = None

    if cls:
        return cls(pipeline_response, cast(JSON, deserialized), {})

    return cast(JSON, deserialized)
class UserRequestsOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.purview.workflow.PurviewWorkflowClient`'s
        :attr:`user_requests` attribute.
    """

    def __init__(self, *args, **kwargs):
        # Accept client, config, serializer and deserializer either positionally (in that
        # order) or by keyword; positional values are consumed first.
        input_args = list(args)
        self._client = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @overload
    def submit(self, user_requests_payload: JSON, *, content_type: str = "application/json", **kwargs: Any) -> JSON:
        """Submit a user request for requestor, a user request describes user ask to do operation(s) on
        Purview. If any workflow's trigger matches with an operation in request, a run of the workflow
        is created.

        :param user_requests_payload: The payload of submitting a user request. Required.
        :type user_requests_payload: JSON
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: JSON object
        :rtype: JSON
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                user_requests_payload = {
                    "operations": [
                        {
                            "payload": {},  # The payload of each operation which user
                              want to submit. Required.
                            "type": "str"  # The operation type. Required. Known values
                              are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms",
                              "UpdateAsset", and "GrantDataAccess".
                        }
                    ],
                    "comment": "str"  # Optional. The comment when submit a user request.
                }

                # response body for status code(s): 200
                response == {
                    "operations": [
                        {
                            "payload": {},  # The payload of each operation which user
                              want to submit. Required.
                            "type": "str",  # The operation type. Required. Known values
                              are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms",
                              "UpdateAsset", and "GrantDataAccess".
                            "workflowRunIds": [
                                "str"  # Optional. The list of operations user want
                                  to submit, each operation matches one Purview API call and will do
                                  the operation directly. Required.
                            ]
                        }
                    ],
                    "requestId": "str",  # The user request id. Required.
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "status": "str",  # The status. Required. Known values are: "NotStarted",
                      "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and
                      "Canceled".
                    "comment": "str"  # Optional. The comment when submit a user request.
                }
        """

    @overload
    def submit(self, user_requests_payload: IO, *, content_type: str = "application/json", **kwargs: Any) -> JSON:
        """Submit a user request for requestor, a user request describes user ask to do operation(s) on
        Purview. If any workflow's trigger matches with an operation in request, a run of the workflow
        is created.

        :param user_requests_payload: The payload of submitting a user request. Required.
        :type user_requests_payload: IO
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: JSON object
        :rtype: JSON
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    "operations": [
                        {
                            "payload": {},  # The payload of each operation which user
                              want to submit. Required.
                            "type": "str",  # The operation type. Required. Known values
                              are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms",
                              "UpdateAsset", and "GrantDataAccess".
                            "workflowRunIds": [
                                "str"  # Optional. The list of operations user want
                                  to submit, each operation matches one Purview API call and will do
                                  the operation directly. Required.
                            ]
                        }
                    ],
                    "requestId": "str",  # The user request id. Required.
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "status": "str",  # The status. Required. Known values are: "NotStarted",
                      "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and
                      "Canceled".
                    "comment": "str"  # Optional. The comment when submit a user request.
                }
        """

    @distributed_trace
    def submit(self, user_requests_payload: Union[JSON, IO], **kwargs: Any) -> JSON:
        """Submit a user request for requestor, a user request describes user ask to do operation(s) on
        Purview. If any workflow's trigger matches with an operation in request, a run of the workflow
        is created.

        :param user_requests_payload: The payload of submitting a user request. Is either a JSON type
         or an IO type. Required.
        :type user_requests_payload: JSON or IO
        :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
         Default value is None, in which case a "Content-Type" entry in ``headers`` is used if
         present, otherwise "application/json".
        :paramtype content_type: str
        :return: JSON object
        :rtype: JSON
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                user_requests_payload = {
                    "operations": [
                        {
                            "payload": {},  # The payload of each operation which user
                              want to submit. Required.
                            "type": "str"  # The operation type. Required. Known values
                              are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms",
                              "UpdateAsset", and "GrantDataAccess".
                        }
                    ],
                    "comment": "str"  # Optional. The comment when submit a user request.
                }

                # response body for status code(s): 200
                response == {
                    "operations": [
                        {
                            "payload": {},  # The payload of each operation which user
                              want to submit. Required.
                            "type": "str",  # The operation type. Required. Known values
                              are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms",
                              "UpdateAsset", and "GrantDataAccess".
                            "workflowRunIds": [
                                "str"  # Optional. The list of operations user want
                                  to submit, each operation matches one Purview API call and will do
                                  the operation directly. Required.
                            ]
                        }
                    ],
                    "requestId": "str",  # The user request id. Required.
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "status": "str",  # The status. Required. Known values are: "NotStarted",
                      "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and
                      "Canceled".
                    "comment": "str"  # Optional. The comment when submit a user request.
                }
        """
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied entries override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        _params = kwargs.pop("params", {}) or {}

        # An explicit content_type keyword wins over a Content-Type header passed via ``headers``.
        content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
        cls: ClsType[JSON] = kwargs.pop("cls", None)

        content_type = content_type or "application/json"
        _json = None
        _content = None
        # Streams and raw bytes are sent as-is; anything else is handed over as a JSON body.
        if isinstance(user_requests_payload, (IOBase, bytes)):
            _content = user_requests_payload
        else:
            _json = user_requests_payload

        request = build_user_requests_submit_request(
            content_type=content_type,
            api_version=self._config.api_version,
            json=_json,
            content=_content,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [200]:
            if _stream:
                response.read()  # Load the body in memory and close the socket
            # Give error_map the first chance to raise; otherwise fall back to the generic error.
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # An empty body yields None rather than a JSON decode attempt.
        if response.content:
            deserialized = response.json()
        else:
            deserialized = None

        if cls:
            return cls(pipeline_response, cast(JSON, deserialized), {})

        return cast(JSON, deserialized)
class WorkflowRunsOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.purview.workflow.PurviewWorkflowClient`'s
        :attr:`workflow_runs` attribute.
    """

    def __init__(self, *args, **kwargs):
        # Accept client, config, serializer and deserializer either positionally (in that
        # order) or by keyword; positional values are consumed first.
        input_args = list(args)
        self._client = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @distributed_trace
    def list(
        self,
        *,
        view_mode: Optional[str] = None,
        time_window: Optional[str] = None,
        orderby: Optional[str] = None,
        run_statuses: Optional[List[str]] = None,
        workflow_ids: Optional[List[str]] = None,
        requestors: Optional[List[str]] = None,
        **kwargs: Any
    ) -> Iterable[JSON]:
        """List workflow runs.

        :keyword view_mode: To filter user's workflow runs or view as admin. Default value is None.
        :paramtype view_mode: str
        :keyword time_window: Time window of filtering items. Known values are: "1d", "7d", "30d", and
         "90d". Default value is None.
        :paramtype time_window: str
        :keyword orderby: The key word which used to sort the results. Known values are: "status desc",
         "status asc", "requestor desc", "requestor asc", "startTime desc", "startTime asc",
         "createdTime desc", and "createdTime asc". Default value is None.
        :paramtype orderby: str
        :keyword run_statuses: Filter workflow runs by workflow run status. Default value is None.
        :paramtype run_statuses: list[str]
        :keyword workflow_ids: Filter items by workflow id list. Default value is None.
        :paramtype workflow_ids: list[str]
        :keyword requestors: Requestors' ids to filter. Default value is None.
        :paramtype requestors: list[str]
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[JSON]
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    "id": "str",  # The workflow run id. Required.
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "runPayload": {
                        "targetValue": "str",  # The target value which need involve workflow
                          to update. Required.
                        "type": "str"  # The workflow run payload type. Required. Known
                          values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms",
                          "UpdateAsset", and "GrantDataAccess".
                    },
                    "startTime": "2020-02-20 00:00:00",  # Workflow run start time. Required.
                    "status": "str",  # The status. Required. Known values are: "NotStarted",
                      "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and
                      "Canceled".
                    "workflowId": "str",  # The workflow id. Required.
                    "cancelComment": "str",  # Optional. The comment when cancel a workflow run.
                    "cancelTime": "2020-02-20 00:00:00",  # Optional. The time of workflow run be
                      canceled.
                    "endTime": "2020-02-20 00:00:00",  # Optional. The time of workflow run
                      completed.
                    "userRequestId": "str"  # Optional. The user request id.
                }
        """
        _headers = kwargs.pop("headers", {}) or {}
        _params = kwargs.pop("params", {}) or {}

        # maxpagesize is taken from kwargs and forwarded to the first-page request builder.
        maxpagesize = kwargs.pop("maxpagesize", None)
        cls: ClsType[JSON] = kwargs.pop("cls", None)

        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied entries override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            """Build the request for the first page, or for the page at ``next_link``."""
            if not next_link:
                request = build_workflow_runs_list_request(
                    view_mode=view_mode,
                    time_window=time_window,
                    orderby=orderby,
                    run_statuses=run_statuses,
                    workflow_ids=workflow_ids,
                    requestors=requestors,
                    maxpagesize=maxpagesize,
                    api_version=self._config.api_version,
                    headers=_headers,
                    params=_params,
                )
            else:
                # make call to next link with the client's api-version
                _parsed_next_link = urllib.parse.urlparse(next_link)
                # parse_qs decodes the query values, so each one is re-quoted before reuse.
                _next_request_params = case_insensitive_dict(
                    {
                        key: [urllib.parse.quote(v) for v in value]
                        for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                    }
                )
                _next_request_params["api-version"] = self._config.api_version
                request = HttpRequest(
                    "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
                )

            # Both branches apply the same endpoint formatting to the request URL.
            path_format_arguments = {
                "endpoint": self._serialize.url(
                    "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
                ),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            return request

        def extract_data(pipeline_response):
            """Return ``(next_link_or_None, iterator_over_items)`` for one page."""
            deserialized = pipeline_response.http_response.json()
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)  # type: ignore
            return deserialized.get("nextLink") or None, iter(list_of_elem)

        def get_next(next_link=None):
            """Fetch one page and raise on any status other than 200."""
            request = prepare_request(next_link)

            _stream = False
            pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
                request, stream=_stream, **kwargs
            )
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                if _stream:
                    response.read()  # Load the body in memory and close the socket
                # Give error_map the first chance to raise; otherwise fall back to the generic error.
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(get_next, extract_data)
class WorkflowRunOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.purview.workflow.PurviewWorkflowClient`'s
        :attr:`workflow_run` attribute.
    """

    def __init__(self, *args, **kwargs):
        # Accept client, config, serializer and deserializer either positionally (in that
        # order) or by keyword; positional values are consumed first.
        input_args = list(args)
        self._client = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @distributed_trace
    def get(self, workflow_run_id: str, **kwargs: Any) -> JSON:
        """Get a workflow run.

        :param workflow_run_id: The workflow run id. Required.
        :type workflow_run_id: str
        :return: JSON object
        :rtype: JSON
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # response body for status code(s): 200
                response == {
                    "actionDag": {},  # The action DAG(Directed Acyclic Graph), it defines actual
                      flow. Required.
                    "detail": {
                        "actions": {},  # Any object. Required.
                        "runInput": {}  # Built-in variables starts with @runInput. Its
                          properties are determined by trigger type at workflow run time. Required.
                    },
                    "cancelComment": "str",  # Optional. The comment when cancel a workflow run.
                    "cancelTime": "2020-02-20 00:00:00",  # Optional. The time of workflow run be
                      canceled.
                    "endTime": "2020-02-20 00:00:00",  # Optional. The time of workflow run
                      completed.
                    "id": "str",  # Optional. The workflow run id.
                    "requestor": "str",  # Optional. The person who submitted the user request.
                    "runPayload": {
                        "payload": {},  # The payload of each operation which user want to
                          submit. Required.
                        "targetValue": "str",  # The target value which need involve workflow
                          to update. Required.
                        "type": "str"  # The workflow run payload type. Required. Known
                          values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms",
                          "UpdateAsset", and "GrantDataAccess".
                    },
                    "startTime": "2020-02-20 00:00:00",  # Optional. Workflow run start time.
                    "status": "str",  # Optional. The status. Known values are: "NotStarted",
                      "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and
                      "Canceled".
                    "userRequestId": "str",  # Optional. The user request id.
                    "workflowId": "str"  # Optional. The workflow id.
                }
        """
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied entries override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = kwargs.pop("headers", {}) or {}
        _params = kwargs.pop("params", {}) or {}

        cls: ClsType[JSON] = kwargs.pop("cls", None)

        request = build_workflow_run_get_request(
            workflow_run_id=workflow_run_id,
            api_version=self._config.api_version,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [200]:
            if _stream:
                response.read()  # Load the body in memory and close the socket
            # Give error_map the first chance to raise; otherwise fall back to the generic error.
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # An empty body yields None rather than a JSON decode attempt.
        if response.content:
            deserialized = response.json()
        else:
            deserialized = None

        if cls:
            return cls(pipeline_response, cast(JSON, deserialized), {})

        return cast(JSON, deserialized)

    @overload
    def cancel(  # pylint: disable=inconsistent-return-statements
        self, workflow_run_id: str, run_cancel_reply: JSON, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Cancel a workflow run.

        :param workflow_run_id: The workflow run id. Required.
        :type workflow_run_id: str
        :param run_cancel_reply: Reply of canceling a workflow run. Required.
        :type run_cancel_reply: JSON
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                run_cancel_reply = {
                    "comment": "str"  # Optional. The comment of canceling a workflow run.
                }
        """

    @overload
    def cancel(  # pylint: disable=inconsistent-return-statements
        self, workflow_run_id: str, run_cancel_reply: IO, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Cancel a workflow run.

        :param workflow_run_id: The workflow run id. Required.
        :type workflow_run_id: str
        :param run_cancel_reply: Reply of canceling a workflow run. Required.
        :type run_cancel_reply: IO
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @distributed_trace
    def cancel(  # pylint: disable=inconsistent-return-statements
        self, workflow_run_id: str, run_cancel_reply: Union[JSON, IO], **kwargs: Any
    ) -> None:
        """Cancel a workflow run.

        :param workflow_run_id: The workflow run id. Required.
        :type workflow_run_id: str
        :param run_cancel_reply: Reply of canceling a workflow run. Is either a JSON type or an IO
         type. Required.
        :type run_cancel_reply: JSON or IO
        :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
         Default value is None, in which case a "Content-Type" entry in ``headers`` is used if
         present, otherwise "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                run_cancel_reply = {
                    "comment": "str"  # Optional. The comment of canceling a workflow run.
                }
        """
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied entries override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        _params = kwargs.pop("params", {}) or {}

        # An explicit content_type keyword wins over a Content-Type header passed via ``headers``.
        content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
        cls: ClsType[None] = kwargs.pop("cls", None)

        content_type = content_type or "application/json"
        _json = None
        _content = None
        # Streams and raw bytes are sent as-is; anything else is handed over as a JSON body.
        if isinstance(run_cancel_reply, (IOBase, bytes)):
            _content = run_cancel_reply
        else:
            _json = run_cancel_reply

        request = build_workflow_run_cancel_request(
            workflow_run_id=workflow_run_id,
            content_type=content_type,
            api_version=self._config.api_version,
            json=_json,
            content=_content,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [200]:
            if _stream:
                response.read()  # Load the body in memory and close the socket
            # Give error_map the first chance to raise; otherwise fall back to the generic error.
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        if cls:
            return cls(pipeline_response, None, {})
class WorkflowTasksOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.purview.workflow.PurviewWorkflowClient`'s
        :attr:`workflow_tasks` attribute.
    """

    def __init__(self, *args, **kwargs):
        # Collaborators are consumed positionally in the order client, config,
        # serializer, deserializer; once the positional arguments run out the
        # remaining ones are looked up by keyword.
        input_args = list(args)
        self._client = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @distributed_trace
    def list(
        self,
        *,
        view_mode: Optional[str] = None,
        workflow_ids: Optional[List[str]] = None,
        time_window: Optional[str] = None,
        orderby: Optional[str] = None,
        task_types: Optional[List[str]] = None,
        task_statuses: Optional[List[str]] = None,
        requestors: Optional[List[str]] = None,
        assignees: Optional[List[str]] = None,
        workflow_name_keyword: Optional[str] = None,
        **kwargs: Any
    ) -> Iterable[JSON]:
        """Get all workflow tasks.

        :keyword view_mode: To filter user's sent, received or history workflow tasks. Default value is
         None.
        :paramtype view_mode: str
        :keyword workflow_ids: Filter items by workflow id list. Default value is None.
        :paramtype workflow_ids: list[str]
        :keyword time_window: Time window of filtering items. Known values are: "1d", "7d", "30d", and
         "90d". Default value is None.
        :paramtype time_window: str
        :keyword orderby: The key word which used to sort the results. Known values are: "status desc",
         "status asc", "requestor desc", "requestor asc", "startTime desc", "startTime asc",
         "createdTime desc", and "createdTime asc". Default value is None.
        :paramtype orderby: str
        :keyword task_types: Filter items by workflow task type. Default value is None.
        :paramtype task_types: list[str]
        :keyword task_statuses: Filter workflow tasks by status. Default value is None.
        :paramtype task_statuses: list[str]
        :keyword requestors: Requestors' ids to filter. Default value is None.
        :paramtype requestors: list[str]
        :keyword assignees: Assignees' ids to filter. Default value is None.
        :paramtype assignees: list[str]
        :keyword workflow_name_keyword: The key word which could used to filter workflow item with
         related workflow. Default value is None.
        :paramtype workflow_name_keyword: str
        :keyword maxpagesize: Forwarded to the request builder for the first page request only;
         follow-up pages are requested through the next link returned by the service. Default value
         is None.
        :keyword cls: Optional callable applied to the list found under the "value" key of each page
         before it is iterated. Default value is None.
        :return: An iterator like instance of JSON object
        :rtype: ~azure.core.paging.ItemPaged[JSON]
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # The response is polymorphic. The following are possible polymorphic responses based
                  off discriminator "type":

                # JSON input template for discriminator value "Approval":
                workflow_task = {
                    "createdTime": "2020-02-20 00:00:00",  # The created time. Required.
                    "id": "str",  # The workflow task id. Required.
                    "lastUpdateTime": "2020-02-20 00:00:00",  # The last update time. Required.
                    "payload": {
                        "targetValue": "str",  # The target value of entity which user want
                          to involve workflow to update. Required.
                        "type": "str",  # The task payload type. Required. Known values are:
                          "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and
                          "GrantDataAccess".
                        "payload": {}  # Optional. The payload of the task.
                    },
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "type": "Approval",
                    "workflowId": "str",  # The workflow id. Required.
                    "workflowRunId": "str",  # The workflow run id. Required.
                    "approvalDetail": {
                        "approvalType": "str",  # The approval type of an approval. Required.
                          Known values are: "PendingOnAny" and "PendingOnAll".
                        "approvers": {
                            "str": {
                                "reply": "str",  # The response for an approval type
                                  of workflow task. Required. Known values are: "Approved", "Rejected",
                                  and "Pending".
                                "comment": "str",  # Optional. The comment of
                                  approving or rejecting an approval type of workflow task.
                                "responseTime": "2020-02-20 00:00:00"  # Optional.
                                  The reply time of approver to a workflow task.
                            }
                        },
                        "status": "str"  # The status of an approval. Required. Known values
                          are: "Pending", "Approved", "Rejected", and "Canceled".
                    },
                    "expiryInfo": {
                        "expirySettings": {
                            "expireAfter": {},  # The time of expiry. Required.
                            "notifyOnExpiration": [
                                "str"  # Optional. Required.
                            ]
                        },
                        "expiryTime": "2020-02-20 00:00:00",  # The expiry time. Required.
                        "nextExpiryNotificationTime": "2020-02-20 00:00:00",  # The next
                          expiry notification time. Required.
                        "lastExpiryNotificationTime": "2020-02-20 00:00:00"  # Optional. The
                          last expiry notification time.
                    },
                    "reminderInfo": {
                        "nextRemindTime": "2020-02-20 00:00:00",  # The next remind time.
                          Required.
                        "reminderSettings": {},  # The reminder settings. Required.
                        "lastRemindTime": "2020-02-20 00:00:00"  # Optional. The last update
                          time.
                    },
                    "title": "str"  # Optional. The workflow task title.
                }

                # JSON input template for discriminator value "SimpleTask":
                workflow_task = {
                    "createdTime": "2020-02-20 00:00:00",  # The created time. Required.
                    "id": "str",  # The workflow task id. Required.
                    "lastUpdateTime": "2020-02-20 00:00:00",  # The last update time. Required.
                    "payload": {
                        "targetValue": "str",  # The target value of entity which user want
                          to involve workflow to update. Required.
                        "type": "str",  # The task payload type. Required. Known values are:
                          "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and
                          "GrantDataAccess".
                        "payload": {}  # Optional. The payload of the task.
                    },
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "type": "SimpleTask",
                    "workflowId": "str",  # The workflow id. Required.
                    "workflowRunId": "str",  # The workflow run id. Required.
                    "expiryInfo": {
                        "expirySettings": {
                            "expireAfter": {},  # The time of expiry. Required.
                            "notifyOnExpiration": [
                                "str"  # Optional. Required.
                            ]
                        },
                        "expiryTime": "2020-02-20 00:00:00",  # The expiry time. Required.
                        "nextExpiryNotificationTime": "2020-02-20 00:00:00",  # The next
                          expiry notification time. Required.
                        "lastExpiryNotificationTime": "2020-02-20 00:00:00"  # Optional. The
                          last expiry notification time.
                    },
                    "reminderInfo": {
                        "nextRemindTime": "2020-02-20 00:00:00",  # The next remind time.
                          Required.
                        "reminderSettings": {},  # The reminder settings. Required.
                        "lastRemindTime": "2020-02-20 00:00:00"  # Optional. The last update
                          time.
                    },
                    "taskDetail": {
                        "assignedTo": [
                            "str"  # The users or groups were assigned the simple task.
                              Required.
                        ],
                        "changeHistory": [
                            {}  # Required.
                        ],
                        "status": "str",  # Simple task status. Required. Known values are:
                          "NotStarted", "InProgress", "Completed", and "Canceled".
                        "taskBody": "str"  # The simple task body. Required.
                    },
                    "title": "str"  # Optional. The workflow task title.
                }

                # response body for status code(s): 200
                response == workflow_task
        """
        _headers = kwargs.pop("headers", {}) or {}
        _params = kwargs.pop("params", {}) or {}

        maxpagesize = kwargs.pop("maxpagesize", None)
        cls: ClsType[JSON] = kwargs.pop("cls", None)

        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied mappings override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            if not next_link:
                # First page: every filter goes through the request builder.
                request = build_workflow_tasks_list_request(
                    view_mode=view_mode,
                    workflow_ids=workflow_ids,
                    time_window=time_window,
                    maxpagesize=maxpagesize,
                    orderby=orderby,
                    task_types=task_types,
                    task_statuses=task_statuses,
                    requestors=requestors,
                    assignees=assignees,
                    workflow_name_keyword=workflow_name_keyword,
                    api_version=self._config.api_version,
                    headers=_headers,
                    params=_params,
                )
            else:
                # make call to next link with the client's api-version
                _parsed_next_link = urllib.parse.urlparse(next_link)
                # parse_qs decodes the query values, so they are quoted again
                # before being handed to HttpRequest as parameters.
                _next_request_params = case_insensitive_dict(
                    {
                        key: [urllib.parse.quote(v) for v in value]
                        for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                    }
                )
                _next_request_params["api-version"] = self._config.api_version
                request = HttpRequest(
                    "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
                )

            # Either kind of request has its URL formatted by the client with
            # the configured endpoint (previously duplicated in both branches).
            path_format_arguments = {
                "endpoint": self._serialize.url(
                    "self._config.endpoint", self._config.endpoint, "str", skip_quote=True
                ),
            }
            request.url = self._client.format_url(request.url, **path_format_arguments)
            return request

        def extract_data(pipeline_response):
            # Returns (continuation token, iterator over this page's items).
            deserialized = pipeline_response.http_response.json()
            list_of_elem = deserialized["value"]
            if cls:
                list_of_elem = cls(list_of_elem)  # type: ignore
            # A missing or empty "nextLink" is normalised to None.
            return deserialized.get("nextLink") or None, iter(list_of_elem)

        def get_next(next_link=None):
            request = prepare_request(next_link)

            _stream = False
            pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
                request, stream=_stream, **kwargs
            )
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                if _stream:
                    response.read()  # Load the body in memory and close the socket
                # map_error is given the status-to-exception table built above;
                # the generic HttpResponseError below is the fallback.
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(get_next, extract_data)
class WorkflowTaskOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.purview.workflow.PurviewWorkflowClient`'s
        :attr:`workflow_task` attribute.
    """

    def __init__(self, *args, **kwargs):
        # Collaborators are consumed positionally in the order client, config,
        # serializer, deserializer; once the positional arguments run out the
        # remaining ones are looked up by keyword.
        input_args = list(args)
        self._client = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @distributed_trace
    def get(self, task_id: str, **kwargs: Any) -> JSON:
        """Get a workflow task.

        :param task_id: The task id. Required.
        :type task_id: str
        :return: JSON object
        :rtype: JSON
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # The response is polymorphic. The following are possible polymorphic responses based
                  off discriminator "type":

                # JSON input template for discriminator value "Approval":
                workflow_task = {
                    "createdTime": "2020-02-20 00:00:00",  # The created time. Required.
                    "id": "str",  # The workflow task id. Required.
                    "lastUpdateTime": "2020-02-20 00:00:00",  # The last update time. Required.
                    "payload": {
                        "targetValue": "str",  # The target value of entity which user want
                          to involve workflow to update. Required.
                        "type": "str",  # The task payload type. Required. Known values are:
                          "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and
                          "GrantDataAccess".
                        "payload": {}  # Optional. The payload of the task.
                    },
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "type": "Approval",
                    "workflowId": "str",  # The workflow id. Required.
                    "workflowRunId": "str",  # The workflow run id. Required.
                    "approvalDetail": {
                        "approvalType": "str",  # The approval type of an approval. Required.
                          Known values are: "PendingOnAny" and "PendingOnAll".
                        "approvers": {
                            "str": {
                                "reply": "str",  # The response for an approval type
                                  of workflow task. Required. Known values are: "Approved", "Rejected",
                                  and "Pending".
                                "comment": "str",  # Optional. The comment of
                                  approving or rejecting an approval type of workflow task.
                                "responseTime": "2020-02-20 00:00:00"  # Optional.
                                  The reply time of approver to a workflow task.
                            }
                        },
                        "status": "str"  # The status of an approval. Required. Known values
                          are: "Pending", "Approved", "Rejected", and "Canceled".
                    },
                    "expiryInfo": {
                        "expirySettings": {
                            "expireAfter": {},  # The time of expiry. Required.
                            "notifyOnExpiration": [
                                "str"  # Optional. Required.
                            ]
                        },
                        "expiryTime": "2020-02-20 00:00:00",  # The expiry time. Required.
                        "nextExpiryNotificationTime": "2020-02-20 00:00:00",  # The next
                          expiry notification time. Required.
                        "lastExpiryNotificationTime": "2020-02-20 00:00:00"  # Optional. The
                          last expiry notification time.
                    },
                    "reminderInfo": {
                        "nextRemindTime": "2020-02-20 00:00:00",  # The next remind time.
                          Required.
                        "reminderSettings": {},  # The reminder settings. Required.
                        "lastRemindTime": "2020-02-20 00:00:00"  # Optional. The last update
                          time.
                    },
                    "title": "str"  # Optional. The workflow task title.
                }

                # JSON input template for discriminator value "SimpleTask":
                workflow_task = {
                    "createdTime": "2020-02-20 00:00:00",  # The created time. Required.
                    "id": "str",  # The workflow task id. Required.
                    "lastUpdateTime": "2020-02-20 00:00:00",  # The last update time. Required.
                    "payload": {
                        "targetValue": "str",  # The target value of entity which user want
                          to involve workflow to update. Required.
                        "type": "str",  # The task payload type. Required. Known values are:
                          "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and
                          "GrantDataAccess".
                        "payload": {}  # Optional. The payload of the task.
                    },
                    "requestor": "str",  # The person who submitted the user request. Required.
                    "type": "SimpleTask",
                    "workflowId": "str",  # The workflow id. Required.
                    "workflowRunId": "str",  # The workflow run id. Required.
                    "expiryInfo": {
                        "expirySettings": {
                            "expireAfter": {},  # The time of expiry. Required.
                            "notifyOnExpiration": [
                                "str"  # Optional. Required.
                            ]
                        },
                        "expiryTime": "2020-02-20 00:00:00",  # The expiry time. Required.
                        "nextExpiryNotificationTime": "2020-02-20 00:00:00",  # The next
                          expiry notification time. Required.
                        "lastExpiryNotificationTime": "2020-02-20 00:00:00"  # Optional. The
                          last expiry notification time.
                    },
                    "reminderInfo": {
                        "nextRemindTime": "2020-02-20 00:00:00",  # The next remind time.
                          Required.
                        "reminderSettings": {},  # The reminder settings. Required.
                        "lastRemindTime": "2020-02-20 00:00:00"  # Optional. The last update
                          time.
                    },
                    "taskDetail": {
                        "assignedTo": [
                            "str"  # The users or groups were assigned the simple task.
                              Required.
                        ],
                        "changeHistory": [
                            {}  # Required.
                        ],
                        "status": "str",  # Simple task status. Required. Known values are:
                          "NotStarted", "InProgress", "Completed", and "Canceled".
                        "taskBody": "str"  # The simple task body. Required.
                    },
                    "title": "str"  # Optional. The workflow task title.
                }

                # response body for status code(s): 200
                response == workflow_task
        """
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied mappings override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = kwargs.pop("headers", {}) or {}
        _params = kwargs.pop("params", {}) or {}

        cls: ClsType[JSON] = kwargs.pop("cls", None)

        request = build_workflow_task_get_request(
            task_id=task_id,
            # Send the api-version configured on the client, as every other
            # operation in this module does. It was previously omitted here,
            # which presumably left the builder on its own built-in default.
            api_version=self._config.api_version,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [200]:
            if _stream:
                response.read()  # Load the body in memory and close the socket
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        # An empty 200 body yields None rather than a JSON decoding error.
        if response.content:
            deserialized = response.json()
        else:
            deserialized = None

        if cls:
            return cls(pipeline_response, cast(JSON, deserialized), {})

        return cast(JSON, deserialized)

    @overload
    def reassign(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, reassign_command: JSON, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Reassign a workflow task.

        :param task_id: The task id. Required.
        :type task_id: str
        :param reassign_command: The request body of reassigning a workflow task. Required.
        :type reassign_command: JSON
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                reassign_command = {
                    "reassignments": [
                        {
                            "reassignFrom": "str",  # Reassign a workflow task from a
                              user or a group. Required.
                            "reassignTo": "str"  # Reassign a workflow task to a user or
                              a group. Required.
                        }
                    ]
                }
        """

    @overload
    def reassign(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, reassign_command: IO, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Reassign a workflow task.

        :param task_id: The task id. Required.
        :type task_id: str
        :param reassign_command: The request body of reassigning a workflow task. Required.
        :type reassign_command: IO
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @distributed_trace
    def reassign(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, reassign_command: Union[JSON, IO], **kwargs: Any
    ) -> None:
        """Reassign a workflow task.

        :param task_id: The task id. Required.
        :type task_id: str
        :param reassign_command: The request body of reassigning a workflow task. Is either a JSON type
         or an IO type. Required.
        :type reassign_command: JSON or IO
        :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
         When omitted (or None), the "Content-Type" header is used if present, otherwise
         "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                reassign_command = {
                    "reassignments": [
                        {
                            "reassignFrom": "str",  # Reassign a workflow task from a
                              user or a group. Required.
                            "reassignTo": "str"  # Reassign a workflow task to a user or
                              a group. Required.
                        }
                    ]
                }
        """
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied mappings override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        _params = kwargs.pop("params", {}) or {}

        # An explicit keyword wins over a "Content-Type" header; the header is
        # removed from _headers either way.
        content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
        cls: ClsType[None] = kwargs.pop("cls", None)

        content_type = content_type or "application/json"
        _json = None
        _content = None
        # Streams and raw bytes are sent as-is; anything else goes out as JSON.
        if isinstance(reassign_command, (IOBase, bytes)):
            _content = reassign_command
        else:
            _json = reassign_command

        request = build_workflow_task_reassign_request(
            task_id=task_id,
            content_type=content_type,
            api_version=self._config.api_version,
            json=_json,
            content=_content,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [200]:
            if _stream:
                response.read()  # Load the body in memory and close the socket
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        if cls:
            return cls(pipeline_response, None, {})
class ApprovalOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.purview.workflow.PurviewWorkflowClient`'s
        :attr:`approval` attribute.
    """

    def __init__(self, *args, **kwargs):
        # Collaborators are consumed positionally in the order client, config,
        # serializer, deserializer; once the positional arguments run out the
        # remaining ones are looked up by keyword.
        input_args = list(args)
        self._client = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @overload
    def approve(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, approval_response_comment: JSON, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Approve an approval.

        :param task_id: The task id. Required.
        :type task_id: str
        :param approval_response_comment: The request body of approving an approval type of workflow
         task. Required.
        :type approval_response_comment: JSON
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                approval_response_comment = {
                    "comment": "str"  # Optional. The comment of approving or rejecting an
                      approval type of workflow task.
                }
        """

    @overload
    def approve(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, approval_response_comment: IO, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Approve an approval.

        :param task_id: The task id. Required.
        :type task_id: str
        :param approval_response_comment: The request body of approving an approval type of workflow
         task. Required.
        :type approval_response_comment: IO
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @distributed_trace
    def approve(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, approval_response_comment: Union[JSON, IO], **kwargs: Any
    ) -> None:
        """Approve an approval.

        :param task_id: The task id. Required.
        :type task_id: str
        :param approval_response_comment: The request body of approving an approval type of workflow
         task. Is either a JSON type or an IO type. Required.
        :type approval_response_comment: JSON or IO
        :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
         When omitted (or None), the "Content-Type" header is used if present, otherwise
         "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                approval_response_comment = {
                    "comment": "str"  # Optional. The comment of approving or rejecting an
                      approval type of workflow task.
                }
        """
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied mappings override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        _params = kwargs.pop("params", {}) or {}

        # An explicit keyword wins over a "Content-Type" header; the header is
        # removed from _headers either way.
        content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
        cls: ClsType[None] = kwargs.pop("cls", None)

        content_type = content_type or "application/json"
        _json = None
        _content = None
        # Streams and raw bytes are sent as-is; anything else goes out as JSON.
        if isinstance(approval_response_comment, (IOBase, bytes)):
            _content = approval_response_comment
        else:
            _json = approval_response_comment

        request = build_approval_approve_request(
            task_id=task_id,
            content_type=content_type,
            api_version=self._config.api_version,
            json=_json,
            content=_content,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [200]:
            if _stream:
                response.read()  # Load the body in memory and close the socket
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        if cls:
            return cls(pipeline_response, None, {})

    @overload
    def reject(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, approval_response_comment: JSON, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Reject an approval.

        :param task_id: The task id. Required.
        :type task_id: str
        :param approval_response_comment: The request body of rejecting an approval type of workflow
         task. Required.
        :type approval_response_comment: JSON
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                approval_response_comment = {
                    "comment": "str"  # Optional. The comment of approving or rejecting an
                      approval type of workflow task.
                }
        """

    @overload
    def reject(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, approval_response_comment: IO, *, content_type: str = "application/json", **kwargs: Any
    ) -> None:
        """Reject an approval.

        :param task_id: The task id. Required.
        :type task_id: str
        :param approval_response_comment: The request body of rejecting an approval type of workflow
         task. Required.
        :type approval_response_comment: IO
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @distributed_trace
    def reject(  # pylint: disable=inconsistent-return-statements
        self, task_id: str, approval_response_comment: Union[JSON, IO], **kwargs: Any
    ) -> None:
        """Reject an approval.

        :param task_id: The task id. Required.
        :type task_id: str
        :param approval_response_comment: The request body of rejecting an approval type of workflow
         task. Is either a JSON type or an IO type. Required.
        :type approval_response_comment: JSON or IO
        :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
         When omitted (or None), the "Content-Type" header is used if present, otherwise
         "application/json".
        :paramtype content_type: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:

        Example:
            .. code-block:: python

                # JSON input template you can fill out and use as your body input.
                approval_response_comment = {
                    "comment": "str"  # Optional. The comment of approving or rejecting an
                      approval type of workflow task.
                }
        """
        error_map = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        # Caller-supplied mappings override the defaults above.
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        _params = kwargs.pop("params", {}) or {}

        # An explicit keyword wins over a "Content-Type" header; the header is
        # removed from _headers either way.
        content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
        cls: ClsType[None] = kwargs.pop("cls", None)

        content_type = content_type or "application/json"
        _json = None
        _content = None
        # Streams and raw bytes are sent as-is; anything else goes out as JSON.
        if isinstance(approval_response_comment, (IOBase, bytes)):
            _content = approval_response_comment
        else:
            _json = approval_response_comment

        request = build_approval_reject_request(
            task_id=task_id,
            content_type=content_type,
            api_version=self._config.api_version,
            json=_json,
            content=_content,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
        }
        request.url = self._client.format_url(request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [200]:
            if _stream:
                response.read()  # Load the body in memory and close the socket
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        if cls:
            return cls(pipeline_response, None, {})
[docs]class TaskStatusOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.purview.workflow.PurviewWorkflowClient`'s
:attr:`task_status` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@overload
def update(  # pylint: disable=inconsistent-return-statements
    self,
    task_id: str,
    task_update_command: JSON,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> None:
    """Update the status of a workflow task request (JSON body overload).

    :param task_id: The task id. Required.
    :type task_id: str
    :param task_update_command: Request body of updating workflow task request. Required.
    :type task_update_command: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: None
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError:

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            task_update_command = {
                "newStatus": "str",  # The new status will be used to update the task.
                  Required. Known values are: "NotStarted", "InProgress", "Completed", and
                  "Canceled".
                "comment": "str"  # Optional. The comment when update a task.
            }
    """
@overload
def update(  # pylint: disable=inconsistent-return-statements
    self,
    task_id: str,
    task_update_command: IO,
    *,
    content_type: str = "application/json",
    **kwargs: Any
) -> None:
    """Update the status of a workflow task request (binary body overload).

    :param task_id: The task id. Required.
    :type task_id: str
    :param task_update_command: Request body of updating workflow task request. Required.
    :type task_update_command: IO
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: None
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace
def update(  # pylint: disable=inconsistent-return-statements
    self, task_id: str, task_update_command: Union[JSON, IO], **kwargs: Any
) -> None:
    """Update the status of a workflow task request.

    Builds the request with ``build_task_status_update_request``, sends it
    through the client pipeline, and treats any status code other than 200
    as an error.

    :param task_id: The task id. Required.
    :type task_id: str
    :param task_update_command: Request body of updating workflow task request. Is either a JSON
     type or a IO type. Required. ``IOBase`` instances and ``bytes`` are sent as raw content;
     anything else is sent as JSON.
    :type task_update_command: JSON or IO
    :keyword content_type: Body Parameter content-type. Known values are: 'application/json'.
     Read from this keyword, otherwise from a ``Content-Type`` entry in ``headers``; when neither
     is supplied, "application/json" is used.
    :paramtype content_type: str
    :keyword cls: Optional callable invoked as ``cls(pipeline_response, None, {})`` on success;
     when given, its result is returned instead of None.
    :paramtype cls: callable
    :keyword error_map: Optional mapping of status code to exception type, merged over the
     built-in entries for 401, 404, 409 and 304.
    :paramtype error_map: dict
    :return: None
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError: if the response status code is not 200.

    Example:
        .. code-block:: python

            # JSON input template you can fill out and use as your body input.
            task_update_command = {
                # Required. The new status will be used to update the task.
                # Known values are: "NotStarted", "InProgress", "Completed", and
                # "Canceled".
                "newStatus": "str",
                # Optional. The comment when update a task.
                "comment": "str",
            }
    """
    # Default status-code -> exception mapping; caller-supplied entries win.
    error_map = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    # The Content-Type header is always popped from _headers here, so it is
    # forwarded to the request builder only through `content_type`.
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[None] = kwargs.pop("cls", None)

    content_type = content_type or "application/json"

    # Exactly one of _json / _content is populated, chosen by the body's type.
    _json = None
    _content = None
    if isinstance(task_update_command, (IOBase, bytes)):
        _content = task_update_command
    else:
        _json = task_update_command

    request = build_task_status_update_request(
        task_id=task_id,
        content_type=content_type,
        api_version=self._config.api_version,
        json=_json,
        content=_content,
        headers=_headers,
        params=_params,
    )
    path_format_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True),
    }
    request.url = self._client.format_url(request.url, **path_format_arguments)

    # This operation never streams the response; the same flag is passed to
    # the pipeline and consulted in the error path below.
    _stream = False
    pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=_stream, **kwargs
    )

    response = pipeline_response.http_response

    if response.status_code not in [200]:
        if _stream:
            response.read()  # Load the body in memory and close the socket
        # map_error gets first chance to raise the exception type mapped to
        # this status code; otherwise fall through to the generic error.
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if cls:
        return cls(pipeline_response, None, {})