Source code for azure.purview.workflow.operations._operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from io import IOBase
import sys
from typing import Any, Callable, Dict, IO, Iterable, List, Optional, TypeVar, Union, cast, overload
import urllib.parse

from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict

from .._serialization import Serializer

if sys.version_info >= (3, 9):
    from collections.abc import MutableMapping
else:
    from typing import MutableMapping  # type: ignore  # pylint: disable=ungrouped-imports
JSON = MutableMapping[str, Any]  # pylint: disable=unsubscriptable-object
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


def build_workflows_list_request(**kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflows"

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_get_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflows/{workflowId}"
    path_format_arguments = {
        "workflowId": _SERIALIZER.url("workflow_id", workflow_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_create_or_replace_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflows/{workflowId}"
    path_format_arguments = {
        "workflowId": _SERIALIZER.url("workflow_id", workflow_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_delete_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflows/{workflowId}"
    path_format_arguments = {
        "workflowId": _SERIALIZER.url("workflow_id", workflow_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_validate_request(workflow_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflows/{workflowId}/validate"
    path_format_arguments = {
        "workflowId": _SERIALIZER.url("workflow_id", workflow_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_user_requests_submit_request(**kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/userrequests"

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_runs_list_request(
    *,
    view_mode: Optional[str] = None,
    time_window: Optional[str] = None,
    orderby: Optional[str] = None,
    run_statuses: Optional[List[str]] = None,
    workflow_ids: Optional[List[str]] = None,
    requestors: Optional[List[str]] = None,
    maxpagesize: Optional[int] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowruns"

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if view_mode is not None:
        _params["viewMode"] = _SERIALIZER.query("view_mode", view_mode, "str")
    if time_window is not None:
        _params["timeWindow"] = _SERIALIZER.query("time_window", time_window, "str")
    if orderby is not None:
        _params["orderby"] = _SERIALIZER.query("orderby", orderby, "str")
    if run_statuses is not None:
        _params["runStatuses"] = _SERIALIZER.query("run_statuses", run_statuses, "[str]", div=",")
    if workflow_ids is not None:
        _params["workflowIds"] = _SERIALIZER.query("workflow_ids", workflow_ids, "[str]", div=",")
    if requestors is not None:
        _params["requestors"] = _SERIALIZER.query("requestors", requestors, "[str]", div=",")
    if maxpagesize is not None:
        _params["maxpagesize"] = _SERIALIZER.query("maxpagesize", maxpagesize, "int")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_run_get_request(workflow_run_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowruns/{workflowRunId}"
    path_format_arguments = {
        "workflowRunId": _SERIALIZER.url("workflow_run_id", workflow_run_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_run_cancel_request(workflow_run_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowruns/{workflowRunId}/cancel"
    path_format_arguments = {
        "workflowRunId": _SERIALIZER.url("workflow_run_id", workflow_run_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_tasks_list_request(
    *,
    view_mode: Optional[str] = None,
    workflow_ids: Optional[List[str]] = None,
    time_window: Optional[str] = None,
    maxpagesize: Optional[int] = None,
    orderby: Optional[str] = None,
    task_types: Optional[List[str]] = None,
    task_statuses: Optional[List[str]] = None,
    requestors: Optional[List[str]] = None,
    assignees: Optional[List[str]] = None,
    workflow_name_keyword: Optional[str] = None,
    **kwargs: Any
) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowtasks"

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
    if view_mode is not None:
        _params["viewMode"] = _SERIALIZER.query("view_mode", view_mode, "str")
    if workflow_ids is not None:
        _params["workflowIds"] = _SERIALIZER.query("workflow_ids", workflow_ids, "[str]", div=",")
    if time_window is not None:
        _params["timeWindow"] = _SERIALIZER.query("time_window", time_window, "str")
    if maxpagesize is not None:
        _params["maxpagesize"] = _SERIALIZER.query("maxpagesize", maxpagesize, "int")
    if orderby is not None:
        _params["orderby"] = _SERIALIZER.query("orderby", orderby, "str")
    if task_types is not None:
        _params["taskTypes"] = _SERIALIZER.query("task_types", task_types, "[str]", div=",")
    if task_statuses is not None:
        _params["taskStatuses"] = _SERIALIZER.query("task_statuses", task_statuses, "[str]", div=",")
    if requestors is not None:
        _params["requestors"] = _SERIALIZER.query("requestors", requestors, "[str]", div=",")
    if assignees is not None:
        _params["assignees"] = _SERIALIZER.query("assignees", assignees, "[str]", div=",")
    if workflow_name_keyword is not None:
        _params["workflowNameKeyword"] = _SERIALIZER.query("workflow_name_keyword", workflow_name_keyword, "str")

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)


def build_workflow_task_get_request(task_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})

    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowtasks/{taskId}"
    path_format_arguments = {
        "taskId": _SERIALIZER.url("task_id", task_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct headers
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs)


def build_workflow_task_reassign_request(task_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowtasks/{taskId}/reassign"
    path_format_arguments = {
        "taskId": _SERIALIZER.url("task_id", task_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_approval_approve_request(task_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowtasks/{taskId}/approve-approval"
    path_format_arguments = {
        "taskId": _SERIALIZER.url("task_id", task_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_approval_reject_request(task_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowtasks/{taskId}/reject-approval"
    path_format_arguments = {
        "taskId": _SERIALIZER.url("task_id", task_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


def build_task_status_update_request(task_id: str, **kwargs: Any) -> HttpRequest:
    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = case_insensitive_dict(kwargs.pop("params", {}) or {})

    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2023-10-01-preview"))
    accept = _headers.pop("Accept", "application/json")

    # Construct URL
    _url = "/workflowtasks/{taskId}/change-task-status"
    path_format_arguments = {
        "taskId": _SERIALIZER.url("task_id", task_id, "str"),
    }

    _url: str = _url.format(**path_format_arguments)  # type: ignore

    # Construct parameters
    _params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")

    # Construct headers
    if content_type is not None:
        _headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
    _headers["Accept"] = _SERIALIZER.header("accept", accept, "str")

    return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)


[docs]class WorkflowsOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`workflows` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list(self, **kwargs: Any) -> Iterable[JSON]: """List all workflows. :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "description": "str", # Description of a workflow. Required. "id": "str", # The id of workflow. Required. "isEnabled": bool, # Whether the workflow is enabled or not. Required. "name": "str", # The name of a workflow. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "createdBy": "str", # Optional. The person who created the workflow. "createdTime": "2020-02-20 00:00:00", # Optional. The created time of workflow. "lastUpdateTime": "2020-02-20 00:00:00", # Optional. The last update time. "updatedBy": "str" # Optional. The person who updated the workflow. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls: ClsType[JSON] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_workflows_list_request( api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) return request def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.get("nextLink") or None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged(get_next, extract_data)
[docs]class WorkflowOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`workflow` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def get(self, workflow_id: str, **kwargs: Any) -> JSON: """Get a specific workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "actionDag": {}, # The action DAG(Directed Acyclic Graph), it defines steps to be executed in a workflow run and their order. Required. "description": "str", # Description of a workflow. Required. "id": "str", # The id of workflow. Required. "isEnabled": bool, # Whether the workflow is enabled or not. Required. "name": "str", # The name of a workflow. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "createdBy": "str", # Optional. The person who created the workflow. "createdTime": "2020-02-20 00:00:00", # Optional. The created time of workflow. "lastUpdateTime": "2020-02-20 00:00:00", # Optional. The last update time. "updatedBy": "str" # Optional. The person who updated the workflow. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls: ClsType[JSON] = kwargs.pop("cls", None) request = build_workflow_get_request( workflow_id=workflow_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
@overload def create_or_replace( self, workflow_id: str, workflow_create_or_update_command: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Create or replace a workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :param workflow_create_or_update_command: Create or update workflow payload. Required. :type workflow_create_or_update_command: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. workflow_create_or_update_command = { "description": "str", # Description of a workflow. Required. "isEnabled": bool, # Whether the workflow enabled or not. Required. "name": "str", # The workflow name. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "actionDag": {} # Optional. The action DAG(Directed Acyclic Graph), it defines actual flow. } # response body for status code(s): 200 response == { "actionDag": {}, # The action DAG(Directed Acyclic Graph), it defines steps to be executed in a workflow run and their order. Required. "description": "str", # Description of a workflow. Required. "id": "str", # The id of workflow. Required. "isEnabled": bool, # Whether the workflow is enabled or not. Required. "name": "str", # The name of a workflow. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "createdBy": "str", # Optional. The person who created the workflow. "createdTime": "2020-02-20 00:00:00", # Optional. The created time of workflow. "lastUpdateTime": "2020-02-20 00:00:00", # Optional. The last update time. "updatedBy": "str" # Optional. The person who updated the workflow. } """ @overload def create_or_replace( self, workflow_id: str, workflow_create_or_update_command: IO, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Create or replace a workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :param workflow_create_or_update_command: Create or update workflow payload. Required. :type workflow_create_or_update_command: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "actionDag": {}, # The action DAG(Directed Acyclic Graph), it defines steps to be executed in a workflow run and their order. Required. "description": "str", # Description of a workflow. Required. "id": "str", # The id of workflow. Required. "isEnabled": bool, # Whether the workflow is enabled or not. Required. "name": "str", # The name of a workflow. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "createdBy": "str", # Optional. The person who created the workflow. "createdTime": "2020-02-20 00:00:00", # Optional. The created time of workflow. "lastUpdateTime": "2020-02-20 00:00:00", # Optional. The last update time. "updatedBy": "str" # Optional. The person who updated the workflow. } """
[docs] @distributed_trace def create_or_replace( self, workflow_id: str, workflow_create_or_update_command: Union[JSON, IO], **kwargs: Any ) -> JSON: """Create or replace a workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :param workflow_create_or_update_command: Create or update workflow payload. Is either a JSON type or a IO type. Required. :type workflow_create_or_update_command: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. workflow_create_or_update_command = { "description": "str", # Description of a workflow. Required. "isEnabled": bool, # Whether the workflow enabled or not. Required. "name": "str", # The workflow name. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "actionDag": {} # Optional. The action DAG(Directed Acyclic Graph), it defines actual flow. } # response body for status code(s): 200 response == { "actionDag": {}, # The action DAG(Directed Acyclic Graph), it defines steps to be executed in a workflow run and their order. Required. "description": "str", # Description of a workflow. Required. "id": "str", # The id of workflow. Required. "isEnabled": bool, # Whether the workflow is enabled or not. Required. "name": "str", # The name of a workflow. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "createdBy": "str", # Optional. The person who created the workflow. "createdTime": "2020-02-20 00:00:00", # Optional. The created time of workflow. "lastUpdateTime": "2020-02-20 00:00:00", # Optional. The last update time. "updatedBy": "str" # Optional. The person who updated the workflow. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[JSON] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(workflow_create_or_update_command, (IOBase, bytes)): _content = workflow_create_or_update_command else: _json = workflow_create_or_update_command request = build_workflow_create_or_replace_request( workflow_id=workflow_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs] @distributed_trace def delete(self, workflow_id: str, **kwargs: Any) -> None: # pylint: disable=inconsistent-return-statements """Delete a workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls: ClsType[None] = kwargs.pop("cls", None) request = build_workflow_delete_request( workflow_id=workflow_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [204]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
@overload def validate( self, workflow_id: str, workflow_validate_query: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Validate a workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :param workflow_validate_query: Check workflow payload. Required. :type workflow_validate_query: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. workflow_validate_query = { "description": "str", # Description of a workflow. Required. "isEnabled": bool, # Whether the workflow enabled or not. Required. "name": "str", # The workflow name. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "actionDag": {} # Optional. The action DAG(Directed Acyclic Graph), it defines actual flow. } # response body for status code(s): 200 response == { "value": [ { "location": { "type": "str", # The validation violation location type. Required. Known values are: "workflow", "action", and "actionParameter". "actionName": "str", # Optional. The name of the action where the violation happens. "parameterKey": "str" # Optional. The key of the action parameter where the violation happens. }, "message": "str", # The detail about how the validation rule is violated. Required. "severity": "str" # The severity of the validation rule. Required. Known values are: "error" and "warning". } ] } """ @overload def validate( self, workflow_id: str, workflow_validate_query: IO, *, content_type: str = "application/json", **kwargs: Any ) -> JSON: """Validate a workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :param workflow_validate_query: Check workflow payload. Required. :type workflow_validate_query: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "value": [ { "location": { "type": "str", # The validation violation location type. Required. Known values are: "workflow", "action", and "actionParameter". "actionName": "str", # Optional. The name of the action where the violation happens. "parameterKey": "str" # Optional. The key of the action parameter where the violation happens. }, "message": "str", # The detail about how the validation rule is violated. Required. "severity": "str" # The severity of the validation rule. Required. Known values are: "error" and "warning". } ] } """
[docs] @distributed_trace def validate(self, workflow_id: str, workflow_validate_query: Union[JSON, IO], **kwargs: Any) -> JSON: """Validate a workflow. :param workflow_id: The workflow id. Required. :type workflow_id: str :param workflow_validate_query: Check workflow payload. Is either a JSON type or a IO type. Required. :type workflow_validate_query: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. workflow_validate_query = { "description": "str", # Description of a workflow. Required. "isEnabled": bool, # Whether the workflow enabled or not. Required. "name": "str", # The workflow name. Required. "triggers": [ { "type": "str", # Required. Known values are: "when_term_creation_is_requested", "when_term_deletion_is_requested", "when_term_update_is_requested", "when_terms_import_is_requested", "when_data_access_grant_is_requested", and "when_asset_update_is_requested". "underCollection": "str", # Optional. The collection name. "underGlossary": "str", # Optional. The glossary guid. "underGlossaryHierarchy": "str" # Optional. Glossary term hierarchy path. } ], "actionDag": {} # Optional. The action DAG(Directed Acyclic Graph), it defines actual flow. } # response body for status code(s): 200 response == { "value": [ { "location": { "type": "str", # The validation violation location type. Required. Known values are: "workflow", "action", and "actionParameter". "actionName": "str", # Optional. The name of the action where the violation happens. "parameterKey": "str" # Optional. The key of the action parameter where the violation happens. }, "message": "str", # The detail about how the validation rule is violated. Required. "severity": "str" # The severity of the validation rule. Required. Known values are: "error" and "warning". } ] } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[JSON] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(workflow_validate_query, (IOBase, bytes)): _content = workflow_validate_query else: _json = workflow_validate_query request = build_workflow_validate_request( workflow_id=workflow_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs]class UserRequestsOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`user_requests` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer") @overload def submit(self, user_requests_payload: JSON, *, content_type: str = "application/json", **kwargs: Any) -> JSON: """Submit a user request for requestor, a user request describes user ask to do operation(s) on Purview. If any workflow's trigger matches with an operation in request, a run of the workflow is created. :param user_requests_payload: The payload of submitting a user request. Required. :type user_requests_payload: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. user_requests_payload = { "operations": [ { "payload": {}, # The payload of each operation which user want to submit. Required. "type": "str" # The operation type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". } ], "comment": "str" # Optional. The comment when submit a user request. } # response body for status code(s): 200 response == { "operations": [ { "payload": {}, # The payload of each operation which user want to submit. Required. "type": "str", # The operation type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". "workflowRunIds": [ "str" # Optional. The list of operations user want to submit, each operation matches one Purview API call and will do the operation directly. Required. ] } ], "requestId": "str", # The user request id. Required. "requestor": "str", # The person who submitted the user request. Required. "status": "str", # The status. Required. Known values are: "NotStarted", "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and "Canceled". "comment": "str" # Optional. The comment when submit a user request. } """ @overload def submit(self, user_requests_payload: IO, *, content_type: str = "application/json", **kwargs: Any) -> JSON: """Submit a user request for requestor, a user request describes user ask to do operation(s) on Purview. If any workflow's trigger matches with an operation in request, a run of the workflow is created. :param user_requests_payload: The payload of submitting a user request. Required. :type user_requests_payload: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "operations": [ { "payload": {}, # The payload of each operation which user want to submit. Required. "type": "str", # The operation type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". "workflowRunIds": [ "str" # Optional. The list of operations user want to submit, each operation matches one Purview API call and will do the operation directly. Required. ] } ], "requestId": "str", # The user request id. Required. "requestor": "str", # The person who submitted the user request. Required. "status": "str", # The status. Required. Known values are: "NotStarted", "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and "Canceled". "comment": "str" # Optional. The comment when submit a user request. } """
[docs] @distributed_trace def submit(self, user_requests_payload: Union[JSON, IO], **kwargs: Any) -> JSON: """Submit a user request for requestor, a user request describes user ask to do operation(s) on Purview. If any workflow's trigger matches with an operation in request, a run of the workflow is created. :param user_requests_payload: The payload of submitting a user request. Is either a JSON type or a IO type. Required. :type user_requests_payload: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. user_requests_payload = { "operations": [ { "payload": {}, # The payload of each operation which user want to submit. Required. "type": "str" # The operation type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". } ], "comment": "str" # Optional. The comment when submit a user request. } # response body for status code(s): 200 response == { "operations": [ { "payload": {}, # The payload of each operation which user want to submit. Required. "type": "str", # The operation type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". "workflowRunIds": [ "str" # Optional. The list of operations user want to submit, each operation matches one Purview API call and will do the operation directly. Required. ] } ], "requestId": "str", # The user request id. Required. "requestor": "str", # The person who submitted the user request. Required. "status": "str", # The status. Required. Known values are: "NotStarted", "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and "Canceled". "comment": "str" # Optional. The comment when submit a user request. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[JSON] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(user_requests_payload, (IOBase, bytes)): _content = user_requests_payload else: _json = user_requests_payload request = build_user_requests_submit_request( content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
[docs]class WorkflowRunsOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`workflow_runs` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list( self, *, view_mode: Optional[str] = None, time_window: Optional[str] = None, orderby: Optional[str] = None, run_statuses: Optional[List[str]] = None, workflow_ids: Optional[List[str]] = None, requestors: Optional[List[str]] = None, **kwargs: Any ) -> Iterable[JSON]: """List workflow runs. :keyword view_mode: To filter user's workflow runs or view as admin. Default value is None. :paramtype view_mode: str :keyword time_window: Time window of filtering items. Known values are: "1d", "7d", "30d", and "90d". Default value is None. :paramtype time_window: str :keyword orderby: The key word which used to sort the results. Known values are: "status desc", "status asc", "requestor desc", "requestor asc", "startTime desc", "startTime asc", "createdTime desc", and "createdTime asc". Default value is None. :paramtype orderby: str :keyword run_statuses: Filter workflow runs by workflow run status. Default value is None. :paramtype run_statuses: list[str] :keyword workflow_ids: Filter items by workflow id list. Default value is None. :paramtype workflow_ids: list[str] :keyword requestors: Requestors' ids to filter. Default value is None. :paramtype requestors: list[str] :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "id": "str", # The workflow run id. Required. "requestor": "str", # The person who submitted the user request. Required. "runPayload": { "targetValue": "str", # The target value which need involve workflow to update. Required. "type": "str" # The workflow run payload type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". }, "startTime": "2020-02-20 00:00:00", # Workflow run start time. Required. "status": "str", # The status. Required. Known values are: "NotStarted", "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and "Canceled". "workflowId": "str", # The workflow id. Required. "cancelComment": "str", # Optional. The comment when cancel a workflow run. "cancelTime": "2020-02-20 00:00:00", # Optional. The time of workflow run be canceled. "endTime": "2020-02-20 00:00:00", # Optional. The time of workflow run completed. "userRequestId": "str" # Optional. The user request id. } """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} maxpagesize = kwargs.pop("maxpagesize", None) cls: ClsType[JSON] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_workflow_runs_list_request( view_mode=view_mode, time_window=time_window, orderby=orderby, run_statuses=run_statuses, workflow_ids=workflow_ids, requestors=requestors, maxpagesize=maxpagesize, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) return request def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.get("nextLink") or None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged(get_next, extract_data)
[docs]class WorkflowRunOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`workflow_run` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def get(self, workflow_run_id: str, **kwargs: Any) -> JSON: """Get a workflow run. :param workflow_run_id: The workflow run id. Required. :type workflow_run_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # response body for status code(s): 200 response == { "actionDag": {}, # The action DAG(Directed Acyclic Graph), it defines actual flow. Required. "detail": { "actions": {}, # Any object. Required. "runInput": {} # Built-in variables starts with @runInput. Its properties are determined by trigger type at workflow run time. Required. }, "cancelComment": "str", # Optional. The comment when cancel a workflow run. "cancelTime": "2020-02-20 00:00:00", # Optional. The time of workflow run be canceled. "endTime": "2020-02-20 00:00:00", # Optional. The time of workflow run completed. "id": "str", # Optional. The workflow run id. "requestor": "str", # Optional. The person who submitted the user request. "runPayload": { "payload": {}, # The payload of each operation which user want to submit. Required. "targetValue": "str", # The target value which need involve workflow to update. Required. "type": "str" # The workflow run payload type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". }, "startTime": "2020-02-20 00:00:00", # Optional. Workflow run start time. "status": "str", # Optional. The status. Known values are: "NotStarted", "InProgress", "Failed", "Completed", "Canceling", "CancellationFailed", and "Canceled". "userRequestId": "str", # Optional. The user request id. "workflowId": "str" # Optional. The workflow id. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls: ClsType[JSON] = kwargs.pop("cls", None) request = build_workflow_run_get_request( workflow_run_id=workflow_run_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
@overload def cancel( # pylint: disable=inconsistent-return-statements self, workflow_run_id: str, run_cancel_reply: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Cancel a workflow run. :param workflow_run_id: The workflow run id. Required. :type workflow_run_id: str :param run_cancel_reply: Reply of canceling a workflow run. Required. :type run_cancel_reply: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. run_cancel_reply = { "comment": "str" # Optional. The comment of canceling a workflow run. } """ @overload def cancel( # pylint: disable=inconsistent-return-statements self, workflow_run_id: str, run_cancel_reply: IO, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Cancel a workflow run. :param workflow_run_id: The workflow run id. Required. :type workflow_run_id: str :param run_cancel_reply: Reply of canceling a workflow run. Required. :type run_cancel_reply: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def cancel( # pylint: disable=inconsistent-return-statements self, workflow_run_id: str, run_cancel_reply: Union[JSON, IO], **kwargs: Any ) -> None: """Cancel a workflow run. :param workflow_run_id: The workflow run id. Required. :type workflow_run_id: str :param run_cancel_reply: Reply of canceling a workflow run. Is either a JSON type or a IO type. Required. :type run_cancel_reply: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. run_cancel_reply = { "comment": "str" # Optional. The comment of canceling a workflow run. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[None] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(run_cancel_reply, (IOBase, bytes)): _content = run_cancel_reply else: _json = run_cancel_reply request = build_workflow_run_cancel_request( workflow_run_id=workflow_run_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs]class WorkflowTasksOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`workflow_tasks` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def list( self, *, view_mode: Optional[str] = None, workflow_ids: Optional[List[str]] = None, time_window: Optional[str] = None, orderby: Optional[str] = None, task_types: Optional[List[str]] = None, task_statuses: Optional[List[str]] = None, requestors: Optional[List[str]] = None, assignees: Optional[List[str]] = None, workflow_name_keyword: Optional[str] = None, **kwargs: Any ) -> Iterable[JSON]: """Get all workflow tasks. :keyword view_mode: To filter user's sent, received or history workflow tasks. Default value is None. :paramtype view_mode: str :keyword workflow_ids: Filter items by workflow id list. Default value is None. :paramtype workflow_ids: list[str] :keyword time_window: Time window of filtering items. Known values are: "1d", "7d", "30d", and "90d". Default value is None. :paramtype time_window: str :keyword orderby: The key word which used to sort the results. Known values are: "status desc", "status asc", "requestor desc", "requestor asc", "startTime desc", "startTime asc", "createdTime desc", and "createdTime asc". Default value is None. :paramtype orderby: str :keyword task_types: Filter items by workflow task type. Default value is None. :paramtype task_types: list[str] :keyword task_statuses: Filter workflow tasks by status. Default value is None. :paramtype task_statuses: list[str] :keyword requestors: Requestors' ids to filter. Default value is None. :paramtype requestors: list[str] :keyword assignees: Assignees' ids to filter. Default value is None. :paramtype assignees: list[str] :keyword workflow_name_keyword: The key word which could used to filter workflow item with related workflow. Default value is None. :paramtype workflow_name_keyword: str :return: An iterator like instance of JSON object :rtype: ~azure.core.paging.ItemPaged[JSON] :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # The response is polymorphic. The following are possible polymorphic responses based off discriminator "type": # JSON input template for discriminator value "Approval": workflow_task = { "createdTime": "2020-02-20 00:00:00", # The created time. Required. "id": "str", # The workflow task id. Required. "lastUpdateTime": "2020-02-20 00:00:00", # The last update time. Required. "payload": { "targetValue": "str", # The target value of entity which user want to involve workflow to update. Required. "type": "str", # The task payload type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". "payload": {} # Optional. The payload of the task. }, "requestor": "str", # The person who submitted the user request. Required. "type": "Approval", "workflowId": "str", # The workflow id. Required. "workflowRunId": "str", # The workflow run id. Required. "approvalDetail": { "approvalType": "str", # The approval type of an approval. Required. Known values are: "PendingOnAny" and "PendingOnAll". "approvers": { "str": { "reply": "str", # The response for an approval type of workflow task. Required. Known values are: "Approved", "Rejected", and "Pending". "comment": "str", # Optional. The comment of approving or rejecting an approval type of workflow task. "responseTime": "2020-02-20 00:00:00" # Optional. The reply time of approver to a workflow task. } }, "status": "str" # The status of an approval. Required. Known values are: "Pending", "Approved", "Rejected", and "Canceled". }, "expiryInfo": { "expirySettings": { "expireAfter": {}, # The time of expiry. Required. "notifyOnExpiration": [ "str" # Optional. Required. ] }, "expiryTime": "2020-02-20 00:00:00", # The expiry time. Required. "nextExpiryNotificationTime": "2020-02-20 00:00:00", # The next expiry notification time. Required. "lastExpiryNotificationTime": "2020-02-20 00:00:00" # Optional. The last expiry notification time. }, "reminderInfo": { "nextRemindTime": "2020-02-20 00:00:00", # The next remind time. Required. "reminderSettings": {}, # The reminder settings. Required. "lastRemindTime": "2020-02-20 00:00:00" # Optional. The last update time. }, "title": "str" # Optional. The workflow task title. } # JSON input template for discriminator value "SimpleTask": workflow_task = { "createdTime": "2020-02-20 00:00:00", # The created time. Required. "id": "str", # The workflow task id. Required. "lastUpdateTime": "2020-02-20 00:00:00", # The last update time. Required. "payload": { "targetValue": "str", # The target value of entity which user want to involve workflow to update. Required. "type": "str", # The task payload type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". "payload": {} # Optional. The payload of the task. }, "requestor": "str", # The person who submitted the user request. Required. "type": "SimpleTask", "workflowId": "str", # The workflow id. Required. "workflowRunId": "str", # The workflow run id. Required. "expiryInfo": { "expirySettings": { "expireAfter": {}, # The time of expiry. Required. "notifyOnExpiration": [ "str" # Optional. Required. ] }, "expiryTime": "2020-02-20 00:00:00", # The expiry time. Required. "nextExpiryNotificationTime": "2020-02-20 00:00:00", # The next expiry notification time. Required. "lastExpiryNotificationTime": "2020-02-20 00:00:00" # Optional. The last expiry notification time. }, "reminderInfo": { "nextRemindTime": "2020-02-20 00:00:00", # The next remind time. Required. "reminderSettings": {}, # The reminder settings. Required. "lastRemindTime": "2020-02-20 00:00:00" # Optional. The last update time. }, "taskDetail": { "assignedTo": [ "str" # The users or groups were assigned the simple task. Required. ], "changeHistory": [ {} # Required. ], "status": "str", # Simple task status. Required. Known values are: "NotStarted", "InProgress", "Completed", and "Canceled". "taskBody": "str" # The simple task body. Required. }, "title": "str" # Optional. The workflow task title. } # response body for status code(s): 200 response == workflow_task """ _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} maxpagesize = kwargs.pop("maxpagesize", None) cls: ClsType[JSON] = kwargs.pop("cls", None) error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) def prepare_request(next_link=None): if not next_link: request = build_workflow_tasks_list_request( view_mode=view_mode, workflow_ids=workflow_ids, time_window=time_window, maxpagesize=maxpagesize, orderby=orderby, task_types=task_types, task_statuses=task_statuses, requestors=requestors, assignees=assignees, workflow_name_keyword=workflow_name_keyword, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) else: # make call to next link with the client's api-version _parsed_next_link = urllib.parse.urlparse(next_link) _next_request_params = case_insensitive_dict( { key: [urllib.parse.quote(v) for v in value] for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items() } ) _next_request_params["api-version"] = self._config.api_version request = HttpRequest( "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params ) path_format_arguments = { "endpoint": self._serialize.url( "self._config.endpoint", self._config.endpoint, "str", skip_quote=True ), } request.url = self._client.format_url(request.url, **path_format_arguments) return request def extract_data(pipeline_response): deserialized = pipeline_response.http_response.json() list_of_elem = deserialized["value"] if cls: list_of_elem = cls(list_of_elem) # type: ignore return deserialized.get("nextLink") or None, iter(list_of_elem) def get_next(next_link=None): request = prepare_request(next_link) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) return pipeline_response return ItemPaged(get_next, extract_data)
[docs]class WorkflowTaskOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`workflow_task` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
[docs] @distributed_trace def get(self, task_id: str, **kwargs: Any) -> JSON: """Get a workflow task. :param task_id: The task id. Required. :type task_id: str :return: JSON object :rtype: JSON :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # The response is polymorphic. The following are possible polymorphic responses based off discriminator "type": # JSON input template for discriminator value "Approval": workflow_task = { "createdTime": "2020-02-20 00:00:00", # The created time. Required. "id": "str", # The workflow task id. Required. "lastUpdateTime": "2020-02-20 00:00:00", # The last update time. Required. "payload": { "targetValue": "str", # The target value of entity which user want to involve workflow to update. Required. "type": "str", # The task payload type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". "payload": {} # Optional. The payload of the task. }, "requestor": "str", # The person who submitted the user request. Required. "type": "Approval", "workflowId": "str", # The workflow id. Required. "workflowRunId": "str", # The workflow run id. Required. "approvalDetail": { "approvalType": "str", # The approval type of an approval. Required. Known values are: "PendingOnAny" and "PendingOnAll". "approvers": { "str": { "reply": "str", # The response for an approval type of workflow task. Required. Known values are: "Approved", "Rejected", and "Pending". "comment": "str", # Optional. The comment of approving or rejecting an approval type of workflow task. "responseTime": "2020-02-20 00:00:00" # Optional. The reply time of approver to a workflow task. } }, "status": "str" # The status of an approval. Required. Known values are: "Pending", "Approved", "Rejected", and "Canceled". }, "expiryInfo": { "expirySettings": { "expireAfter": {}, # The time of expiry. Required. "notifyOnExpiration": [ "str" # Optional. Required. ] }, "expiryTime": "2020-02-20 00:00:00", # The expiry time. Required. "nextExpiryNotificationTime": "2020-02-20 00:00:00", # The next expiry notification time. Required. "lastExpiryNotificationTime": "2020-02-20 00:00:00" # Optional. The last expiry notification time. }, "reminderInfo": { "nextRemindTime": "2020-02-20 00:00:00", # The next remind time. Required. "reminderSettings": {}, # The reminder settings. Required. "lastRemindTime": "2020-02-20 00:00:00" # Optional. The last update time. }, "title": "str" # Optional. The workflow task title. } # JSON input template for discriminator value "SimpleTask": workflow_task = { "createdTime": "2020-02-20 00:00:00", # The created time. Required. "id": "str", # The workflow task id. Required. "lastUpdateTime": "2020-02-20 00:00:00", # The last update time. Required. "payload": { "targetValue": "str", # The target value of entity which user want to involve workflow to update. Required. "type": "str", # The task payload type. Required. Known values are: "CreateTerm", "UpdateTerm", "DeleteTerm", "ImportTerms", "UpdateAsset", and "GrantDataAccess". "payload": {} # Optional. The payload of the task. }, "requestor": "str", # The person who submitted the user request. Required. "type": "SimpleTask", "workflowId": "str", # The workflow id. Required. "workflowRunId": "str", # The workflow run id. Required. "expiryInfo": { "expirySettings": { "expireAfter": {}, # The time of expiry. Required. "notifyOnExpiration": [ "str" # Optional. Required. ] }, "expiryTime": "2020-02-20 00:00:00", # The expiry time. Required. "nextExpiryNotificationTime": "2020-02-20 00:00:00", # The next expiry notification time. Required. "lastExpiryNotificationTime": "2020-02-20 00:00:00" # Optional. The last expiry notification time. }, "reminderInfo": { "nextRemindTime": "2020-02-20 00:00:00", # The next remind time. Required. "reminderSettings": {}, # The reminder settings. Required. "lastRemindTime": "2020-02-20 00:00:00" # Optional. The last update time. }, "taskDetail": { "assignedTo": [ "str" # The users or groups were assigned the simple task. Required. ], "changeHistory": [ {} # Required. ], "status": "str", # Simple task status. Required. Known values are: "NotStarted", "InProgress", "Completed", and "Canceled". "taskBody": "str" # The simple task body. Required. }, "title": "str" # Optional. The workflow task title. } # response body for status code(s): 200 response == workflow_task """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls: ClsType[JSON] = kwargs.pop("cls", None) request = build_workflow_task_get_request( task_id=task_id, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if response.content: deserialized = response.json() else: deserialized = None if cls: return cls(pipeline_response, cast(JSON, deserialized), {}) return cast(JSON, deserialized)
@overload def reassign( # pylint: disable=inconsistent-return-statements self, task_id: str, reassign_command: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Reassign a workflow task. :param task_id: The task id. Required. :type task_id: str :param reassign_command: The request body of reassigning a workflow task. Required. :type reassign_command: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. reassign_command = { "reassignments": [ { "reassignFrom": "str", # Reassign a workflow task from a user or a group. Required. "reassignTo": "str" # Reassign a workflow task to a user or a group. Required. } ] } """ @overload def reassign( # pylint: disable=inconsistent-return-statements self, task_id: str, reassign_command: IO, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Reassign a workflow task. :param task_id: The task id. Required. :type task_id: str :param reassign_command: The request body of reassigning a workflow task. Required. :type reassign_command: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def reassign( # pylint: disable=inconsistent-return-statements self, task_id: str, reassign_command: Union[JSON, IO], **kwargs: Any ) -> None: """Reassign a workflow task. :param task_id: The task id. Required. :type task_id: str :param reassign_command: The request body of reassigning a workflow task. Is either a JSON type or a IO type. Required. :type reassign_command: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. reassign_command = { "reassignments": [ { "reassignFrom": "str", # Reassign a workflow task from a user or a group. Required. "reassignTo": "str" # Reassign a workflow task to a user or a group. Required. } ] } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[None] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(reassign_command, (IOBase, bytes)): _content = reassign_command else: _json = reassign_command request = build_workflow_task_reassign_request( task_id=task_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs]class ApprovalOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`approval` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer") @overload def approve( # pylint: disable=inconsistent-return-statements self, task_id: str, approval_response_comment: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Approve an approval. :param task_id: The task id. Required. :type task_id: str :param approval_response_comment: The request body of approving an approval type of workflow task. Required. :type approval_response_comment: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. approval_response_comment = { "comment": "str" # Optional. The comment of approving or rejecting an approval type of workflow task. } """ @overload def approve( # pylint: disable=inconsistent-return-statements self, task_id: str, approval_response_comment: IO, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Approve an approval. :param task_id: The task id. Required. :type task_id: str :param approval_response_comment: The request body of approving an approval type of workflow task. Required. :type approval_response_comment: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def approve( # pylint: disable=inconsistent-return-statements self, task_id: str, approval_response_comment: Union[JSON, IO], **kwargs: Any ) -> None: """Approve an approval. :param task_id: The task id. Required. :type task_id: str :param approval_response_comment: The request body of approving an approval type of workflow task. Is either a JSON type or a IO type. Required. :type approval_response_comment: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. approval_response_comment = { "comment": "str" # Optional. The comment of approving or rejecting an approval type of workflow task. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[None] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(approval_response_comment, (IOBase, bytes)): _content = approval_response_comment else: _json = approval_response_comment request = build_approval_approve_request( task_id=task_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
@overload def reject( # pylint: disable=inconsistent-return-statements self, task_id: str, approval_response_comment: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Reject an approval. :param task_id: The task id. Required. :type task_id: str :param approval_response_comment: The request body of rejecting an approval type of workflow task. Required. :type approval_response_comment: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. approval_response_comment = { "comment": "str" # Optional. The comment of approving or rejecting an approval type of workflow task. } """ @overload def reject( # pylint: disable=inconsistent-return-statements self, task_id: str, approval_response_comment: IO, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Reject an approval. :param task_id: The task id. Required. :type task_id: str :param approval_response_comment: The request body of rejecting an approval type of workflow task. Required. :type approval_response_comment: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def reject( # pylint: disable=inconsistent-return-statements self, task_id: str, approval_response_comment: Union[JSON, IO], **kwargs: Any ) -> None: """Reject an approval. :param task_id: The task id. Required. :type task_id: str :param approval_response_comment: The request body of rejecting an approval type of workflow task. Is either a JSON type or a IO type. Required. :type approval_response_comment: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. approval_response_comment = { "comment": "str" # Optional. The comment of approving or rejecting an approval type of workflow task. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[None] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(approval_response_comment, (IOBase, bytes)): _content = approval_response_comment else: _json = approval_response_comment request = build_approval_reject_request( task_id=task_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})
[docs]class TaskStatusOperations: """ .. warning:: **DO NOT** instantiate this class directly. Instead, you should access the following operations through :class:`~azure.purview.workflow.PurviewWorkflowClient`'s :attr:`task_status` attribute. """ def __init__(self, *args, **kwargs): input_args = list(args) self._client = input_args.pop(0) if input_args else kwargs.pop("client") self._config = input_args.pop(0) if input_args else kwargs.pop("config") self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer") self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer") @overload def update( # pylint: disable=inconsistent-return-statements self, task_id: str, task_update_command: JSON, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Update the status of a workflow task request. :param task_id: The task id. Required. :type task_id: str :param task_update_command: Request body of updating workflow task request. Required. :type task_update_command: JSON :keyword content_type: Body Parameter content-type. Content type parameter for JSON body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. task_update_command = { "newStatus": "str", # The new status will be used to update the task. Required. Known values are: "NotStarted", "InProgress", "Completed", and "Canceled". "comment": "str" # Optional. The comment when update a task. } """ @overload def update( # pylint: disable=inconsistent-return-statements self, task_id: str, task_update_command: IO, *, content_type: str = "application/json", **kwargs: Any ) -> None: """Update the status of a workflow task request. :param task_id: The task id. Required. :type task_id: str :param task_update_command: Request body of updating workflow task request. Required. :type task_update_command: IO :keyword content_type: Body Parameter content-type. Content type parameter for binary body. Default value is "application/json". :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: """
[docs] @distributed_trace def update( # pylint: disable=inconsistent-return-statements self, task_id: str, task_update_command: Union[JSON, IO], **kwargs: Any ) -> None: """Update the status of a workflow task request. :param task_id: The task id. Required. :type task_id: str :param task_update_command: Request body of updating workflow task request. Is either a JSON type or a IO type. Required. :type task_update_command: JSON or IO :keyword content_type: Body Parameter content-type. Known values are: 'application/json'. Default value is None. :paramtype content_type: str :return: None :rtype: None :raises ~azure.core.exceptions.HttpResponseError: Example: .. code-block:: python # JSON input template you can fill out and use as your body input. task_update_command = { "newStatus": "str", # The new status will be used to update the task. Required. Known values are: "NotStarted", "InProgress", "Completed", and "Canceled". "comment": "str" # Optional. The comment when update a task. } """ error_map = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) _params = kwargs.pop("params", {}) or {} content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None)) cls: ClsType[None] = kwargs.pop("cls", None) content_type = content_type or "application/json" _json = None _content = None if isinstance(task_update_command, (IOBase, bytes)): _content = task_update_command else: _json = task_update_command request = build_task_status_update_request( task_id=task_id, content_type=content_type, api_version=self._config.api_version, json=_json, content=_content, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str", skip_quote=True), } request.url = self._client.format_url(request.url, **path_format_arguments) _stream = False pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: response.read() # Load the body in memory and close the socket map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if cls: return cls(pipeline_response, None, {})