Source code for azure.ai.projects.aio.operations._operations

# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from io import IOBase
import json
import sys
from typing import Any, AsyncIterable, Callable, Dict, IO, List, Optional, TYPE_CHECKING, TypeVar, Union, overload
import urllib.parse

from azure.core import AsyncPipelineClient
from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
    ResourceNotModifiedError,
    StreamClosedError,
    StreamConsumedError,
    map_error,
)
from azure.core.pipeline import PipelineResponse
from azure.core.rest import AsyncHttpResponse, HttpRequest
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.utils import case_insensitive_dict

from ... import _model_base, models as _models
from ..._model_base import SdkJSONEncoder, _deserialize
from ..._serialization import Deserializer, Serializer
from ..._vendor import FileType, prepare_multipart_form_data
from ...operations._operations import (
    build_agents_cancel_run_request,
    build_agents_cancel_vector_store_file_batch_request,
    build_agents_create_agent_request,
    build_agents_create_message_request,
    build_agents_create_run_request,
    build_agents_create_thread_and_run_request,
    build_agents_create_thread_request,
    build_agents_create_vector_store_file_batch_request,
    build_agents_create_vector_store_file_request,
    build_agents_create_vector_store_request,
    build_agents_delete_agent_request,
    build_agents_delete_file_request,
    build_agents_delete_thread_request,
    build_agents_delete_vector_store_file_request,
    build_agents_delete_vector_store_request,
    build_agents_get_agent_request,
    build_agents_get_file_content_request,
    build_agents_get_file_request,
    build_agents_get_message_request,
    build_agents_get_run_request,
    build_agents_get_run_step_request,
    build_agents_get_thread_request,
    build_agents_get_vector_store_file_batch_request,
    build_agents_get_vector_store_file_request,
    build_agents_get_vector_store_request,
    build_agents_list_agents_request,
    build_agents_list_files_request,
    build_agents_list_messages_request,
    build_agents_list_run_steps_request,
    build_agents_list_runs_request,
    build_agents_list_vector_store_file_batch_files_request,
    build_agents_list_vector_store_files_request,
    build_agents_list_vector_stores_request,
    build_agents_modify_vector_store_request,
    build_agents_submit_tool_outputs_to_run_request,
    build_agents_update_agent_request,
    build_agents_update_message_request,
    build_agents_update_run_request,
    build_agents_update_thread_request,
    build_agents_upload_file_request,
    build_connections_get_connection_request,
    build_connections_get_connection_with_secrets_request,
    build_connections_get_workspace_request,
    build_connections_list_connections_request,
    build_evaluations_create_or_replace_schedule_request,
    build_evaluations_create_request,
    build_evaluations_disable_schedule_request,
    build_evaluations_get_request,
    build_evaluations_get_schedule_request,
    build_evaluations_list_request,
    build_evaluations_list_schedule_request,
    build_evaluations_update_request,
    build_telemetry_get_app_insights_request,
)
from .._configuration import AIProjectClientConfiguration

# MutableMapping is taken from collections.abc on Python 3.9+ and from typing on
# older interpreters, so the subscripted alias below is built from whichever one
# was imported.
if sys.version_info >= (3, 9):
    from collections.abc import MutableMapping
else:
    from typing import MutableMapping  # type: ignore

# _types is imported for static type checkers only; the annotations that refer to
# it in this module are written as strings (e.g. "_types.AgentsApiResponseFormatOption").
if TYPE_CHECKING:
    from ... import _types
# Annotation used for request bodies that are passed as a string-keyed mapping.
JSON = MutableMapping[str, Any]  # pylint: disable=unsubscriptable-object
# Sentinel default meaning "argument not supplied". It is compared by identity
# (``is _Unset``), which keeps an explicit ``None`` distinguishable from omission.
_Unset: Any = object()
T = TypeVar("T")
# Shape of the optional ``cls`` callback accepted by the operations: it is called
# with the pipeline response, the deserialized result and a dict, and whatever it
# returns is handed back to the caller.
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]


class AgentsOperations:  # pylint: disable=too-many-public-methods
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.ai.projects.aio.AIProjectClient`'s
        :attr:`agents` attribute.
    """

    def __init__(self, *args, **kwargs) -> None:
        input_args = list(args)
        self._client: AsyncPipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
        self._config: AIProjectClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
        self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
        self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")

    @overload
    async def create_agent(
        self,
        *,
        model: str,
        content_type: str = "application/json",
        name: Optional[str] = None,
        description: Optional[str] = None,
        instructions: Optional[str] = None,
        tools: Optional[List[_models.ToolDefinition]] = None,
        tool_resources: Optional[_models.ToolResources] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
        metadata: Optional[Dict[str, str]] = None,
        **kwargs: Any
    ) -> _models.Agent:
        """Creates a new agent.

        Typing overload for the call form in which the agent is described by
        individual keyword arguments instead of a pre-built request body.

        :keyword model: The ID of the model to use. Required.
        :paramtype model: str
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :keyword name: The name of the new agent. Default value is None.
        :paramtype name: str
        :keyword description: The description of the new agent. Default value is None.
        :paramtype description: str
        :keyword instructions: The system instructions for the new agent to use. Default value is None.
        :paramtype instructions: str
        :keyword tools: The collection of tools to enable for the new agent. Default value is None.
        :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
        :keyword tool_resources: A set of resources that are used by the agent's tools. The resources
         are specific to the type of tool. For example, the ``code_interpreter``
         tool requires a list of file IDs, while the ``file_search`` tool requires a list of vector
         store IDs. Default value is None.
        :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
        :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
         will make the output more random,
         while lower values like 0.2 will make it more focused and deterministic. Default value is
         None.
        :paramtype temperature: float
        :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
         model considers the results of the tokens with top_p probability mass.
         So 0.1 means only the tokens comprising the top 10% probability mass are considered.

         We generally recommend altering this or temperature but not both. Default value is None.
        :paramtype top_p: float
        :keyword response_format: The response format of the tool calls used by this agent. Is one of
         the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
         AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
        :paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
         or ~azure.ai.projects.models.AgentsApiResponseFormat or
         ~azure.ai.projects.models.ResponseFormatJsonSchemaType
        :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
         for storing additional information about that object in a structured format. Keys may be up to
         64 characters in length and values may be up to 512 characters in length. Default value is
         None.
        :paramtype metadata: dict[str, str]
        :return: Agent. The Agent is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models.Agent
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @overload
    async def create_agent(self, body: JSON, *, content_type: str = "application/json", **kwargs: Any) -> _models.Agent:
        """Creates a new agent.

        Typing overload for the call form in which the whole request payload is
        passed as a single JSON-style mapping.

        :param body: The request payload as a string-keyed mapping. Required.
        :type body: JSON
        :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: Agent. The Agent is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models.Agent
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @overload
    async def create_agent(
        self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
    ) -> _models.Agent:
        """Creates a new agent.

        Typing overload for the call form in which the request payload is passed
        as a binary stream.

        :param body: The request payload as a binary stream. Required.
        :type body: IO[bytes]
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
         Default value is "application/json".
        :paramtype content_type: str
        :return: Agent. The Agent is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models.Agent
        :raises ~azure.core.exceptions.HttpResponseError:
        """

    @distributed_trace_async
    async def create_agent(
        self,
        body: Union[JSON, IO[bytes]] = _Unset,
        *,
        model: str = _Unset,
        name: Optional[str] = None,
        description: Optional[str] = None,
        instructions: Optional[str] = None,
        tools: Optional[List[_models.ToolDefinition]] = None,
        tool_resources: Optional[_models.ToolResources] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
        metadata: Optional[Dict[str, str]] = None,
        **kwargs: Any
    ) -> _models.Agent:
        """Creates a new agent.

        The request payload is either the ``body`` argument as given, or, when
        ``body`` is omitted, a JSON object assembled from the keyword arguments
        below with every ``None`` entry left out. When ``body`` is supplied, the
        individual keyword arguments do not contribute to the payload.

        :param body: Complete request payload. Is either a JSON type or a IO[bytes] type.
         Required unless ``model`` is given.
        :type body: JSON or IO[bytes]
        :keyword model: The ID of the model to use. Required when ``body`` is omitted.
        :paramtype model: str
        :keyword name: The name of the new agent. Default value is None.
        :paramtype name: str
        :keyword description: The description of the new agent. Default value is None.
        :paramtype description: str
        :keyword instructions: The system instructions for the new agent to use. Default value is
         None.
        :paramtype instructions: str
        :keyword tools: The collection of tools to enable for the new agent. Default value is None.
        :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
        :keyword tool_resources: A set of resources that are used by the agent's tools. The
         resources are specific to the type of tool. For example, the ``code_interpreter`` tool
         requires a list of file IDs, while the ``file_search`` tool requires a list of vector
         store IDs. Default value is None.
        :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
        :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like
         0.8 will make the output more random, while lower values like 0.2 will make it more
         focused and deterministic. Default value is None.
        :paramtype temperature: float
        :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where
         the model considers the results of the tokens with top_p probability mass. So 0.1 means
         only the tokens comprising the top 10% probability mass are considered.

         We generally recommend altering this or temperature but not both. Default value is None.
        :paramtype top_p: float
        :keyword response_format: The response format of the tool calls used by this agent. Is one
         of the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
         AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
        :paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
         or ~azure.ai.projects.models.AgentsApiResponseFormat or
         ~azure.ai.projects.models.ResponseFormatJsonSchemaType
        :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object,
         used for storing additional information about that object in a structured format. Keys
         may be up to 64 characters in length and values may be up to 512 characters in length.
         Default value is None.
        :paramtype metadata: dict[str, str]
        :return: Agent. The Agent is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models.Agent
        :raises TypeError: If neither ``body`` nor ``model`` is supplied.
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        # Default status-code -> exception mapping; caller-supplied entries override it.
        status_errors: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        caller_error_map = kwargs.pop("error_map", {}) or {}
        status_errors.update(caller_error_map)

        request_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        query_params = kwargs.pop("params", {}) or {}

        # The Content-Type header is always removed from the outgoing headers; its
        # value only serves as the fallback when no ``content_type`` keyword is given.
        header_content_type = request_headers.pop("Content-Type", None)
        content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
        cls: ClsType[_models.Agent] = kwargs.pop("cls", None)

        if body is _Unset:
            if model is _Unset:
                raise TypeError("missing required argument: model")
            # Same key order as the serialized payload; unset (None) fields are omitted.
            candidate_fields = (
                ("description", description),
                ("instructions", instructions),
                ("metadata", metadata),
                ("model", model),
                ("name", name),
                ("response_format", response_format),
                ("temperature", temperature),
                ("tool_resources", tool_resources),
                ("tools", tools),
                ("top_p", top_p),
            )
            body = {key: value for key, value in candidate_fields if value is not None}
        if not content_type:
            content_type = "application/json"

        # Streams and raw bytes are sent untouched; anything else is JSON-encoded.
        if isinstance(body, (IOBase, bytes)):
            payload = body
        else:
            payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

        _request = build_agents_create_agent_request(
            content_type=content_type,
            api_version=self._config.api_version,
            content=payload,
            headers=request_headers,
            params=query_params,
        )
        to_url = self._serialize.url
        config = self._config
        path_format_arguments = {
            "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
            "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
            "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
            "projectName": to_url("self._config.project_name", config.project_name, "str"),
        }
        _request.url = self._client.format_url(_request.url, **path_format_arguments)

        _stream = kwargs.pop("stream", False)
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            _request, stream=_stream, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code != 200:
            if _stream:
                try:
                    # Load the body in memory and close the socket
                    await response.read()
                except (StreamConsumedError, StreamClosedError):
                    pass
            map_error(status_code=response.status_code, response=response, error_map=status_errors)
            raise HttpResponseError(response=response)

        # In streaming mode the raw byte iterator is returned instead of a model.
        result = response.iter_bytes() if _stream else _deserialize(_models.Agent, response.json())

        if cls:
            return cls(pipeline_response, result, {})  # type: ignore
        return result  # type: ignore

[docs] @distributed_trace_async async def list_agents( self, *, limit: Optional[int] = None, order: Optional[Union[str, _models.ListSortOrder]] = None, after: Optional[str] = None, before: Optional[str] = None, **kwargs: Any ) -> _models.OpenAIPageableListOfAgent: """Gets a list of agents that were previously created. :keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and 100, and the default is 20. Default value is None. :paramtype limit: int :keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order and desc for descending order. Known values are: "asc" and "desc". Default value is None. :paramtype order: str or ~azure.ai.projects.models.ListSortOrder :keyword after: A cursor for use in pagination. after is an object ID that defines your place in the list. For instance, if you make a list request and receive 100 objects, ending with obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the list. Default value is None. :paramtype after: str :keyword before: A cursor for use in pagination. before is an object ID that defines your place in the list. For instance, if you make a list request and receive 100 objects, ending with obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of the list. Default value is None. :paramtype before: str :return: OpenAIPageableListOfAgent. 
The OpenAIPageableListOfAgent is compatible with MutableMapping :rtype: ~azure.ai.projects.models.OpenAIPageableListOfAgent :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls: ClsType[_models.OpenAIPageableListOfAgent] = kwargs.pop("cls", None) _request = build_agents_list_agents_request( limit=limit, order=order, after=after, before=before, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"), "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"), "resourceGroupName": self._serialize.url( "self._config.resource_group_name", self._config.resource_group_name, "str" ), "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"), } _request.url = self._client.format_url(_request.url, **path_format_arguments) _stream = kwargs.pop("stream", False) pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: try: await response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if _stream: deserialized = response.iter_bytes() else: deserialized = _deserialize(_models.OpenAIPageableListOfAgent, response.json()) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: 
ignore
[docs] @distributed_trace_async async def get_agent(self, assistant_id: str, **kwargs: Any) -> _models.Agent: """Retrieves an existing agent. :param assistant_id: Identifier of the agent. Required. :type assistant_id: str :return: Agent. The Agent is compatible with MutableMapping :rtype: ~azure.ai.projects.models.Agent :raises ~azure.core.exceptions.HttpResponseError: """ error_map: MutableMapping = { 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError, } error_map.update(kwargs.pop("error_map", {}) or {}) _headers = kwargs.pop("headers", {}) or {} _params = kwargs.pop("params", {}) or {} cls: ClsType[_models.Agent] = kwargs.pop("cls", None) _request = build_agents_get_agent_request( assistant_id=assistant_id, api_version=self._config.api_version, headers=_headers, params=_params, ) path_format_arguments = { "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"), "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"), "resourceGroupName": self._serialize.url( "self._config.resource_group_name", self._config.resource_group_name, "str" ), "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"), } _request.url = self._client.format_url(_request.url, **path_format_arguments) _stream = kwargs.pop("stream", False) pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access _request, stream=_stream, **kwargs ) response = pipeline_response.http_response if response.status_code not in [200]: if _stream: try: await response.read() # Load the body in memory and close the socket except (StreamConsumedError, StreamClosedError): pass map_error(status_code=response.status_code, response=response, error_map=error_map) raise HttpResponseError(response=response) if _stream: deserialized = response.iter_bytes() else: deserialized = 
_deserialize(_models.Agent, response.json()) if cls: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore
@overload
async def update_agent(
    self,
    assistant_id: str,
    *,
    content_type: str = "application/json",
    model: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    instructions: Optional[str] = None,
    tools: Optional[List[_models.ToolDefinition]] = None,
    tool_resources: Optional[_models.ToolResources] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.Agent:
    """Modify an existing agent, describing the changes with individual keywords.

    :param assistant_id: The ID of the agent to modify. Required.
    :type assistant_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword model: The ID of the model to use. Default value is None.
    :paramtype model: str
    :keyword name: The modified name for the agent to use. Default value is None.
    :paramtype name: str
    :keyword description: The modified description for the agent to use. Default value is None.
    :paramtype description: str
    :keyword instructions: The modified system instructions for the agent to use. Default value
     is None.
    :paramtype instructions: str
    :keyword tools: The modified collection of tools to enable for the agent. Default value is
     None.
    :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
    :keyword tool_resources: A set of resources that are used by the agent's tools. The resources
     are specific to the type of tool. For example, the ``code_interpreter`` tool requires a list
     of file IDs, while the ``file_search`` tool requires a list of vector store IDs. Default
     value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
    :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like
     0.8 will make the output more random, while lower values like 0.2 will make it more focused
     and deterministic. Default value is None.
    :paramtype temperature: float
    :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where
     the model considers the results of the tokens with top_p probability mass. So 0.1 means only
     the tokens comprising the top 10% probability mass are considered. We generally recommend
     altering this or temperature but not both. Default value is None.
    :paramtype top_p: float
    :keyword response_format: The response format of the tool calls used by this agent. Is one of
     the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
     AgentsApiResponseFormat, ResponseFormatJsonSchemaType Default value is None.
    :paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode or
     ~azure.ai.projects.models.AgentsApiResponseFormat or
     ~azure.ai.projects.models.ResponseFormatJsonSchemaType
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: Agent. The Agent is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Agent
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def update_agent(
    self, assistant_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
    """Modify an existing agent, passing the whole request body as a JSON object.

    :param assistant_id: The ID of the agent to modify. Required.
    :type assistant_id: str
    :param body: Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: Agent. The Agent is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Agent
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def update_agent(
    self, assistant_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
    """Modify an existing agent, passing the request body as a binary stream.

    :param assistant_id: The ID of the agent to modify. Required.
    :type assistant_id: str
    :param body: Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: Agent. The Agent is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Agent
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@distributed_trace_async
async def update_agent(
    self,
    assistant_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    model: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    instructions: Optional[str] = None,
    tools: Optional[List[_models.ToolDefinition]] = None,
    tool_resources: Optional[_models.ToolResources] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.Agent:
    """Modify an existing agent.

    When ``body`` is omitted, the request body is assembled from the keyword arguments,
    and keywords left as None are not sent. When ``body`` is supplied, it is sent as
    given and the field keywords are not read.

    :param assistant_id: The ID of the agent to modify. Required.
    :type assistant_id: str
    :param body: Is either a JSON type or a IO[bytes] type. If omitted, the body is built from
     the keyword arguments below.
    :type body: JSON or IO[bytes]
    :keyword model: The ID of the model to use. Default value is None.
    :paramtype model: str
    :keyword name: The modified name for the agent to use. Default value is None.
    :paramtype name: str
    :keyword description: The modified description for the agent to use. Default value is None.
    :paramtype description: str
    :keyword instructions: The modified system instructions for the agent to use. Default value
     is None.
    :paramtype instructions: str
    :keyword tools: The modified collection of tools to enable for the agent. Default value is
     None.
    :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
    :keyword tool_resources: A set of resources that are used by the agent's tools. The resources
     are specific to the type of tool. For example, the ``code_interpreter`` tool requires a list
     of file IDs, while the ``file_search`` tool requires a list of vector store IDs. Default
     value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
    :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like
     0.8 will make the output more random, while lower values like 0.2 will make it more focused
     and deterministic. Default value is None.
    :paramtype temperature: float
    :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where
     the model considers the results of the tokens with top_p probability mass. So 0.1 means only
     the tokens comprising the top 10% probability mass are considered. We generally recommend
     altering this or temperature but not both. Default value is None.
    :paramtype top_p: float
    :keyword response_format: The response format of the tool calls used by this agent. Is one of
     the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
     AgentsApiResponseFormat, ResponseFormatJsonSchemaType Default value is None.
    :paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode or
     ~azure.ai.projects.models.AgentsApiResponseFormat or
     ~azure.ai.projects.models.ResponseFormatJsonSchemaType
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: Agent. The Agent is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Agent
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code is not 200.
    """
    # Default status-code -> exception mapping; entries supplied by the caller override it.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    req_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = kwargs.pop("params", {}) or {}
    # A Content-Type header is always removed from the headers; an explicit keyword takes precedence.
    header_content_type = req_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.Agent] = kwargs.pop("cls", None)

    if body is _Unset:
        keyword_fields = {
            "description": description,
            "instructions": instructions,
            "metadata": metadata,
            "model": model,
            "name": name,
            "response_format": response_format,
            "temperature": temperature,
            "tool_resources": tool_resources,
            "tools": tools,
            "top_p": top_p,
        }
        # Keywords left as None are omitted from the payload.
        body = {key: value for key, value in keyword_fields.items() if value is not None}
    content_type = content_type or "application/json"

    # Binary bodies are forwarded untouched; anything else is JSON-encoded.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_agents_update_agent_request(
        assistant_id=assistant_id,
        content_type=content_type,
        api_version=self._config.api_version,
        content=payload,
        headers=req_headers,
        params=query_params,
    )

    # Fill the URL template placeholders from the client configuration.
    config = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    want_stream = kwargs.pop("stream", False)
    pipeline_result: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=want_stream, **kwargs
    )
    http_response = pipeline_result.http_response

    if http_response.status_code != 200:
        if want_stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    result = http_response.iter_bytes() if want_stream else _deserialize(_models.Agent, http_response.json())
    return cls(pipeline_result, result, {}) if cls else result  # type: ignore

@distributed_trace_async
async def delete_agent(self, assistant_id: str, **kwargs: Any) -> _models.AgentDeletionStatus:
    """Delete the agent with the given identifier.

    :param assistant_id: Identifier of the agent. Required.
    :type assistant_id: str
    :return: AgentDeletionStatus. The AgentDeletionStatus is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentDeletionStatus
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code is not 200.
    """
    # Default status-code -> exception mapping; entries supplied by the caller override it.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    req_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.AgentDeletionStatus] = kwargs.pop("cls", None)

    http_request = build_agents_delete_agent_request(
        assistant_id=assistant_id,
        api_version=self._config.api_version,
        headers=req_headers,
        params=query_params,
    )

    # Fill the URL template placeholders from the client configuration.
    config = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    want_stream = kwargs.pop("stream", False)
    pipeline_result: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=want_stream, **kwargs
    )
    http_response = pipeline_result.http_response

    if http_response.status_code != 200:
        if want_stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    result = (
        http_response.iter_bytes()
        if want_stream
        else _deserialize(_models.AgentDeletionStatus, http_response.json())
    )
    return cls(pipeline_result, result, {}) if cls else result  # type: ignore

@overload
async def create_thread(
    self,
    *,
    content_type: str = "application/json",
    messages: Optional[List[_models.ThreadMessageOptions]] = None,
    tool_resources: Optional[_models.ToolResources] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.AgentThread:
    """Create a new thread from individual keywords. Threads contain messages and can be run by agents.

    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword messages: The initial messages to associate with the new thread. Default value is
     None.
    :paramtype messages: list[~azure.ai.projects.models.ThreadMessageOptions]
    :keyword tool_resources: A set of resources that are made available to the agent's tools in
     this thread. The resources are specific to the type of tool. For example, the
     ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
     requires a list of vector store IDs. Default value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def create_thread(
    self, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
    """Create a new thread from a JSON body. Threads contain messages and can be run by agents.

    :param body: Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def create_thread(
    self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
    """Create a new thread from a binary body. Threads contain messages and can be run by agents.

    :param body: Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def create_thread(
    self,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    messages: Optional[List[_models.ThreadMessageOptions]] = None,
    tool_resources: Optional[_models.ToolResources] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.AgentThread:
    """Create a new thread. Threads contain messages and can be run by agents.

    When ``body`` is omitted, the request body is assembled from the keyword arguments,
    and keywords left as None are not sent. When ``body`` is supplied, it is sent as
    given and the field keywords are not read.

    :param body: Is either a JSON type or a IO[bytes] type. If omitted, the body is built from
     the keyword arguments below.
    :type body: JSON or IO[bytes]
    :keyword messages: The initial messages to associate with the new thread. Default value is
     None.
    :paramtype messages: list[~azure.ai.projects.models.ThreadMessageOptions]
    :keyword tool_resources: A set of resources that are made available to the agent's tools in
     this thread. The resources are specific to the type of tool. For example, the
     ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
     requires a list of vector store IDs. Default value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code is not 200.
    """
    # Default status-code -> exception mapping; entries supplied by the caller override it.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    req_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = kwargs.pop("params", {}) or {}
    # A Content-Type header is always removed from the headers; an explicit keyword takes precedence.
    header_content_type = req_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.AgentThread] = kwargs.pop("cls", None)

    if body is _Unset:
        keyword_fields = {"messages": messages, "metadata": metadata, "tool_resources": tool_resources}
        # Keywords left as None are omitted from the payload.
        body = {key: value for key, value in keyword_fields.items() if value is not None}
    content_type = content_type or "application/json"

    # Binary bodies are forwarded untouched; anything else is JSON-encoded.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_agents_create_thread_request(
        content_type=content_type,
        api_version=self._config.api_version,
        content=payload,
        headers=req_headers,
        params=query_params,
    )

    # Fill the URL template placeholders from the client configuration.
    config = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    want_stream = kwargs.pop("stream", False)
    pipeline_result: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=want_stream, **kwargs
    )
    http_response = pipeline_result.http_response

    if http_response.status_code != 200:
        if want_stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    result = (
        http_response.iter_bytes() if want_stream else _deserialize(_models.AgentThread, http_response.json())
    )
    return cls(pipeline_result, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def get_thread(self, thread_id: str, **kwargs: Any) -> _models.AgentThread:
    """Fetch information about an existing thread.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code is not 200.
    """
    # Default status-code -> exception mapping; entries supplied by the caller override it.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    req_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.AgentThread] = kwargs.pop("cls", None)

    http_request = build_agents_get_thread_request(
        thread_id=thread_id,
        api_version=self._config.api_version,
        headers=req_headers,
        params=query_params,
    )

    # Fill the URL template placeholders from the client configuration.
    config = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    want_stream = kwargs.pop("stream", False)
    pipeline_result: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=want_stream, **kwargs
    )
    http_response = pipeline_result.http_response

    if http_response.status_code != 200:
        if want_stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    result = (
        http_response.iter_bytes() if want_stream else _deserialize(_models.AgentThread, http_response.json())
    )
    return cls(pipeline_result, result, {}) if cls else result  # type: ignore
@overload
async def update_thread(
    self,
    thread_id: str,
    *,
    content_type: str = "application/json",
    tool_resources: Optional[_models.ToolResources] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.AgentThread:
    """Modifies an existing thread, describing the changes with individual keywords.

    Typing-only overload: it declares the keyword-based call signature and has no body.

    :param thread_id: The ID of the thread to modify. Required.
    :type thread_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword tool_resources: A set of resources that are made available to the agent's tools in
     this thread. The resources are specific to the type of tool. For example, the
     ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
     requires a list of vector store IDs. Default value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def update_thread(
    self, thread_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
    """Modifies an existing thread, passing the whole request body as a JSON object.

    Typing-only overload: it declares the JSON-body call signature and has no body.

    :param thread_id: The ID of the thread to modify. Required.
    :type thread_id: str
    :param body: Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def update_thread(
    self, thread_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
    """Modifies an existing thread, passing the request body as a binary stream.

    Typing-only overload: it declares the binary-body call signature and has no body.

    :param thread_id: The ID of the thread to modify. Required.
    :type thread_id: str
    :param body: Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def update_thread(
    self,
    thread_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    tool_resources: Optional[_models.ToolResources] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.AgentThread:
    """Modify an existing thread.

    When ``body`` is omitted, the request body is assembled from the keyword arguments,
    and keywords left as None are not sent. When ``body`` is supplied, it is sent as
    given and the field keywords are not read.

    :param thread_id: The ID of the thread to modify. Required.
    :type thread_id: str
    :param body: Is either a JSON type or a IO[bytes] type. If omitted, the body is built from
     the keyword arguments below.
    :type body: JSON or IO[bytes]
    :keyword tool_resources: A set of resources that are made available to the agent's tools in
     this thread. The resources are specific to the type of tool. For example, the
     ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
     requires a list of vector store IDs. Default value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.ToolResources
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: AgentThread. The AgentThread is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.AgentThread
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code is not 200.
    """
    # Default status-code -> exception mapping; entries supplied by the caller override it.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    req_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = kwargs.pop("params", {}) or {}
    # A Content-Type header is always removed from the headers; an explicit keyword takes precedence.
    header_content_type = req_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.AgentThread] = kwargs.pop("cls", None)

    if body is _Unset:
        keyword_fields = {"metadata": metadata, "tool_resources": tool_resources}
        # Keywords left as None are omitted from the payload.
        body = {key: value for key, value in keyword_fields.items() if value is not None}
    content_type = content_type or "application/json"

    # Binary bodies are forwarded untouched; anything else is JSON-encoded.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_agents_update_thread_request(
        thread_id=thread_id,
        content_type=content_type,
        api_version=self._config.api_version,
        content=payload,
        headers=req_headers,
        params=query_params,
    )

    # Fill the URL template placeholders from the client configuration.
    config = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    want_stream = kwargs.pop("stream", False)
    pipeline_result: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=want_stream, **kwargs
    )
    http_response = pipeline_result.http_response

    if http_response.status_code != 200:
        if want_stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    result = (
        http_response.iter_bytes() if want_stream else _deserialize(_models.AgentThread, http_response.json())
    )
    return cls(pipeline_result, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def delete_thread(self, thread_id: str, **kwargs: Any) -> _models.ThreadDeletionStatus:
    """Delete an existing thread.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :return: ThreadDeletionStatus. The ThreadDeletionStatus is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadDeletionStatus
    :raises ~azure.core.exceptions.HttpResponseError: If the response status code is not 200.
    """
    # Default status-code -> exception mapping; entries supplied by the caller override it.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    req_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.ThreadDeletionStatus] = kwargs.pop("cls", None)

    http_request = build_agents_delete_thread_request(
        thread_id=thread_id,
        api_version=self._config.api_version,
        headers=req_headers,
        params=query_params,
    )

    # Fill the URL template placeholders from the client configuration.
    config = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    want_stream = kwargs.pop("stream", False)
    pipeline_result: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=want_stream, **kwargs
    )
    http_response = pipeline_result.http_response

    if http_response.status_code != 200:
        if want_stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    result = (
        http_response.iter_bytes()
        if want_stream
        else _deserialize(_models.ThreadDeletionStatus, http_response.json())
    )
    return cls(pipeline_result, result, {}) if cls else result  # type: ignore
# Overload: the request body is assembled from the keyword arguments.
@overload
async def create_message(
    self,
    thread_id: str,
    *,
    role: Union[str, _models.MessageRole],
    content: str,
    content_type: str = "application/json",
    attachments: Optional[List[_models.MessageAttachment]] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadMessage:
    """Creates a new message on a specified thread.

    Use this form to describe the message with keyword arguments; optional keywords
    left as ``None`` are not included in the request body.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :keyword role: The role of the entity that is creating the message. Allowed values include:


     * ``user``\\ : Indicates the message is sent by an actual user and should be used in most
       cases to represent user-generated messages.
     * ``assistant``\\ : Indicates the message is generated by the agent. Use this value to insert
       messages from the agent into the conversation.

     Known values are: "user" and "assistant". Required.
    :paramtype role: str or ~azure.ai.projects.models.MessageRole
    :keyword content: The textual content of the initial message. Currently, robust input including
     images and annotated text may only be provided via a separate call to the create message API.
     Required.
    :paramtype content: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword attachments: A list of files attached to the message, and the tools they should be
     added to. Default value is None.
    :paramtype attachments: list[~azure.ai.projects.models.MessageAttachment]
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to 64
     characters in length and values may be up to 512 characters in length. Default value is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """

# Overload: the caller supplies the complete request body as a JSON object.
@overload
async def create_message(
    self, thread_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
    """Creates a new message on a specified thread.

    Use this form to pass the whole request body as a JSON object.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param body: The complete request body. Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """

# Overload: the caller supplies the complete request body as a binary stream.
@overload
async def create_message(
    self, thread_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
    """Creates a new message on a specified thread.

    Use this form to pass the whole request body as a binary stream.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param body: The complete request body. Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def create_message(
    self,
    thread_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    role: Union[str, _models.MessageRole] = _Unset,
    content: str = _Unset,
    attachments: Optional[List[_models.MessageAttachment]] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadMessage:
    """Create a new message on the given thread.

    Either pass a ready-made ``body``, or omit it and describe the message with the
    ``role`` / ``content`` / ``attachments`` / ``metadata`` keywords.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param body: Is either a JSON type or a IO[bytes] type. Required.
    :type body: JSON or IO[bytes]
    :keyword role: The role of the entity that is creating the message. Allowed values include:


     * ``user``\\ : Indicates the message is sent by an actual user and should be used in most
       cases to represent user-generated messages.
     * ``assistant``\\ : Indicates the message is generated by the agent. Use this value to insert
       messages from the agent into the conversation.

     Known values are: "user" and "assistant". Required.
    :paramtype role: str or ~azure.ai.projects.models.MessageRole
    :keyword content: The textual content of the initial message. Currently, robust input including
     images and annotated text may only be provided via a separate call to the create message API.
     Required.
    :paramtype content: str
    :keyword attachments: A list of files attached to the message, and the tools they should be
     added to. Default value is None.
    :paramtype attachments: list[~azure.ai.projects.models.MessageAttachment]
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to 64
     characters in length and values may be up to 512 characters in length. Default value is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Status codes that map to a dedicated exception type; entries supplied by the
    # caller through ``error_map`` are merged on top of these defaults.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    # The header entry is always removed from ``_headers``; it is only used as the
    # fallback when no explicit ``content_type`` keyword was given.
    header_content_type = _headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.ThreadMessage] = kwargs.pop("cls", None)

    if body is _Unset:
        # No explicit body: assemble one from the keyword arguments.
        if role is _Unset:
            raise TypeError("missing required argument: role")
        if content is _Unset:
            raise TypeError("missing required argument: content")
        candidate_fields = {"attachments": attachments, "content": content, "metadata": metadata, "role": role}
        body = {name: value for name, value in candidate_fields.items() if value is not None}

    if not content_type:
        content_type = "application/json"

    # Streams and raw bytes are sent untouched; anything else is JSON-encoded.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    config = self._config
    request = build_agents_create_message_request(
        thread_id=thread_id,
        content_type=content_type,
        api_version=config.api_version,
        content=payload,
        headers=_headers,
        params=_params,
    )

    # Values handed to ``format_url`` to complete the request URL.
    url_values = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_values)

    want_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=want_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if want_stream:
            try:
                await response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                # The stream was already consumed or closed; nothing left to read.
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if want_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.ThreadMessage, response.json())

    # A caller-provided ``cls`` callback decides what is returned.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def list_messages(
    self,
    thread_id: str,
    *,
    run_id: Optional[str] = None,
    limit: Optional[int] = None,
    order: Optional[Union[str, _models.ListSortOrder]] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    **kwargs: Any
) -> _models.OpenAIPageableListOfThreadMessage:
    """Get the list of messages that exist on a thread.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :keyword run_id: Filter messages by the run ID that generated them. Default value is None.
    :paramtype run_id: str
    :keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
     100, and the default is 20. Default value is None.
    :paramtype limit: int
    :keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
     and desc for descending order. Known values are: "asc" and "desc". Default value is None.
    :paramtype order: str or ~azure.ai.projects.models.ListSortOrder
    :keyword after: A cursor for use in pagination. after is an object ID that defines your place
     in the list. For instance, if you make a list request and receive 100 objects, ending with
     obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
     list. Default value is None.
    :paramtype after: str
    :keyword before: A cursor for use in pagination. before is an object ID that defines your place
     in the list. For instance, if you make a list request and receive 100 objects, ending with
     obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page
     of the list. Default value is None.
    :paramtype before: str
    :return: OpenAIPageableListOfThreadMessage. The OpenAIPageableListOfThreadMessage is compatible
     with MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIPageableListOfThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Status codes that map to a dedicated exception type; entries supplied by the
    # caller through ``error_map`` are merged on top of these defaults.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.OpenAIPageableListOfThreadMessage] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_list_messages_request(
        thread_id=thread_id,
        run_id=run_id,
        limit=limit,
        order=order,
        after=after,
        before=before,
        api_version=config.api_version,
        headers=_headers,
        params=_params,
    )

    # Values handed to ``format_url`` to complete the request URL.
    url_values = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_values)

    want_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=want_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if want_stream:
            try:
                await response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                # The stream was already consumed or closed; nothing left to read.
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if want_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.OpenAIPageableListOfThreadMessage, response.json())

    # A caller-provided ``cls`` callback decides what is returned.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def get_message(self, thread_id: str, message_id: str, **kwargs: Any) -> _models.ThreadMessage:
    """Get an existing message from an existing thread.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param message_id: Identifier of the message. Required.
    :type message_id: str
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Status codes that map to a dedicated exception type; entries supplied by the
    # caller through ``error_map`` are merged on top of these defaults.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.ThreadMessage] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_get_message_request(
        thread_id=thread_id,
        message_id=message_id,
        api_version=config.api_version,
        headers=_headers,
        params=_params,
    )

    # Values handed to ``format_url`` to complete the request URL.
    url_values = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_values)

    want_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=want_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if want_stream:
            try:
                await response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                # The stream was already consumed or closed; nothing left to read.
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if want_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.ThreadMessage, response.json())

    # A caller-provided ``cls`` callback decides what is returned.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
# Overload: the request body is assembled from the ``metadata`` keyword.
@overload
async def update_message(
    self,
    thread_id: str,
    message_id: str,
    *,
    content_type: str = "application/json",
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadMessage:
    """Modifies an existing message on an existing thread.

    Use this form to supply the new metadata as a keyword argument; when ``metadata``
    is left as ``None`` it is not included in the request body.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param message_id: Identifier of the message. Required.
    :type message_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to 64
     characters in length and values may be up to 512 characters in length. Default value is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """

# Overload: the caller supplies the complete request body as a JSON object.
@overload
async def update_message(
    self, thread_id: str, message_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
    """Modifies an existing message on an existing thread.

    Use this form to pass the whole request body as a JSON object.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param message_id: Identifier of the message. Required.
    :type message_id: str
    :param body: The complete request body. Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """

# Overload: the caller supplies the complete request body as a binary stream.
@overload
async def update_message(
    self, thread_id: str, message_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
    """Modifies an existing message on an existing thread.

    Use this form to pass the whole request body as a binary stream.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param message_id: Identifier of the message. Required.
    :type message_id: str
    :param body: The complete request body. Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def update_message(
    self,
    thread_id: str,
    message_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadMessage:
    """Modify an existing message on an existing thread.

    Either pass a ready-made ``body``, or omit it and supply ``metadata`` as a keyword.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param message_id: Identifier of the message. Required.
    :type message_id: str
    :param body: Is either a JSON type or a IO[bytes] type. Required.
    :type body: JSON or IO[bytes]
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to 64
     characters in length and values may be up to 512 characters in length. Default value is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadMessage
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Status codes that map to a dedicated exception type; entries supplied by the
    # caller through ``error_map`` are merged on top of these defaults.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}

    # The header entry is always removed from ``_headers``; it is only used as the
    # fallback when no explicit ``content_type`` keyword was given.
    header_content_type = _headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.ThreadMessage] = kwargs.pop("cls", None)

    if body is _Unset:
        # No explicit body: assemble one from the keyword argument, skipping ``None``.
        candidate_fields = {"metadata": metadata}
        body = {name: value for name, value in candidate_fields.items() if value is not None}

    if not content_type:
        content_type = "application/json"

    # Streams and raw bytes are sent untouched; anything else is JSON-encoded.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    config = self._config
    request = build_agents_update_message_request(
        thread_id=thread_id,
        message_id=message_id,
        content_type=content_type,
        api_version=config.api_version,
        content=payload,
        headers=_headers,
        params=_params,
    )

    # Values handed to ``format_url`` to complete the request URL.
    url_values = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_values)

    want_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=want_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if want_stream:
            try:
                await response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                # The stream was already consumed or closed; nothing left to read.
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if want_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.ThreadMessage, response.json())

    # A caller-provided ``cls`` callback decides what is returned.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@overload
async def create_run(
    self,
    thread_id: str,
    *,
    assistant_id: str,
    include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
    content_type: str = "application/json",
    model: Optional[str] = None,
    instructions: Optional[str] = None,
    additional_instructions: Optional[str] = None,
    additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
    tools: Optional[List[_models.ToolDefinition]] = None,
    stream_parameter: Optional[bool] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    max_prompt_tokens: Optional[int] = None,
    max_completion_tokens: Optional[int] = None,
    truncation_strategy: Optional[_models.TruncationObject] = None,
    tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
    response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
    parallel_tool_calls: Optional[bool] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Start a new run on an agent thread, describing the run with keyword arguments.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :keyword assistant_id: The ID of the agent that should run the thread. Required.
    :paramtype assistant_id: str
    :keyword include: A list of additional fields to include in the response. Currently the only
     supported value is ``step_details.tool_calls[*].file_search.results[*].content`` to fetch
     the file search result content. Default value is None.
    :paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword model: The overridden model name that the agent should use to run the thread.
     Default value is None.
    :paramtype model: str
    :keyword instructions: The overridden system instructions that the agent should use to run
     the thread. Default value is None.
    :paramtype instructions: str
    :keyword additional_instructions: Additional instructions to append at the end of the
     instructions for the run. This is useful for modifying the behavior on a per-run basis
     without overriding other instructions. Default value is None.
    :paramtype additional_instructions: str
    :keyword additional_messages: Adds additional messages to the thread before creating the
     run. Default value is None.
    :paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
    :keyword tools: The overridden list of enabled tools that the agent should use to run the
     thread. Default value is None.
    :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
    :keyword stream_parameter: If ``true``, returns a stream of events that happen during the
     Run as server-sent events, terminating when the Run enters a terminal state with a
     ``data: [DONE]`` message. Default value is None.
    :paramtype stream_parameter: bool
    :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like
     0.8 will make the output more random, while lower values like 0.2 will make it more focused
     and deterministic. Default value is None.
    :paramtype temperature: float
    :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where
     the model considers the results of the tokens with top_p probability mass. So 0.1 means
     only the tokens comprising the top 10% probability mass are considered. We generally
     recommend altering this or temperature but not both. Default value is None.
    :paramtype top_p: float
    :keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
     course of the run. The run will make a best effort to use only the number of prompt tokens
     specified, across multiple turns of the run. If the run exceeds the number of prompt tokens
     specified, the run will end with status ``incomplete``. See ``incomplete_details`` for more
     info. Default value is None.
    :paramtype max_prompt_tokens: int
    :keyword max_completion_tokens: The maximum number of completion tokens that may be used
     over the course of the run. The run will make a best effort to use only the number of
     completion tokens specified, across multiple turns of the run. If the run exceeds the
     number of completion tokens specified, the run will end with status ``incomplete``. See
     ``incomplete_details`` for more info. Default value is None.
    :paramtype max_completion_tokens: int
    :keyword truncation_strategy: The strategy to use for dropping messages as the context
     windows moves forward. Default value is None.
    :paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
    :keyword tool_choice: Controls whether or not and which tool is called by the model. Is one
     of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
     AgentsNamedToolChoice Default value is None.
    :paramtype tool_choice: str or str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode
     or ~azure.ai.projects.models.AgentsNamedToolChoice
    :keyword response_format: Specifies the format that the model must output. Is one of the
     following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
     AgentsApiResponseFormat, ResponseFormatJsonSchemaType Default value is None.
    :paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
     or ~azure.ai.projects.models.AgentsApiResponseFormat or
     ~azure.ai.projects.models.ResponseFormatJsonSchemaType
    :keyword parallel_tool_calls: If ``true`` functions will run in parallel during tool use.
     Default value is None.
    :paramtype parallel_tool_calls: bool
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_run(
    self,
    thread_id: str,
    body: JSON,
    *,
    include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.ThreadRun:
    """Start a new run on an agent thread from a ready-made JSON request body.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param body: The JSON request payload. Required.
    :type body: JSON
    :keyword include: A list of additional fields to include in the response. Currently the only
     supported value is ``step_details.tool_calls[*].file_search.results[*].content`` to fetch
     the file search result content. Default value is None.
    :paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_run(
    self,
    thread_id: str,
    body: IO[bytes],
    *,
    include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
    content_type: str = "application/json",
    **kwargs: Any
) -> _models.ThreadRun:
    """Start a new run on an agent thread from a binary request body.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param body: The binary request payload. Required.
    :type body: IO[bytes]
    :keyword include: A list of additional fields to include in the response. Currently the only
     supported value is ``step_details.tool_calls[*].file_search.results[*].content`` to fetch
     the file search result content. Default value is None.
    :paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def create_run(
    self,
    thread_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    assistant_id: str = _Unset,
    include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
    model: Optional[str] = None,
    instructions: Optional[str] = None,
    additional_instructions: Optional[str] = None,
    additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
    tools: Optional[List[_models.ToolDefinition]] = None,
    stream_parameter: Optional[bool] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    max_prompt_tokens: Optional[int] = None,
    max_completion_tokens: Optional[int] = None,
    truncation_strategy: Optional[_models.TruncationObject] = None,
    tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
    response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
    parallel_tool_calls: Optional[bool] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Start a new run on an agent thread.

    When ``body`` is omitted, the JSON payload is assembled from the keyword arguments below;
    keywords left as None are not sent, and ``assistant_id`` becomes mandatory.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param body: The request payload, either a JSON type or an IO[bytes] type.
    :type body: JSON or IO[bytes]
    :keyword assistant_id: The ID of the agent that should run the thread. Required.
    :paramtype assistant_id: str
    :keyword include: A list of additional fields to include in the response. Currently the only
     supported value is ``step_details.tool_calls[*].file_search.results[*].content`` to fetch
     the file search result content. Default value is None.
    :paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
    :keyword model: The overridden model name that the agent should use to run the thread.
     Default value is None.
    :paramtype model: str
    :keyword instructions: The overridden system instructions that the agent should use to run
     the thread. Default value is None.
    :paramtype instructions: str
    :keyword additional_instructions: Additional instructions to append at the end of the
     instructions for the run. This is useful for modifying the behavior on a per-run basis
     without overriding other instructions. Default value is None.
    :paramtype additional_instructions: str
    :keyword additional_messages: Adds additional messages to the thread before creating the
     run. Default value is None.
    :paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
    :keyword tools: The overridden list of enabled tools that the agent should use to run the
     thread. Default value is None.
    :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
    :keyword stream_parameter: If ``true``, returns a stream of events that happen during the
     Run as server-sent events, terminating when the Run enters a terminal state with a
     ``data: [DONE]`` message. Default value is None.
    :paramtype stream_parameter: bool
    :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like
     0.8 will make the output more random, while lower values like 0.2 will make it more focused
     and deterministic. Default value is None.
    :paramtype temperature: float
    :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where
     the model considers the results of the tokens with top_p probability mass. So 0.1 means
     only the tokens comprising the top 10% probability mass are considered. We generally
     recommend altering this or temperature but not both. Default value is None.
    :paramtype top_p: float
    :keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
     course of the run. The run will make a best effort to use only the number of prompt tokens
     specified, across multiple turns of the run. If the run exceeds the number of prompt tokens
     specified, the run will end with status ``incomplete``. See ``incomplete_details`` for more
     info. Default value is None.
    :paramtype max_prompt_tokens: int
    :keyword max_completion_tokens: The maximum number of completion tokens that may be used
     over the course of the run. The run will make a best effort to use only the number of
     completion tokens specified, across multiple turns of the run. If the run exceeds the
     number of completion tokens specified, the run will end with status ``incomplete``. See
     ``incomplete_details`` for more info. Default value is None.
    :paramtype max_completion_tokens: int
    :keyword truncation_strategy: The strategy to use for dropping messages as the context
     windows moves forward. Default value is None.
    :paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
    :keyword tool_choice: Controls whether or not and which tool is called by the model. Is one
     of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
     AgentsNamedToolChoice Default value is None.
    :paramtype tool_choice: str or str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode
     or ~azure.ai.projects.models.AgentsNamedToolChoice
    :keyword response_format: Specifies the format that the model must output. Is one of the
     following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
     AgentsApiResponseFormat, ResponseFormatJsonSchemaType Default value is None.
    :paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
     or ~azure.ai.projects.models.AgentsApiResponseFormat or
     ~azure.ai.projects.models.ResponseFormatJsonSchemaType
    :keyword parallel_tool_calls: If ``true`` functions will run in parallel during tool use.
     Default value is None.
    :paramtype parallel_tool_calls: bool
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises TypeError: If neither ``body`` nor ``assistant_id`` is supplied.
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception table; entries passed by the caller take precedence.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query = kwargs.pop("params", {}) or {}

    # The Content-Type header is always removed from the header bag; an explicit
    # ``content_type`` keyword takes priority over it.
    header_content_type = headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)

    if body is _Unset:
        if assistant_id is _Unset:
            raise TypeError("missing required argument: assistant_id")
        # Field order is kept stable so the serialized JSON is stable; None values are omitted.
        candidate_fields = (
            ("additional_instructions", additional_instructions),
            ("additional_messages", additional_messages),
            ("assistant_id", assistant_id),
            ("instructions", instructions),
            ("max_completion_tokens", max_completion_tokens),
            ("max_prompt_tokens", max_prompt_tokens),
            ("metadata", metadata),
            ("model", model),
            ("parallel_tool_calls", parallel_tool_calls),
            ("response_format", response_format),
            ("stream", stream_parameter),
            ("temperature", temperature),
            ("tool_choice", tool_choice),
            ("tools", tools),
            ("top_p", top_p),
            ("truncation_strategy", truncation_strategy),
        )
        body = {name: value for name, value in candidate_fields if value is not None}
    if not content_type:
        content_type = "application/json"

    # Binary payloads are forwarded untouched; anything else is serialized to JSON.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    config = self._config
    request = build_agents_create_run_request(
        thread_id=thread_id,
        include=include,
        content_type=content_type,
        api_version=config.api_version,
        content=payload,
        headers=headers,
        params=query,
    )
    url_args = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    # ``stream`` (a pipeline option) is distinct from ``stream_parameter`` (a body field).
    stream_response = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_response, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_response:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if stream_response:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.ThreadRun, response.json())

    # A caller-supplied ``cls`` callback decides the final return value.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def list_runs(
    self,
    thread_id: str,
    *,
    limit: Optional[int] = None,
    order: Optional[Union[str, _models.ListSortOrder]] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    **kwargs: Any
) -> _models.OpenAIPageableListOfThreadRun:
    """Fetch one page of the runs that belong to a given thread.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :keyword limit: A limit on the number of objects to be returned. Limit can range between 1
     and 100, and the default is 20. Default value is None.
    :paramtype limit: int
    :keyword order: Sort order by the created_at timestamp of the objects. asc for ascending
     order and desc for descending order. Known values are: "asc" and "desc". Default value is
     None.
    :paramtype order: str or ~azure.ai.projects.models.ListSortOrder
    :keyword after: A cursor for use in pagination. after is an object ID that defines your
     place in the list. For instance, if you make a list request and receive 100 objects, ending
     with obj_foo, your subsequent call can include after=obj_foo in order to fetch the next
     page of the list. Default value is None.
    :paramtype after: str
    :keyword before: A cursor for use in pagination. before is an object ID that defines your
     place in the list. For instance, if you make a list request and receive 100 objects, ending
     with obj_foo, your subsequent call can include before=obj_foo in order to fetch the
     previous page of the list. Default value is None.
    :paramtype before: str
    :return: OpenAIPageableListOfThreadRun. The OpenAIPageableListOfThreadRun is compatible with
     MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIPageableListOfThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception table; entries passed by the caller take precedence.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    headers = kwargs.pop("headers", {}) or {}
    query = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.OpenAIPageableListOfThreadRun] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_list_runs_request(
        thread_id=thread_id,
        limit=limit,
        order=order,
        after=after,
        before=before,
        api_version=config.api_version,
        headers=headers,
        params=query,
    )
    url_args = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_response = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_response, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_response:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if stream_response:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.OpenAIPageableListOfThreadRun, response.json())

    # A caller-supplied ``cls`` callback decides the final return value.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def get_run(self, thread_id: str, run_id: str, **kwargs: Any) -> _models.ThreadRun:
    """Retrieve an existing run that belongs to an existing thread.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception table; entries passed by the caller take precedence.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    headers = kwargs.pop("headers", {}) or {}
    query = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_get_run_request(
        thread_id=thread_id,
        run_id=run_id,
        api_version=config.api_version,
        headers=headers,
        params=query,
    )
    url_args = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_response = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_response, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_response:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if stream_response:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.ThreadRun, response.json())

    # A caller-supplied ``cls`` callback decides the final return value.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@overload
async def update_run(
    self,
    thread_id: str,
    run_id: str,
    *,
    content_type: str = "application/json",
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Modifies an existing thread run.

    Keyword-argument form: no request body is passed; the only updatable field
    exposed here is ``metadata``.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def update_run(
    self, thread_id: str, run_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
    """Modifies an existing thread run.

    JSON-body form: the caller supplies the complete request payload as ``body``.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :param body: The JSON request payload. Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def update_run(
    self, thread_id: str, run_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
    """Modifies an existing thread run.

    Binary-body form: the caller supplies the complete request payload as a byte stream.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :param body: The binary request payload. Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def update_run(
    self,
    thread_id: str,
    run_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Update an existing run that belongs to an existing thread.

    When ``body`` is omitted, a JSON payload is assembled from ``metadata``
    (the key is left out entirely when ``metadata`` is None).

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :param body: The request payload, either a JSON type or an IO[bytes] type.
    :type body: JSON or IO[bytes]
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception table; entries passed by the caller take precedence.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    caller_error_map = kwargs.pop("error_map", {}) or {}
    error_map.update(caller_error_map)

    headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query = kwargs.pop("params", {}) or {}

    # The Content-Type header is always removed from the header bag; an explicit
    # ``content_type`` keyword takes priority over it.
    header_content_type = headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)

    if body is _Unset:
        body = {} if metadata is None else {"metadata": metadata}
    if not content_type:
        content_type = "application/json"

    # Binary payloads are forwarded untouched; anything else is serialized to JSON.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    config = self._config
    request = build_agents_update_run_request(
        thread_id=thread_id,
        run_id=run_id,
        content_type=content_type,
        api_version=config.api_version,
        content=payload,
        headers=headers,
        params=query,
    )
    url_args = {
        "endpoint": self._serialize.url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_response = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_response, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_response:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=error_map)
        raise HttpResponseError(response=response)

    if stream_response:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.ThreadRun, response.json())

    # A caller-supplied ``cls`` callback decides the final return value.
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@overload
async def submit_tool_outputs_to_run(
    self,
    thread_id: str,
    run_id: str,
    *,
    tool_outputs: List[_models.ToolOutput],
    content_type: str = "application/json",
    stream_parameter: Optional[bool] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Submit outputs from tools as requested by tool calls in a run.

    Runs that need submitted tool outputs will have a status of 'requires_action' with a
    required_action.type of 'submit_tool_outputs'.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :keyword tool_outputs: A list of tools for which the outputs are being submitted. Required.
    :paramtype tool_outputs: list[~azure.ai.projects.models.ToolOutput]
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword stream_parameter: If true, returns a stream of events that happen during the Run
     as server-sent events, terminating when the run enters a terminal state. Default value is
     None.
    :paramtype stream_parameter: bool
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def submit_tool_outputs_to_run(
    self, thread_id: str, run_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
    """Submit outputs from tools as requested by tool calls in a run.

    Runs that need submitted tool outputs will have a status of 'requires_action' with a
    required_action.type of 'submit_tool_outputs'.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :param body: Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def submit_tool_outputs_to_run(
    self, thread_id: str, run_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
    """Submit outputs from tools as requested by tool calls in a run.

    Runs that need submitted tool outputs will have a status of 'requires_action' with a
    required_action.type of 'submit_tool_outputs'.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :param body: Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def submit_tool_outputs_to_run(
    self,
    thread_id: str,
    run_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    tool_outputs: List[_models.ToolOutput] = _Unset,
    stream_parameter: Optional[bool] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Submit outputs from tools as requested by tool calls in a run.

    Runs that need submitted tool outputs will have a status of 'requires_action' with a
    required_action.type of 'submit_tool_outputs'. When ``body`` is omitted, the payload is
    assembled from ``tool_outputs`` and ``stream_parameter``; fields that are None are left out.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :param body: Request payload, given as either a JSON type or an IO[bytes] type.
    :type body: JSON or IO[bytes]
    :keyword tool_outputs: A list of tools for which the outputs are being submitted. Required
     when ``body`` is not supplied.
    :paramtype tool_outputs: list[~azure.ai.projects.models.ToolOutput]
    :keyword stream_parameter: If true, returns a stream of events that happen during the Run
     as server-sent events, terminating when the run enters a terminal state. Default value is
     None.
    :paramtype stream_parameter: bool
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises TypeError: If neither ``body`` nor ``tool_outputs`` is supplied.
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries win.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}
    # An explicit content_type keyword takes precedence over a Content-Type header.
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)

    if body is _Unset:
        if tool_outputs is _Unset:
            raise TypeError("missing required argument: tool_outputs")
        candidate_fields = {"stream": stream_parameter, "tool_outputs": tool_outputs}
        # Only non-None fields are sent to the service.
        body = {name: value for name, value in candidate_fields.items() if value is not None}
    content_type = content_type or "application/json"

    # Binary payloads are forwarded untouched; anything else is JSON-encoded.
    payload = (
        body
        if isinstance(body, (IOBase, bytes))
        else json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore
    )

    config = self._config
    request = build_agents_submit_tool_outputs_to_run_request(
        thread_id=thread_id,
        run_id=run_id,
        content_type=content_type,
        api_version=config.api_version,
        content=payload,
        headers=_headers,
        params=_params,
    )
    to_url = self._serialize.url
    url_args = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_requested, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream_requested:
            try:
                # Load the body in memory and close the socket
                await http_response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # A streamed call hands back the raw byte iterator instead of a model.
    result = (
        http_response.iter_bytes()
        if stream_requested
        else _deserialize(_models.ThreadRun, http_response.json())
    )
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def cancel_run(self, thread_id: str, run_id: str, **kwargs: Any) -> _models.ThreadRun:
    """Cancel a run of an in progress thread.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries win.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_cancel_run_request(
        thread_id=thread_id,
        run_id=run_id,
        api_version=config.api_version,
        headers=_headers,
        params=_params,
    )
    to_url = self._serialize.url
    url_args = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_requested, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream_requested:
            try:
                # Load the body in memory and close the socket
                await http_response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # A streamed call hands back the raw byte iterator instead of a model.
    result = (
        http_response.iter_bytes()
        if stream_requested
        else _deserialize(_models.ThreadRun, http_response.json())
    )
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@overload
async def create_thread_and_run(
    self,
    *,
    assistant_id: str,
    content_type: str = "application/json",
    thread: Optional[_models.AgentThreadCreationOptions] = None,
    model: Optional[str] = None,
    instructions: Optional[str] = None,
    tools: Optional[List[_models.ToolDefinition]] = None,
    tool_resources: Optional[_models.UpdateToolResourcesOptions] = None,
    stream_parameter: Optional[bool] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    max_prompt_tokens: Optional[int] = None,
    max_completion_tokens: Optional[int] = None,
    truncation_strategy: Optional[_models.TruncationObject] = None,
    tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
    response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
    parallel_tool_calls: Optional[bool] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Create a new agent thread and immediately start a run using that new thread.

    :keyword assistant_id: The ID of the agent for which the thread should be created. Required.
    :paramtype assistant_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword thread: The details used to create the new thread. If no thread is provided, an
     empty one will be created. Default value is None.
    :paramtype thread: ~azure.ai.projects.models.AgentThreadCreationOptions
    :keyword model: The overridden model that the agent should use to run the thread. Default
     value is None.
    :paramtype model: str
    :keyword instructions: The overridden system instructions the agent should use to run the
     thread. Default value is None.
    :paramtype instructions: str
    :keyword tools: The overridden list of enabled tools the agent should use to run the thread.
     Default value is None.
    :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
    :keyword tool_resources: Override the tools the agent can use for this run. This is useful
     for modifying the behavior on a per-run basis. Default value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.UpdateToolResourcesOptions
    :keyword stream_parameter: If ``true``, returns a stream of events that happen during the
     Run as server-sent events, terminating when the Run enters a terminal state with a
     ``data: [DONE]`` message. Default value is None.
    :paramtype stream_parameter: bool
    :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like
     0.8 will make the output more random, while lower values like 0.2 will make it more focused
     and deterministic. Default value is None.
    :paramtype temperature: float
    :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where
     the model considers the results of the tokens with top_p probability mass. So 0.1 means
     only the tokens comprising the top 10% probability mass are considered. We generally
     recommend altering this or temperature but not both. Default value is None.
    :paramtype top_p: float
    :keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
     course of the run. The run will make a best effort to use only the number of prompt tokens
     specified, across multiple turns of the run. If the run exceeds the number of prompt tokens
     specified, the run will end with status ``incomplete``. See ``incomplete_details`` for more
     info. Default value is None.
    :paramtype max_prompt_tokens: int
    :keyword max_completion_tokens: The maximum number of completion tokens that may be used
     over the course of the run. The run will make a best effort to use only the number of
     completion tokens specified, across multiple turns of the run. If the run exceeds the
     number of completion tokens specified, the run will end with status ``incomplete``. See
     ``incomplete_details`` for more info. Default value is None.
    :paramtype max_completion_tokens: int
    :keyword truncation_strategy: The strategy to use for dropping messages as the context
     window moves forward. Default value is None.
    :paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
    :keyword tool_choice: Controls whether or not and which tool is called by the model. Is one
     of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
     AgentsNamedToolChoice Default value is None.
    :paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
     ~azure.ai.projects.models.AgentsNamedToolChoice
    :keyword response_format: Specifies the format that the model must output. Is one of the
     following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
     AgentsApiResponseFormat, ResponseFormatJsonSchemaType Default value is None.
    :paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode or
     ~azure.ai.projects.models.AgentsApiResponseFormat or
     ~azure.ai.projects.models.ResponseFormatJsonSchemaType
    :keyword parallel_tool_calls: If ``true`` functions will run in parallel during tool use.
     Default value is None.
    :paramtype parallel_tool_calls: bool
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_thread_and_run(
    self, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
    """Create a new agent thread and immediately start a run using that new thread.

    :param body: Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_thread_and_run(
    self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
    """Create a new agent thread and immediately start a run using that new thread.

    :param body: Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def create_thread_and_run(
    self,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    assistant_id: str = _Unset,
    thread: Optional[_models.AgentThreadCreationOptions] = None,
    model: Optional[str] = None,
    instructions: Optional[str] = None,
    tools: Optional[List[_models.ToolDefinition]] = None,
    tool_resources: Optional[_models.UpdateToolResourcesOptions] = None,
    stream_parameter: Optional[bool] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    max_prompt_tokens: Optional[int] = None,
    max_completion_tokens: Optional[int] = None,
    truncation_strategy: Optional[_models.TruncationObject] = None,
    tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
    response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
    parallel_tool_calls: Optional[bool] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.ThreadRun:
    """Create a new agent thread and immediately start a run using that new thread.

    When ``body`` is omitted, the payload is assembled from the keyword arguments; keywords
    whose value is None are left out of the payload.

    :param body: Request payload, given as either a JSON type or an IO[bytes] type.
    :type body: JSON or IO[bytes]
    :keyword assistant_id: The ID of the agent for which the thread should be created. Required
     when ``body`` is not supplied.
    :paramtype assistant_id: str
    :keyword thread: The details used to create the new thread. If no thread is provided, an
     empty one will be created. Default value is None.
    :paramtype thread: ~azure.ai.projects.models.AgentThreadCreationOptions
    :keyword model: The overridden model that the agent should use to run the thread. Default
     value is None.
    :paramtype model: str
    :keyword instructions: The overridden system instructions the agent should use to run the
     thread. Default value is None.
    :paramtype instructions: str
    :keyword tools: The overridden list of enabled tools the agent should use to run the thread.
     Default value is None.
    :paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
    :keyword tool_resources: Override the tools the agent can use for this run. This is useful
     for modifying the behavior on a per-run basis. Default value is None.
    :paramtype tool_resources: ~azure.ai.projects.models.UpdateToolResourcesOptions
    :keyword stream_parameter: If ``true``, returns a stream of events that happen during the
     Run as server-sent events, terminating when the Run enters a terminal state with a
     ``data: [DONE]`` message. Default value is None.
    :paramtype stream_parameter: bool
    :keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like
     0.8 will make the output more random, while lower values like 0.2 will make it more focused
     and deterministic. Default value is None.
    :paramtype temperature: float
    :keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where
     the model considers the results of the tokens with top_p probability mass. So 0.1 means
     only the tokens comprising the top 10% probability mass are considered. We generally
     recommend altering this or temperature but not both. Default value is None.
    :paramtype top_p: float
    :keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
     course of the run. The run will make a best effort to use only the number of prompt tokens
     specified, across multiple turns of the run. If the run exceeds the number of prompt tokens
     specified, the run will end with status ``incomplete``. See ``incomplete_details`` for more
     info. Default value is None.
    :paramtype max_prompt_tokens: int
    :keyword max_completion_tokens: The maximum number of completion tokens that may be used
     over the course of the run. The run will make a best effort to use only the number of
     completion tokens specified, across multiple turns of the run. If the run exceeds the
     number of completion tokens specified, the run will end with status ``incomplete``. See
     ``incomplete_details`` for more info. Default value is None.
    :paramtype max_completion_tokens: int
    :keyword truncation_strategy: The strategy to use for dropping messages as the context
     window moves forward. Default value is None.
    :paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
    :keyword tool_choice: Controls whether or not and which tool is called by the model. Is one
     of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
     AgentsNamedToolChoice Default value is None.
    :paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
     ~azure.ai.projects.models.AgentsNamedToolChoice
    :keyword response_format: Specifies the format that the model must output. Is one of the
     following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
     AgentsApiResponseFormat, ResponseFormatJsonSchemaType Default value is None.
    :paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode or
     ~azure.ai.projects.models.AgentsApiResponseFormat or
     ~azure.ai.projects.models.ResponseFormatJsonSchemaType
    :keyword parallel_tool_calls: If ``true`` functions will run in parallel during tool use.
     Default value is None.
    :paramtype parallel_tool_calls: bool
    :keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
     for storing additional information about that object in a structured format. Keys may be up
     to 64 characters in length and values may be up to 512 characters in length. Default value
     is None.
    :paramtype metadata: dict[str, str]
    :return: ThreadRun. The ThreadRun is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.ThreadRun
    :raises TypeError: If neither ``body`` nor ``assistant_id`` is supplied.
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries win.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    _params = kwargs.pop("params", {}) or {}
    # An explicit content_type keyword takes precedence over a Content-Type header.
    content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
    cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)

    if body is _Unset:
        if assistant_id is _Unset:
            raise TypeError("missing required argument: assistant_id")
        candidate_fields = {
            "assistant_id": assistant_id,
            "instructions": instructions,
            "max_completion_tokens": max_completion_tokens,
            "max_prompt_tokens": max_prompt_tokens,
            "metadata": metadata,
            "model": model,
            "parallel_tool_calls": parallel_tool_calls,
            "response_format": response_format,
            "stream": stream_parameter,
            "temperature": temperature,
            "thread": thread,
            "tool_choice": tool_choice,
            "tool_resources": tool_resources,
            "tools": tools,
            "top_p": top_p,
            "truncation_strategy": truncation_strategy,
        }
        # Only non-None fields are sent to the service.
        body = {name: value for name, value in candidate_fields.items() if value is not None}
    content_type = content_type or "application/json"

    # Binary payloads are forwarded untouched; anything else is JSON-encoded.
    payload = (
        body
        if isinstance(body, (IOBase, bytes))
        else json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore
    )

    config = self._config
    request = build_agents_create_thread_and_run_request(
        content_type=content_type,
        api_version=config.api_version,
        content=payload,
        headers=_headers,
        params=_params,
    )
    to_url = self._serialize.url
    url_args = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_requested, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream_requested:
            try:
                # Load the body in memory and close the socket
                await http_response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # A streamed call hands back the raw byte iterator instead of a model.
    result = (
        http_response.iter_bytes()
        if stream_requested
        else _deserialize(_models.ThreadRun, http_response.json())
    )
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def get_run_step(
    self,
    thread_id: str,
    run_id: str,
    step_id: str,
    *,
    include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
    **kwargs: Any
) -> _models.RunStep:
    """Get a single run step from a thread run.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :param step_id: Identifier of the run step. Required.
    :type step_id: str
    :keyword include: A list of additional fields to include in the response. Currently the
     only supported value is ``step_details.tool_calls[*].file_search.results[*].content`` to
     fetch the file search result content. Default value is None.
    :paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
    :return: RunStep. The RunStep is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.RunStep
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries win.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.RunStep] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_get_run_step_request(
        thread_id=thread_id,
        run_id=run_id,
        step_id=step_id,
        include=include,
        api_version=config.api_version,
        headers=_headers,
        params=_params,
    )
    to_url = self._serialize.url
    url_args = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_requested, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream_requested:
            try:
                # Load the body in memory and close the socket
                await http_response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # A streamed call hands back the raw byte iterator instead of a model.
    result = (
        http_response.iter_bytes()
        if stream_requested
        else _deserialize(_models.RunStep, http_response.json())
    )
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def list_run_steps(
    self,
    thread_id: str,
    run_id: str,
    *,
    include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
    limit: Optional[int] = None,
    order: Optional[Union[str, _models.ListSortOrder]] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    **kwargs: Any
) -> _models.OpenAIPageableListOfRunStep:
    """Get a list of run steps from a thread run.

    :param thread_id: Identifier of the thread. Required.
    :type thread_id: str
    :param run_id: Identifier of the run. Required.
    :type run_id: str
    :keyword include: A list of additional fields to include in the response. Currently the
     only supported value is ``step_details.tool_calls[*].file_search.results[*].content`` to
     fetch the file search result content. Default value is None.
    :paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
    :keyword limit: A limit on the number of objects to be returned. Limit can range between 1
     and 100, and the default is 20. Default value is None.
    :paramtype limit: int
    :keyword order: Sort order by the created_at timestamp of the objects. asc for ascending
     order and desc for descending order. Known values are: "asc" and "desc". Default value is
     None.
    :paramtype order: str or ~azure.ai.projects.models.ListSortOrder
    :keyword after: A cursor for use in pagination. after is an object ID that defines your
     place in the list. For instance, if you make a list request and receive 100 objects, ending
     with obj_foo, your subsequent call can include after=obj_foo in order to fetch the next
     page of the list. Default value is None.
    :paramtype after: str
    :keyword before: A cursor for use in pagination. before is an object ID that defines your
     place in the list. For instance, if you make a list request and receive 100 objects, ending
     with obj_foo, your subsequent call can include before=obj_foo in order to fetch the
     previous page of the list. Default value is None.
    :paramtype before: str
    :return: OpenAIPageableListOfRunStep. The OpenAIPageableListOfRunStep is compatible with
     MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIPageableListOfRunStep
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries win.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.OpenAIPageableListOfRunStep] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_list_run_steps_request(
        thread_id=thread_id,
        run_id=run_id,
        include=include,
        limit=limit,
        order=order,
        after=after,
        before=before,
        api_version=config.api_version,
        headers=_headers,
        params=_params,
    )
    to_url = self._serialize.url
    url_args = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_requested, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream_requested:
            try:
                # Load the body in memory and close the socket
                await http_response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # A streamed call hands back the raw byte iterator instead of a model.
    result = (
        http_response.iter_bytes()
        if stream_requested
        else _deserialize(_models.OpenAIPageableListOfRunStep, http_response.json())
    )
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@distributed_trace_async
async def list_files(
    self, *, purpose: Optional[Union[str, _models.FilePurpose]] = None, **kwargs: Any
) -> _models.FileListResponse:
    """Get a list of previously uploaded files.

    :keyword purpose: The purpose of the file. Known values are: "fine-tune",
     "fine-tune-results", "assistants", "assistants_output", "batch", "batch_output", and
     "vision". Default value is None.
    :paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
    :return: FileListResponse. The FileListResponse is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.FileListResponse
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller-supplied entries win.
    error_map: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    error_map.update(kwargs.pop("error_map", {}) or {})

    _headers = kwargs.pop("headers", {}) or {}
    _params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.FileListResponse] = kwargs.pop("cls", None)

    config = self._config
    request = build_agents_list_files_request(
        purpose=purpose,
        api_version=config.api_version,
        headers=_headers,
        params=_params,
    )
    to_url = self._serialize.url
    url_args = {
        "endpoint": to_url("self._config.endpoint", config.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", config.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", config.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_args)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream_requested, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream_requested:
            try:
                # Load the body in memory and close the socket
                await http_response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=error_map)
        raise HttpResponseError(response=http_response)

    # A streamed call hands back the raw byte iterator instead of a model.
    result = (
        http_response.iter_bytes()
        if stream_requested
        else _deserialize(_models.FileListResponse, http_response.json())
    )
    return cls(pipeline_response, result, {}) if cls else result  # type: ignore
@overload
async def upload_file(
    self, *, file: FileType, purpose: Union[str, _models.FilePurpose], filename: Optional[str] = None, **kwargs: Any
) -> _models.OpenAIFile:
    """Upload a file so that other operations can use it.

    :keyword file: The file data, in bytes. Required.
    :paramtype file: ~azure.ai.projects._vendor.FileType
    :keyword purpose: What the uploaded file is meant for. Use ``assistants`` for Agents and
     Message files, ``vision`` for Agents image file inputs, ``batch`` for Batch API, and
     ``fine-tune`` for Fine-tuning. Known values are: "fine-tune", "fine-tune-results",
     "assistants", "assistants_output", "batch", "batch_output", and "vision". Required.
    :paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
    :keyword filename: Name of the file. Default value is None.
    :paramtype filename: str
    :return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIFile
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def upload_file(self, body: JSON, **kwargs: Any) -> _models.OpenAIFile:
    """Upload a file so that other operations can use it.

    :param body: The request body. Required.
    :type body: JSON
    :return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIFile
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def upload_file(
    self,
    body: JSON = _Unset,
    *,
    file: FileType = _Unset,
    purpose: Union[str, _models.FilePurpose] = _Unset,
    filename: Optional[str] = None,
    **kwargs: Any
) -> _models.OpenAIFile:
    """Upload a file so that other operations can use it.

    When ``body`` is omitted, the request body is assembled from ``file``, ``purpose`` and
    ``filename``; in that case ``file`` and ``purpose`` must both be given.

    :param body: The request body. Is one of the following types: JSON.
    :type body: JSON
    :keyword file: The file data, in bytes. Required when ``body`` is not passed.
    :paramtype file: ~azure.ai.projects._vendor.FileType
    :keyword purpose: What the uploaded file is meant for. Use ``assistants`` for Agents and
     Message files, ``vision`` for Agents image file inputs, ``batch`` for Batch API, and
     ``fine-tune`` for Fine-tuning. Known values are: "fine-tune", "fine-tune-results",
     "assistants", "assistants_output", "batch", "batch_output", and "vision". Required when
     ``body`` is not passed.
    :paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
    :keyword filename: Name of the file. Default value is None.
    :paramtype filename: str
    :return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIFile
    :raises TypeError: if ``body`` is omitted and ``file`` or ``purpose`` is missing.
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.OpenAIFile] = kwargs.pop("cls", None)

    if body is _Unset:
        if file is _Unset:
            raise TypeError("missing required argument: file")
        if purpose is _Unset:
            raise TypeError("missing required argument: purpose")
        candidate_fields = {"file": file, "filename": filename, "purpose": purpose}
        # Fields left at None are not sent at all.
        body = {key: value for key, value in candidate_fields.items() if value is not None}

    if isinstance(body, _model_base.Model):
        form_source = body.as_dict()
    else:
        form_source = body

    file_field_names: List[str] = ["file"]
    data_field_names: List[str] = ["purpose", "filename"]
    files_part, data_part = prepare_multipart_form_data(form_source, file_field_names, data_field_names)

    http_request = build_agents_upload_file_request(
        api_version=self._config.api_version,
        files=files_part,
        data=data_part,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    result = response.iter_bytes() if stream_requested else _deserialize(_models.OpenAIFile, response.json())

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@distributed_trace_async
async def delete_file(self, file_id: str, **kwargs: Any) -> _models.FileDeletionStatus:
    """Remove a file that was uploaded earlier.

    :param file_id: ID of the file to delete. Required.
    :type file_id: str
    :return: FileDeletionStatus. The FileDeletionStatus is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.FileDeletionStatus
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.FileDeletionStatus] = kwargs.pop("cls", None)

    http_request = build_agents_delete_file_request(
        file_id=file_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    result = (
        response.iter_bytes() if stream_requested else _deserialize(_models.FileDeletionStatus, response.json())
    )

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@distributed_trace_async
async def get_file(self, file_id: str, **kwargs: Any) -> _models.OpenAIFile:
    """Look up the information recorded for one file. The file content is not retrieved.

    :param file_id: ID of the file to retrieve. Required.
    :type file_id: str
    :return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIFile
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.OpenAIFile] = kwargs.pop("cls", None)

    http_request = build_agents_get_file_request(
        file_id=file_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    result = response.iter_bytes() if stream_requested else _deserialize(_models.OpenAIFile, response.json())

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@distributed_trace_async
async def _get_file_content(self, file_id: str, **kwargs: Any) -> bytes:
    """Fetch the raw content of one file.

    Unless streaming is requested, the JSON response body is deserialized to ``bytes`` using the
    ``base64`` format.

    :param file_id: ID of the file to retrieve. Required.
    :type file_id: str
    :return: bytes
    :rtype: bytes
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[bytes] = kwargs.pop("cls", None)

    http_request = build_agents_get_file_content_request(
        file_id=file_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of decoded bytes.
    if stream_requested:
        result = response.iter_bytes()
    else:
        result = _deserialize(bytes, response.json(), format="base64")

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@distributed_trace_async
async def list_vector_stores(
    self,
    *,
    limit: Optional[int] = None,
    order: Optional[Union[str, _models.ListSortOrder]] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    **kwargs: Any
) -> _models.OpenAIPageableListOfVectorStore:
    """Fetch a list of vector stores.

    :keyword limit: Maximum number of objects to return. Limit can range between 1 and 100, and
     the default is 20. Default value is None.
    :paramtype limit: int
    :keyword order: Sort order by the created_at timestamp of the objects: asc for ascending
     order and desc for descending order. Known values are: "asc" and "desc". Default value is
     None.
    :paramtype order: str or ~azure.ai.projects.models.ListSortOrder
    :keyword after: Pagination cursor. after is an object ID that defines your place in the
     list. For instance, if you make a list request and receive 100 objects, ending with obj_foo,
     your subsequent call can include after=obj_foo in order to fetch the next page of the list.
     Default value is None.
    :paramtype after: str
    :keyword before: Pagination cursor. before is an object ID that defines your place in the
     list. For instance, if you make a list request and receive 100 objects, ending with obj_foo,
     your subsequent call can include before=obj_foo in order to fetch the previous page of the
     list. Default value is None.
    :paramtype before: str
    :return: OpenAIPageableListOfVectorStore. The OpenAIPageableListOfVectorStore is compatible
     with MutableMapping
    :rtype: ~azure.ai.projects.models.OpenAIPageableListOfVectorStore
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.OpenAIPageableListOfVectorStore] = kwargs.pop("cls", None)

    http_request = build_agents_list_vector_stores_request(
        limit=limit,
        order=order,
        after=after,
        before=before,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    if stream_requested:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.OpenAIPageableListOfVectorStore, response.json())

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@overload
async def create_vector_store(
    self,
    *,
    content_type: str = "application/json",
    file_ids: Optional[List[str]] = None,
    name: Optional[str] = None,
    store_configuration: Optional[_models.VectorStoreConfiguration] = None,
    expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
    chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.VectorStore:
    """Create a vector store.

    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword file_ids: File IDs that the vector store should use. Useful for tools like
     ``file_search`` that can access files. Default value is None.
    :paramtype file_ids: list[str]
    :keyword name: Name of the vector store. Default value is None.
    :paramtype name: str
    :keyword store_configuration: The vector store configuration, used when vector store is
     created from Azure asset URIs. Default value is None.
    :paramtype store_configuration: ~azure.ai.projects.models.VectorStoreConfiguration
    :keyword expires_after: Details on when this vector store expires. Default value is None.
    :paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
    :keyword chunking_strategy: Chunking strategy used to chunk the file(s). If not set, will
     use the auto strategy. Only applicable if file_ids is non-empty. Default value is None.
    :paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
    :keyword metadata: Up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to
     64 characters in length and values may be up to 512 characters in length. Default value is
     None.
    :paramtype metadata: dict[str, str]
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_vector_store(
    self, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
    """Create a vector store.

    :param body: The request body. Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_vector_store(
    self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
    """Create a vector store.

    :param body: The request body. Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def create_vector_store(
    self,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    file_ids: Optional[List[str]] = None,
    name: Optional[str] = None,
    store_configuration: Optional[_models.VectorStoreConfiguration] = None,
    expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
    chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.VectorStore:
    """Create a vector store.

    When ``body`` is omitted, the request body is assembled from the keyword arguments below,
    leaving out every one that is None.

    :param body: The request body. Is either a JSON type or a IO[bytes] type.
    :type body: JSON or IO[bytes]
    :keyword file_ids: File IDs that the vector store should use. Useful for tools like
     ``file_search`` that can access files. Default value is None.
    :paramtype file_ids: list[str]
    :keyword name: Name of the vector store. Default value is None.
    :paramtype name: str
    :keyword store_configuration: The vector store configuration, used when vector store is
     created from Azure asset URIs. Default value is None.
    :paramtype store_configuration: ~azure.ai.projects.models.VectorStoreConfiguration
    :keyword expires_after: Details on when this vector store expires. Default value is None.
    :paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
    :keyword chunking_strategy: Chunking strategy used to chunk the file(s). If not set, will
     use the auto strategy. Only applicable if file_ids is non-empty. Default value is None.
    :paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
    :keyword metadata: Up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to
     64 characters in length and values may be up to 512 characters in length. Default value is
     None.
    :paramtype metadata: dict[str, str]
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = kwargs.pop("params", {}) or {}

    # The Content-Type header is always removed from the headers; an explicit
    # content_type keyword takes priority over it.
    header_content_type = extra_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.VectorStore] = kwargs.pop("cls", None)

    if body is _Unset:
        candidate_fields = {
            "chunking_strategy": chunking_strategy,
            "configuration": store_configuration,
            "expires_after": expires_after,
            "file_ids": file_ids,
            "metadata": metadata,
            "name": name,
        }
        # Fields left at None are not sent at all.
        body = {key: value for key, value in candidate_fields.items() if value is not None}

    content_type = content_type or "application/json"

    # Binary bodies go out untouched; anything else is JSON-encoded.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_agents_create_vector_store_request(
        content_type=content_type,
        api_version=self._config.api_version,
        content=payload,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    result = response.iter_bytes() if stream_requested else _deserialize(_models.VectorStore, response.json())

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@distributed_trace_async
async def get_vector_store(self, vector_store_id: str, **kwargs: Any) -> _models.VectorStore:
    """Fetch the vector store object that matches the given ID.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.VectorStore] = kwargs.pop("cls", None)

    http_request = build_agents_get_vector_store_request(
        vector_store_id=vector_store_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    result = response.iter_bytes() if stream_requested else _deserialize(_models.VectorStore, response.json())

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@overload
async def modify_vector_store(
    self,
    vector_store_id: str,
    *,
    content_type: str = "application/json",
    name: Optional[str] = None,
    expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.VectorStore:
    """Modify the vector store identified by ``vector_store_id``.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword name: Name of the vector store. Default value is None.
    :paramtype name: str
    :keyword expires_after: Details on when this vector store expires. Default value is None.
    :paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
    :keyword metadata: Up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to
     64 characters in length and values may be up to 512 characters in length. Default value is
     None.
    :paramtype metadata: dict[str, str]
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def modify_vector_store(
    self, vector_store_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
    """Modify the vector store identified by ``vector_store_id``.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: The request body. Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def modify_vector_store(
    self, vector_store_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
    """Modify the vector store identified by ``vector_store_id``.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: The request body. Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def modify_vector_store(
    self,
    vector_store_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    name: Optional[str] = None,
    expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs: Any
) -> _models.VectorStore:
    """Modify the vector store identified by ``vector_store_id``.

    When ``body`` is omitted, the request body is assembled from ``name``, ``expires_after`` and
    ``metadata``, leaving out every one that is None.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: The request body. Is either a JSON type or a IO[bytes] type.
    :type body: JSON or IO[bytes]
    :keyword name: Name of the vector store. Default value is None.
    :paramtype name: str
    :keyword expires_after: Details on when this vector store expires. Default value is None.
    :paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
    :keyword metadata: Up to 16 key/value pairs that can be attached to an object, used for
     storing additional information about that object in a structured format. Keys may be up to
     64 characters in length and values may be up to 512 characters in length. Default value is
     None.
    :paramtype metadata: dict[str, str]
    :return: VectorStore. The VectorStore is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStore
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = kwargs.pop("params", {}) or {}

    # The Content-Type header is always removed from the headers; an explicit
    # content_type keyword takes priority over it.
    header_content_type = extra_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    cls: ClsType[_models.VectorStore] = kwargs.pop("cls", None)

    if body is _Unset:
        candidate_fields = {"expires_after": expires_after, "metadata": metadata, "name": name}
        # Fields left at None are not sent at all.
        body = {key: value for key, value in candidate_fields.items() if value is not None}

    content_type = content_type or "application/json"

    # Binary bodies go out untouched; anything else is JSON-encoded.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_agents_modify_vector_store_request(
        vector_store_id=vector_store_id,
        content_type=content_type,
        api_version=self._config.api_version,
        content=payload,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    result = response.iter_bytes() if stream_requested else _deserialize(_models.VectorStore, response.json())

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@distributed_trace_async
async def delete_vector_store(self, vector_store_id: str, **kwargs: Any) -> _models.VectorStoreDeletionStatus:
    """Delete the vector store object that matches the given ID.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :return: VectorStoreDeletionStatus. The VectorStoreDeletionStatus is compatible with
     MutableMapping
    :rtype: ~azure.ai.projects.models.VectorStoreDeletionStatus
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status other
     than 200.
    """
    # Built-in status-code -> exception table; entries supplied by the caller win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}
    cls: ClsType[_models.VectorStoreDeletionStatus] = kwargs.pop("cls", None)

    http_request = build_agents_delete_vector_store_request(
        vector_store_id=vector_store_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=query_params,
    )

    # Values substituted into the URL template placeholders.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    stream_requested = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=stream_requested, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if stream_requested:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A streaming caller receives the raw byte iterator instead of a model.
    if stream_requested:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.VectorStoreDeletionStatus, response.json())

    if not cls:
        return result  # type: ignore
    return cls(pipeline_response, result, {})  # type: ignore
@distributed_trace_async
async def list_vector_store_files(
    self,
    vector_store_id: str,
    *,
    filter: Optional[Union[str, _models.VectorStoreFileStatusFilter]] = None,
    limit: Optional[int] = None,
    order: Optional[Union[str, _models.ListSortOrder]] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    **kwargs: Any
) -> _models.OpenAIPageableListOfVectorStoreFile:
    """Fetch one page of the files that belong to a vector store.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :keyword filter: Restricts the result to files with the given status. Known values are:
     "in_progress", "completed", "failed", and "cancelled". Default value is None.
    :paramtype filter: str or ~azure.ai.projects.models.VectorStoreFileStatusFilter
    :keyword limit: Maximum number of objects to return. Limit can range between 1 and 100,
     and the default is 20. Default value is None.
    :paramtype limit: int
    :keyword order: Sort order by the created_at timestamp of the objects: asc for ascending
     and desc for descending. Known values are: "asc" and "desc". Default value is None.
    :paramtype order: str or ~azure.ai.projects.models.ListSortOrder
    :keyword after: Pagination cursor. after is an object ID that marks your place in the
     list; for example, after receiving 100 objects ending with obj_foo, a following call can
     pass after=obj_foo to fetch the next page. Default value is None.
    :paramtype after: str
    :keyword before: Pagination cursor. before is an object ID that marks your place in the
     list; for example, after receiving 100 objects ending with obj_foo, a following call can
     pass before=obj_foo to fetch the previous page. Default value is None.
    :paramtype before: str
    :return: OpenAIPageableListOfVectorStoreFile, which is compatible with MutableMapping. When
     the ``stream`` keyword is truthy the result is ``response.iter_bytes()`` instead of a
     deserialized model.
    :rtype: ~azure.ai.projects.models.OpenAIPageableListOfVectorStoreFile
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    extra_params = kwargs.pop("params", {}) or {}
    response_hook: ClsType[_models.OpenAIPageableListOfVectorStoreFile] = kwargs.pop("cls", None)

    http_request = build_agents_list_vector_store_files_request(
        vector_store_id=vector_store_id,
        filter=filter,
        limit=limit,
        order=order,
        after=after,
        before=before,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.OpenAIPageableListOfVectorStoreFile, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
@overload
async def create_vector_store_file(
    self,
    vector_store_id: str,
    *,
    content_type: str = "application/json",
    file_id: Optional[str] = None,
    data_source: Optional[_models.VectorStoreDataSource] = None,
    chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
    **kwargs: Any
) -> _models.VectorStoreFile:
    """Attach a file to a vector store, creating a vector store file (keyword form).

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword file_id: Identifier of the file. Default value is None.
    :paramtype file_id: str
    :keyword data_source: Azure asset ID. Default value is None.
    :paramtype data_source: ~azure.ai.projects.models.VectorStoreDataSource
    :keyword chunking_strategy: The chunking strategy used to chunk the file(s). The auto
     strategy is used if this is not set. Default value is None.
    :paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
    :return: VectorStoreFile, which is compatible with MutableMapping.
    :rtype: ~azure.ai.projects.models.VectorStoreFile
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_vector_store_file(
    self, vector_store_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFile:
    """Attach a file to a vector store, creating a vector store file (JSON body form).

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: Request body given as JSON. Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStoreFile, which is compatible with MutableMapping.
    :rtype: ~azure.ai.projects.models.VectorStoreFile
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_vector_store_file(
    self, vector_store_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFile:
    """Attach a file to a vector store, creating a vector store file (binary body form).

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: Request body given as a binary stream. Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStoreFile, which is compatible with MutableMapping.
    :rtype: ~azure.ai.projects.models.VectorStoreFile
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def create_vector_store_file(
    self,
    vector_store_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    file_id: Optional[str] = None,
    data_source: Optional[_models.VectorStoreDataSource] = None,
    chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
    **kwargs: Any
) -> _models.VectorStoreFile:
    """Attach a file to a vector store, creating a vector store file.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: Either a JSON type or a IO[bytes] type. When it is left unset, the request
     body is assembled from the ``chunking_strategy``, ``data_source`` and ``file_id`` keywords
     that are not None.
    :type body: JSON or IO[bytes]
    :keyword file_id: Identifier of the file. Default value is None.
    :paramtype file_id: str
    :keyword data_source: Azure asset ID. Default value is None.
    :paramtype data_source: ~azure.ai.projects.models.VectorStoreDataSource
    :keyword chunking_strategy: The chunking strategy used to chunk the file(s). The auto
     strategy is used if this is not set. Default value is None.
    :paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
    :return: VectorStoreFile, which is compatible with MutableMapping. When the ``stream``
     keyword is truthy the result is ``response.iter_bytes()`` instead of a deserialized model.
    :rtype: ~azure.ai.projects.models.VectorStoreFile
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    extra_params = kwargs.pop("params", {}) or {}

    # A Content-Type header is always removed from the headers; it only takes effect
    # when no explicit ``content_type`` keyword was passed.
    header_content_type = extra_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    response_hook: ClsType[_models.VectorStoreFile] = kwargs.pop("cls", None)

    if body is _Unset:
        # No explicit body: build one from the keyword arguments, skipping None values.
        keyword_fields = (
            ("chunking_strategy", chunking_strategy),
            ("data_source", data_source),
            ("file_id", file_id),
        )
        body = {name: value for name, value in keyword_fields if value is not None}
    content_type = content_type or "application/json"

    # Streams and raw bytes are sent untouched; anything else is serialized to JSON.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_agents_create_vector_store_file_request(
        vector_store_id=vector_store_id,
        content_type=content_type,
        api_version=self._config.api_version,
        content=payload,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.VectorStoreFile, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
@distributed_trace_async
async def get_vector_store_file(self, vector_store_id: str, file_id: str, **kwargs: Any) -> _models.VectorStoreFile:
    """Fetch a single vector store file.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param file_id: Identifier of the file. Required.
    :type file_id: str
    :return: VectorStoreFile, which is compatible with MutableMapping. When the ``stream``
     keyword is truthy the result is ``response.iter_bytes()`` instead of a deserialized model.
    :rtype: ~azure.ai.projects.models.VectorStoreFile
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    extra_params = kwargs.pop("params", {}) or {}
    response_hook: ClsType[_models.VectorStoreFile] = kwargs.pop("cls", None)

    http_request = build_agents_get_vector_store_file_request(
        vector_store_id=vector_store_id,
        file_id=file_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.VectorStoreFile, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
@distributed_trace_async
async def delete_vector_store_file(
    self, vector_store_id: str, file_id: str, **kwargs: Any
) -> _models.VectorStoreFileDeletionStatus:
    """Remove a file from a vector store.

    The file is detached from the vector store but the file itself is not deleted. To delete
    the file, use the delete file endpoint.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param file_id: Identifier of the file. Required.
    :type file_id: str
    :return: VectorStoreFileDeletionStatus, which is compatible with MutableMapping. When the
     ``stream`` keyword is truthy the result is ``response.iter_bytes()`` instead of a
     deserialized model.
    :rtype: ~azure.ai.projects.models.VectorStoreFileDeletionStatus
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    extra_params = kwargs.pop("params", {}) or {}
    response_hook: ClsType[_models.VectorStoreFileDeletionStatus] = kwargs.pop("cls", None)

    http_request = build_agents_delete_vector_store_file_request(
        vector_store_id=vector_store_id,
        file_id=file_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.VectorStoreFileDeletionStatus, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
@overload
async def create_vector_store_file_batch(
    self,
    vector_store_id: str,
    *,
    content_type: str = "application/json",
    file_ids: Optional[List[str]] = None,
    data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
    chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
    **kwargs: Any
) -> _models.VectorStoreFileBatch:
    """Create a batch of files for a vector store (keyword form).

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :keyword file_ids: List of file identifiers. Default value is None.
    :paramtype file_ids: list[str]
    :keyword data_sources: List of Azure assets. Default value is None.
    :paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
    :keyword chunking_strategy: The chunking strategy used to chunk the file(s). The auto
     strategy is used if this is not set. Default value is None.
    :paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
    :return: VectorStoreFileBatch, which is compatible with MutableMapping.
    :rtype: ~azure.ai.projects.models.VectorStoreFileBatch
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_vector_store_file_batch(
    self, vector_store_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFileBatch:
    """Create a batch of files for a vector store (JSON body form).

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: Request body given as JSON. Required.
    :type body: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStoreFileBatch, which is compatible with MutableMapping.
    :rtype: ~azure.ai.projects.models.VectorStoreFileBatch
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_vector_store_file_batch(
    self, vector_store_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFileBatch:
    """Create a batch of files for a vector store (binary body form).

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: Request body given as a binary stream. Required.
    :type body: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: VectorStoreFileBatch, which is compatible with MutableMapping.
    :rtype: ~azure.ai.projects.models.VectorStoreFileBatch
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@distributed_trace_async
async def create_vector_store_file_batch(
    self,
    vector_store_id: str,
    body: Union[JSON, IO[bytes]] = _Unset,
    *,
    file_ids: Optional[List[str]] = None,
    data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
    chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
    **kwargs: Any
) -> _models.VectorStoreFileBatch:
    """Create a batch of files for a vector store.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param body: Either a JSON type or a IO[bytes] type. When it is left unset, the request
     body is assembled from the ``chunking_strategy``, ``data_sources`` and ``file_ids``
     keywords that are not None.
    :type body: JSON or IO[bytes]
    :keyword file_ids: List of file identifiers. Default value is None.
    :paramtype file_ids: list[str]
    :keyword data_sources: List of Azure assets. Default value is None.
    :paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
    :keyword chunking_strategy: The chunking strategy used to chunk the file(s). The auto
     strategy is used if this is not set. Default value is None.
    :paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
    :return: VectorStoreFileBatch, which is compatible with MutableMapping. When the ``stream``
     keyword is truthy the result is ``response.iter_bytes()`` instead of a deserialized model.
    :rtype: ~azure.ai.projects.models.VectorStoreFileBatch
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    extra_params = kwargs.pop("params", {}) or {}

    # A Content-Type header is always removed from the headers; it only takes effect
    # when no explicit ``content_type`` keyword was passed.
    header_content_type = extra_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type)
    response_hook: ClsType[_models.VectorStoreFileBatch] = kwargs.pop("cls", None)

    if body is _Unset:
        # No explicit body: build one from the keyword arguments, skipping None values.
        keyword_fields = (
            ("chunking_strategy", chunking_strategy),
            ("data_sources", data_sources),
            ("file_ids", file_ids),
        )
        body = {name: value for name, value in keyword_fields if value is not None}
    content_type = content_type or "application/json"

    # Streams and raw bytes are sent untouched; anything else is serialized to JSON.
    if isinstance(body, (IOBase, bytes)):
        payload = body
    else:
        payload = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_agents_create_vector_store_file_batch_request(
        vector_store_id=vector_store_id,
        content_type=content_type,
        api_version=self._config.api_version,
        content=payload,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.VectorStoreFileBatch, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
@distributed_trace_async
async def get_vector_store_file_batch(
    self, vector_store_id: str, batch_id: str, **kwargs: Any
) -> _models.VectorStoreFileBatch:
    """Fetch a single vector store file batch.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param batch_id: Identifier of the file batch. Required.
    :type batch_id: str
    :return: VectorStoreFileBatch, which is compatible with MutableMapping. When the ``stream``
     keyword is truthy the result is ``response.iter_bytes()`` instead of a deserialized model.
    :rtype: ~azure.ai.projects.models.VectorStoreFileBatch
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    extra_params = kwargs.pop("params", {}) or {}
    response_hook: ClsType[_models.VectorStoreFileBatch] = kwargs.pop("cls", None)

    http_request = build_agents_get_vector_store_file_batch_request(
        vector_store_id=vector_store_id,
        batch_id=batch_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.VectorStoreFileBatch, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
@distributed_trace_async
async def cancel_vector_store_file_batch(
    self, vector_store_id: str, batch_id: str, **kwargs: Any
) -> _models.VectorStoreFileBatch:
    """Cancel a vector store file batch.

    This attempts to cancel the processing of files in this batch as soon as possible.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param batch_id: Identifier of the file batch. Required.
    :type batch_id: str
    :return: VectorStoreFileBatch, which is compatible with MutableMapping. When the ``stream``
     keyword is truthy the result is ``response.iter_bytes()`` instead of a deserialized model.
    :rtype: ~azure.ai.projects.models.VectorStoreFileBatch
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    extra_params = kwargs.pop("params", {}) or {}
    response_hook: ClsType[_models.VectorStoreFileBatch] = kwargs.pop("cls", None)

    http_request = build_agents_cancel_vector_store_file_batch_request(
        vector_store_id=vector_store_id,
        batch_id=batch_id,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.VectorStoreFileBatch, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
@distributed_trace_async
async def list_vector_store_file_batch_files(
    self,
    vector_store_id: str,
    batch_id: str,
    *,
    filter: Optional[Union[str, _models.VectorStoreFileStatusFilter]] = None,
    limit: Optional[int] = None,
    order: Optional[Union[str, _models.ListSortOrder]] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    **kwargs: Any
) -> _models.OpenAIPageableListOfVectorStoreFile:
    """Fetch one page of the vector store files that belong to a file batch.

    :param vector_store_id: Identifier of the vector store. Required.
    :type vector_store_id: str
    :param batch_id: Identifier of the file batch. Required.
    :type batch_id: str
    :keyword filter: Restricts the result to files with the given status. Known values are:
     "in_progress", "completed", "failed", and "cancelled". Default value is None.
    :paramtype filter: str or ~azure.ai.projects.models.VectorStoreFileStatusFilter
    :keyword limit: Maximum number of objects to return. Limit can range between 1 and 100,
     and the default is 20. Default value is None.
    :paramtype limit: int
    :keyword order: Sort order by the created_at timestamp of the objects: asc for ascending
     and desc for descending. Known values are: "asc" and "desc". Default value is None.
    :paramtype order: str or ~azure.ai.projects.models.ListSortOrder
    :keyword after: Pagination cursor. after is an object ID that marks your place in the
     list; for example, after receiving 100 objects ending with obj_foo, a following call can
     pass after=obj_foo to fetch the next page. Default value is None.
    :paramtype after: str
    :keyword before: Pagination cursor. before is an object ID that marks your place in the
     list; for example, after receiving 100 objects ending with obj_foo, a following call can
     pass before=obj_foo to fetch the previous page. Default value is None.
    :paramtype before: str
    :return: OpenAIPageableListOfVectorStoreFile, which is compatible with MutableMapping. When
     the ``stream`` keyword is truthy the result is ``response.iter_bytes()`` instead of a
     deserialized model.
    :rtype: ~azure.ai.projects.models.OpenAIPageableListOfVectorStoreFile
    :raises ~azure.core.exceptions.HttpResponseError: if the service answers with a status code
     other than 200.
    """
    # Default status-code -> exception table; entries passed through the ``error_map``
    # keyword are merged on top of these defaults.
    status_to_error: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_to_error.update(kwargs.pop("error_map", {}) or {})

    extra_headers = kwargs.pop("headers", {}) or {}
    extra_params = kwargs.pop("params", {}) or {}
    response_hook: ClsType[_models.OpenAIPageableListOfVectorStoreFile] = kwargs.pop("cls", None)

    http_request = build_agents_list_vector_store_file_batch_files_request(
        vector_store_id=vector_store_id,
        batch_id=batch_id,
        filter=filter,
        limit=limit,
        order=order,
        after=after,
        before=before,
        api_version=self._config.api_version,
        headers=extra_headers,
        params=extra_params,
    )

    # Values substituted into the placeholders of the request URL.
    settings = self._config
    to_url = self._serialize.url
    url_fields = {
        "endpoint": to_url("self._config.endpoint", settings.endpoint, "str"),
        "subscriptionId": to_url("self._config.subscription_id", settings.subscription_id, "str"),
        "resourceGroupName": to_url("self._config.resource_group_name", settings.resource_group_name, "str"),
        "projectName": to_url("self._config.project_name", settings.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_fields)

    wants_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=wants_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 200:
        if wants_stream:
            # Load the body in memory and close the socket; a stream that was already
            # consumed or closed is deliberately ignored.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        # The error map is consulted first; HttpResponseError is raised if it returns.
        map_error(status_code=response.status_code, response=response, error_map=status_to_error)
        raise HttpResponseError(response=response)

    if wants_stream:
        result = response.iter_bytes()
    else:
        result = _deserialize(_models.OpenAIPageableListOfVectorStoreFile, response.json())

    # A caller-provided ``cls`` callback, when present, decides what is returned.
    return response_hook(pipeline_response, result, {}) if response_hook else result  # type: ignore
class ConnectionsOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.ai.projects.aio.AIProjectClient`'s
        :attr:`connections` attribute.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Collaborators are taken positionally in the order client, config,
        # serializer, deserializer; once positionals run out the matching
        # keyword argument is consumed instead.
        positional = list(args)
        self._client: AsyncPipelineClient = positional.pop(0) if positional else kwargs.pop("client")
        self._config: AIProjectClientConfiguration = positional.pop(0) if positional else kwargs.pop("config")
        self._serialize: Serializer = positional.pop(0) if positional else kwargs.pop("serializer")
        self._deserialize: Deserializer = positional.pop(0) if positional else kwargs.pop("deserializer")

    @distributed_trace_async
    async def _get_workspace(self, **kwargs: Any) -> _models._models.GetWorkspaceResponse:
        """Retrieve the properties of the specified machine learning workspace.

        :return: GetWorkspaceResponse. The GetWorkspaceResponse is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models._models.GetWorkspaceResponse
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        # Default status-code -> exception mapping; caller overrides win.
        status_errors: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_errors.update(kwargs.pop("error_map", {}) or {})

        headers = kwargs.pop("headers", {}) or {}
        params = kwargs.pop("params", {}) or {}
        response_cls: ClsType[_models._models.GetWorkspaceResponse] = kwargs.pop("cls", None)

        request = build_connections_get_workspace_request(
            api_version=self._config.api_version,
            headers=headers,
            params=params,
        )
        url_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        request.url = self._client.format_url(request.url, **url_arguments)

        stream = kwargs.pop("stream", False)
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=stream, **kwargs
        )
        http_response = pipeline_response.http_response

        if http_response.status_code != 200:
            if stream:
                try:
                    await http_response.read()  # Load the body in memory and close the socket
                except (StreamConsumedError, StreamClosedError):
                    pass
            map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
            raise HttpResponseError(response=http_response)

        # Streaming callers get the raw byte iterator; everyone else gets the model.
        payload = (
            http_response.iter_bytes()
            if stream
            else _deserialize(
                _models._models.GetWorkspaceResponse, http_response.json()  # pylint: disable=protected-access
            )
        )
        return response_cls(pipeline_response, payload, {}) if response_cls else payload  # type: ignore

    @distributed_trace_async
    async def _list_connections(
        self,
        *,
        category: Optional[Union[str, _models.ConnectionType]] = None,
        include_all: Optional[bool] = None,
        target: Optional[str] = None,
        **kwargs: Any
    ) -> _models._models.ListConnectionsResponse:
        """Return the details of every connection, credentials excluded.

        :keyword category: Category of the workspace connection. Known values are: "AzureOpenAI",
         "Serverless", "AzureBlob", "AIServices", and "CognitiveSearch". Default value is None.
        :paramtype category: str or ~azure.ai.projects.models.ConnectionType
        :keyword include_all: Indicates whether to list datastores. Service default: do not list
         datastores. Default value is None.
        :paramtype include_all: bool
        :keyword target: Target of the workspace connection. Default value is None.
        :paramtype target: str
        :return: ListConnectionsResponse. The ListConnectionsResponse is compatible with
         MutableMapping
        :rtype: ~azure.ai.projects.models._models.ListConnectionsResponse
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        # Default status-code -> exception mapping; caller overrides win.
        status_errors: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_errors.update(kwargs.pop("error_map", {}) or {})

        headers = kwargs.pop("headers", {}) or {}
        params = kwargs.pop("params", {}) or {}
        response_cls: ClsType[_models._models.ListConnectionsResponse] = kwargs.pop("cls", None)

        request = build_connections_list_connections_request(
            category=category,
            include_all=include_all,
            target=target,
            api_version=self._config.api_version,
            headers=headers,
            params=params,
        )
        url_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        request.url = self._client.format_url(request.url, **url_arguments)

        stream = kwargs.pop("stream", False)
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=stream, **kwargs
        )
        http_response = pipeline_response.http_response

        if http_response.status_code != 200:
            if stream:
                try:
                    await http_response.read()  # Load the body in memory and close the socket
                except (StreamConsumedError, StreamClosedError):
                    pass
            map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
            raise HttpResponseError(response=http_response)

        # Streaming callers get the raw byte iterator; everyone else gets the model.
        payload = (
            http_response.iter_bytes()
            if stream
            else _deserialize(
                _models._models.ListConnectionsResponse, http_response.json()  # pylint: disable=protected-access
            )
        )
        return response_cls(pipeline_response, payload, {}) if response_cls else payload  # type: ignore

    @distributed_trace_async
    async def _get_connection(self, connection_name: str, **kwargs: Any) -> _models._models.GetConnectionResponse:
        """Fetch the details of one connection, without its credentials.

        :param connection_name: Connection Name. Required.
        :type connection_name: str
        :return: GetConnectionResponse. The GetConnectionResponse is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models._models.GetConnectionResponse
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        # Default status-code -> exception mapping; caller overrides win.
        status_errors: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_errors.update(kwargs.pop("error_map", {}) or {})

        headers = kwargs.pop("headers", {}) or {}
        params = kwargs.pop("params", {}) or {}
        response_cls: ClsType[_models._models.GetConnectionResponse] = kwargs.pop("cls", None)

        request = build_connections_get_connection_request(
            connection_name=connection_name,
            api_version=self._config.api_version,
            headers=headers,
            params=params,
        )
        url_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        request.url = self._client.format_url(request.url, **url_arguments)

        stream = kwargs.pop("stream", False)
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=stream, **kwargs
        )
        http_response = pipeline_response.http_response

        if http_response.status_code != 200:
            if stream:
                try:
                    await http_response.read()  # Load the body in memory and close the socket
                except (StreamConsumedError, StreamClosedError):
                    pass
            map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
            raise HttpResponseError(response=http_response)

        # Streaming callers get the raw byte iterator; everyone else gets the model.
        payload = (
            http_response.iter_bytes()
            if stream
            else _deserialize(
                _models._models.GetConnectionResponse, http_response.json()  # pylint: disable=protected-access
            )
        )
        return response_cls(pipeline_response, payload, {}) if response_cls else payload  # type: ignore

    @overload
    async def _get_connection_with_secrets(
        self, connection_name: str, *, ignored: str, content_type: str = "application/json", **kwargs: Any
    ) -> _models._models.GetConnectionResponse: ...
    @overload
    async def _get_connection_with_secrets(
        self, connection_name: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
    ) -> _models._models.GetConnectionResponse: ...
    @overload
    async def _get_connection_with_secrets(
        self, connection_name: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
    ) -> _models._models.GetConnectionResponse: ...

    @distributed_trace_async
    async def _get_connection_with_secrets(
        self, connection_name: str, body: Union[JSON, IO[bytes]] = _Unset, *, ignored: str = _Unset, **kwargs: Any
    ) -> _models._models.GetConnectionResponse:
        """Fetch the details of one connection, including credentials (if available).

        :param connection_name: Connection Name. Required.
        :type connection_name: str
        :param body: Is either a JSON type or a IO[bytes] type. Required.
        :type body: JSON or IO[bytes]
        :keyword ignored: The body is ignored. Must be supplied when ``body`` is omitted, because it
         is used to build the request body in that case. Required.
        :paramtype ignored: str
        :return: GetConnectionResponse. The GetConnectionResponse is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models._models.GetConnectionResponse
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        # Default status-code -> exception mapping; caller overrides win.
        status_errors: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_errors.update(kwargs.pop("error_map", {}) or {})

        headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
        params = kwargs.pop("params", {}) or {}
        # An explicit keyword beats a Content-Type header; the header is removed either way.
        content_type: Optional[str] = kwargs.pop("content_type", headers.pop("Content-Type", None))
        response_cls: ClsType[_models._models.GetConnectionResponse] = kwargs.pop("cls", None)

        if body is _Unset:
            if ignored is _Unset:
                raise TypeError("missing required argument: ignored")
            # Synthesize the body from the keyword form, dropping None values.
            body = {key: value for key, value in {"ignored": ignored}.items() if value is not None}
        content_type = content_type or "application/json"

        # Binary payloads go out untouched; anything else is JSON-encoded.
        content = (
            body
            if isinstance(body, (IOBase, bytes))
            else json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore
        )

        request = build_connections_get_connection_with_secrets_request(
            connection_name=connection_name,
            content_type=content_type,
            api_version=self._config.api_version,
            content=content,
            headers=headers,
            params=params,
        )
        url_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        request.url = self._client.format_url(request.url, **url_arguments)

        stream = kwargs.pop("stream", False)
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=stream, **kwargs
        )
        http_response = pipeline_response.http_response

        if http_response.status_code != 200:
            if stream:
                try:
                    await http_response.read()  # Load the body in memory and close the socket
                except (StreamConsumedError, StreamClosedError):
                    pass
            map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
            raise HttpResponseError(response=http_response)

        # Streaming callers get the raw byte iterator; everyone else gets the model.
        payload = (
            http_response.iter_bytes()
            if stream
            else _deserialize(
                _models._models.GetConnectionResponse, http_response.json()  # pylint: disable=protected-access
            )
        )
        return response_cls(pipeline_response, payload, {}) if response_cls else payload  # type: ignore


class TelemetryOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.ai.projects.aio.AIProjectClient`'s
        :attr:`telemetry` attribute.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Collaborators are taken positionally in the order client, config,
        # serializer, deserializer; once positionals run out the matching
        # keyword argument is consumed instead.
        positional = list(args)
        self._client: AsyncPipelineClient = positional.pop(0) if positional else kwargs.pop("client")
        self._config: AIProjectClientConfiguration = positional.pop(0) if positional else kwargs.pop("config")
        self._serialize: Serializer = positional.pop(0) if positional else kwargs.pop("serializer")
        self._deserialize: Deserializer = positional.pop(0) if positional else kwargs.pop("deserializer")

    @distributed_trace_async
    async def _get_app_insights(
        self, app_insights_resource_url: str, **kwargs: Any
    ) -> _models._models.GetAppInsightsResponse:
        # pylint: disable=line-too-long
        """Retrieve the properties of the specified Application Insights resource.

        :param app_insights_resource_url: The AppInsights Azure resource Url. It should have the
         format:
         '/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/microsoft.insights/components/{resourcename}'.
         Required.
        :type app_insights_resource_url: str
        :return: GetAppInsightsResponse. The GetAppInsightsResponse is compatible with MutableMapping
        :rtype: ~azure.ai.projects.models._models.GetAppInsightsResponse
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        # Default status-code -> exception mapping; caller overrides win.
        status_errors: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        status_errors.update(kwargs.pop("error_map", {}) or {})

        headers = kwargs.pop("headers", {}) or {}
        params = kwargs.pop("params", {}) or {}
        response_cls: ClsType[_models._models.GetAppInsightsResponse] = kwargs.pop("cls", None)

        request = build_telemetry_get_app_insights_request(
            app_insights_resource_url=app_insights_resource_url,
            api_version=self._config.api_version,
            headers=headers,
            params=params,
        )
        url_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        request.url = self._client.format_url(request.url, **url_arguments)

        stream = kwargs.pop("stream", False)
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=stream, **kwargs
        )
        http_response = pipeline_response.http_response

        if http_response.status_code != 200:
            if stream:
                try:
                    await http_response.read()  # Load the body in memory and close the socket
                except (StreamConsumedError, StreamClosedError):
                    pass
            map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
            raise HttpResponseError(response=http_response)

        # Streaming callers get the raw byte iterator; everyone else gets the model.
        payload = (
            http_response.iter_bytes()
            if stream
            else _deserialize(
                _models._models.GetAppInsightsResponse, http_response.json()  # pylint: disable=protected-access
            )
        )
        return response_cls(pipeline_response, payload, {}) if response_cls else payload  # type: ignore
class EvaluationsOperations:
    """
    .. warning::
        **DO NOT** instantiate this class directly.

        Instead, you should access the following operations through
        :class:`~azure.ai.projects.aio.AIProjectClient`'s
        :attr:`evaluations` attribute.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Collaborators are taken positionally in the order client, config,
        # serializer, deserializer; once positionals run out the matching
        # keyword argument is consumed instead.
        positional = list(args)
        self._client: AsyncPipelineClient = positional.pop(0) if positional else kwargs.pop("client")
        self._config: AIProjectClientConfiguration = positional.pop(0) if positional else kwargs.pop("config")
        self._serialize: Serializer = positional.pop(0) if positional else kwargs.pop("serializer")
        self._deserialize: Deserializer = positional.pop(0) if positional else kwargs.pop("deserializer")
@distributed_trace_async
async def get(self, id: str, **kwargs: Any) -> _models.Evaluation:
    """Read a single evaluation by its identifier.

    :param id: Identifier of the evaluation. Required.
    :type id: str
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller overrides win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    headers = kwargs.pop("headers", {}) or {}
    params = kwargs.pop("params", {}) or {}
    response_cls: ClsType[_models.Evaluation] = kwargs.pop("cls", None)

    request = build_evaluations_get_request(
        id=id,
        api_version=self._config.api_version,
        headers=headers,
        params=params,
    )
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_arguments)

    stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
        raise HttpResponseError(response=http_response)

    # The request-id header is surfaced to a custom ``cls`` callback only.
    response_headers = {
        "x-ms-client-request-id": self._deserialize("str", http_response.headers.get("x-ms-client-request-id")),
    }

    # Streaming callers get the raw byte iterator; everyone else gets the model.
    payload = http_response.iter_bytes() if stream else _deserialize(_models.Evaluation, http_response.json())
    return response_cls(pipeline_response, payload, response_headers) if response_cls else payload  # type: ignore
@overload
async def create(
    self, evaluation: _models.Evaluation, *, content_type: str = "application/json", **kwargs: Any
) -> _models.Evaluation:
    """Run the evaluation (overload taking a typed ``Evaluation`` model as the body).

    :param evaluation: Evaluation to run. Required.
    :type evaluation: ~azure.ai.projects.models.Evaluation
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def create(
    self, evaluation: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.Evaluation:
    """Run the evaluation (overload taking a JSON-compatible mapping as the body).

    :param evaluation: Evaluation to run. Required.
    :type evaluation: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def create(
    self, evaluation: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.Evaluation:
    """Run the evaluation (overload taking a binary stream as the body).

    :param evaluation: Evaluation to run. Required.
    :type evaluation: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def create(self, evaluation: Union[_models.Evaluation, JSON, IO[bytes]], **kwargs: Any) -> _models.Evaluation:
    """Run the evaluation.

    :param evaluation: Evaluation to run. Is one of the following types: Evaluation, JSON,
     IO[bytes] Required.
    :type evaluation: ~azure.ai.projects.models.Evaluation or JSON or IO[bytes]
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller overrides win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    params = kwargs.pop("params", {}) or {}
    # An explicit keyword beats a Content-Type header; the header is removed either way.
    content_type: Optional[str] = kwargs.pop("content_type", headers.pop("Content-Type", None))
    response_cls: ClsType[_models.Evaluation] = kwargs.pop("cls", None)
    content_type = content_type or "application/json"

    # Binary payloads go out untouched; anything else is JSON-encoded.
    content = (
        evaluation
        if isinstance(evaluation, (IOBase, bytes))
        else json.dumps(evaluation, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore
    )

    request = build_evaluations_create_request(
        content_type=content_type,
        api_version=self._config.api_version,
        content=content,
        headers=headers,
        params=params,
    )
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_arguments)

    stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream, **kwargs
    )
    http_response = pipeline_response.http_response

    # 201 is the only status treated as success for this operation.
    if http_response.status_code != 201:
        if stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
        raise HttpResponseError(response=http_response)

    # Streaming callers get the raw byte iterator; everyone else gets the model.
    payload = http_response.iter_bytes() if stream else _deserialize(_models.Evaluation, http_response.json())
    return response_cls(pipeline_response, payload, {}) if response_cls else payload  # type: ignore
@distributed_trace
def list(
    self, *, top: Optional[int] = None, skip: Optional[int] = None, **kwargs: Any
) -> AsyncIterable["_models.Evaluation"]:
    """List evaluations as an asynchronous pager.

    :keyword top: The number of result items to return. Default value is None.
    :paramtype top: int
    :keyword skip: The number of result items to skip. Default value is None.
    :paramtype skip: int
    :return: An iterator like instance of Evaluation
    :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.ai.projects.models.Evaluation]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    headers = kwargs.pop("headers", {}) or {}
    params = kwargs.pop("params", {}) or {}

    maxpagesize = kwargs.pop("maxpagesize", None)
    page_cls: ClsType[List[_models.Evaluation]] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; caller overrides win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        if next_link:
            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            query = case_insensitive_dict(
                {
                    name: [urllib.parse.quote(item) for item in values]
                    for name, values in urllib.parse.parse_qs(parsed_link.query).items()
                }
            )
            query["api-version"] = self._config.api_version
            request = HttpRequest("GET", urllib.parse.urljoin(next_link, parsed_link.path), params=query)
        else:
            # First page: build the request from the operation's own parameters.
            request = build_evaluations_list_request(
                top=top,
                skip=skip,
                maxpagesize=maxpagesize,
                api_version=self._config.api_version,
                headers=headers,
                params=params,
            )

        # Both kinds of request have their URL template filled from the client config.
        url_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url(
                "self._config.subscription_id", self._config.subscription_id, "str"
            ),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        request.url = self._client.format_url(request.url, **url_arguments)
        return request

    async def extract_data(pipeline_response):
        page = pipeline_response.http_response.json()
        items = _deserialize(List[_models.Evaluation], page["value"])
        if page_cls:
            items = page_cls(items)  # type: ignore
        # A missing or empty "nextLink" is normalised to None.
        continuation = page.get("nextLink") or None
        return continuation, AsyncList(items)

    async def get_next(next_link=None):
        request = prepare_request(next_link)

        # Page fetches are never streamed.
        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            request, stream=False, **kwargs
        )
        http_response = pipeline_response.http_response

        if http_response.status_code != 200:
            map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
            raise HttpResponseError(response=http_response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@overload
async def update(
    self, id: str, resource: _models.Evaluation, *, content_type: str = "application/merge-patch+json", **kwargs: Any
) -> _models.Evaluation:
    """Update an evaluation (overload taking a typed ``Evaluation`` model as the body).

    :param id: Identifier of the evaluation. Required.
    :type id: str
    :param resource: The resource instance. Required.
    :type resource: ~azure.ai.projects.models.Evaluation
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/merge-patch+json".
    :paramtype content_type: str
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def update(
    self, id: str, resource: JSON, *, content_type: str = "application/merge-patch+json", **kwargs: Any
) -> _models.Evaluation:
    """Update an evaluation (overload taking a JSON-compatible mapping as the body).

    :param id: Identifier of the evaluation. Required.
    :type id: str
    :param resource: The resource instance. Required.
    :type resource: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/merge-patch+json".
    :paramtype content_type: str
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """

@overload
async def update(
    self, id: str, resource: IO[bytes], *, content_type: str = "application/merge-patch+json", **kwargs: Any
) -> _models.Evaluation:
    """Update an evaluation (overload taking a binary stream as the body).

    :param id: Identifier of the evaluation. Required.
    :type id: str
    :param resource: The resource instance. Required.
    :type resource: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/merge-patch+json".
    :paramtype content_type: str
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def update(
    self, id: str, resource: Union[_models.Evaluation, JSON, IO[bytes]], **kwargs: Any
) -> _models.Evaluation:
    """Update an existing evaluation.

    :param id: Identifier of the evaluation. Required.
    :type id: str
    :param resource: The resource instance. Is one of the following types: Evaluation, JSON,
     IO[bytes] Required.
    :type resource: ~azure.ai.projects.models.Evaluation or JSON or IO[bytes]
    :return: Evaluation. The Evaluation is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.Evaluation
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller overrides win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    params = kwargs.pop("params", {}) or {}
    # An explicit keyword beats a Content-Type header; the header is removed either way.
    content_type: Optional[str] = kwargs.pop("content_type", headers.pop("Content-Type", None))
    response_cls: ClsType[_models.Evaluation] = kwargs.pop("cls", None)
    content_type = content_type or "application/merge-patch+json"

    # Binary payloads go out untouched; anything else is JSON-encoded.
    content = (
        resource
        if isinstance(resource, (IOBase, bytes))
        else json.dumps(resource, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore
    )

    request = build_evaluations_update_request(
        id=id,
        content_type=content_type,
        api_version=self._config.api_version,
        content=content,
        headers=headers,
        params=params,
    )
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_arguments)

    stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
        raise HttpResponseError(response=http_response)

    # The request-id header is surfaced to a custom ``cls`` callback only.
    response_headers = {
        "x-ms-client-request-id": self._deserialize("str", http_response.headers.get("x-ms-client-request-id")),
    }

    # Streaming callers get the raw byte iterator; everyone else gets the model.
    payload = http_response.iter_bytes() if stream else _deserialize(_models.Evaluation, http_response.json())
    return response_cls(pipeline_response, payload, response_headers) if response_cls else payload  # type: ignore
@distributed_trace_async
async def get_schedule(self, name: str, **kwargs: Any) -> _models.EvaluationSchedule:
    """Read a single evaluation schedule by name.

    :param name: Name of the schedule, which also serves as the unique identifier for the
     evaluation. Required.
    :type name: str
    :return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.EvaluationSchedule
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; caller overrides win.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    headers = kwargs.pop("headers", {}) or {}
    params = kwargs.pop("params", {}) or {}
    response_cls: ClsType[_models.EvaluationSchedule] = kwargs.pop("cls", None)

    request = build_evaluations_get_schedule_request(
        name=name,
        api_version=self._config.api_version,
        headers=headers,
        params=params,
    )
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    request.url = self._client.format_url(request.url, **url_arguments)

    stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        request, stream=stream, **kwargs
    )
    http_response = pipeline_response.http_response

    if http_response.status_code != 200:
        if stream:
            try:
                await http_response.read()  # Load the body in memory and close the socket
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=http_response.status_code, response=http_response, error_map=status_errors)
        raise HttpResponseError(response=http_response)

    # The request-id header is surfaced to a custom ``cls`` callback only.
    response_headers = {
        "x-ms-client-request-id": self._deserialize("str", http_response.headers.get("x-ms-client-request-id")),
    }

    # Streaming callers get the raw byte iterator; everyone else gets the model.
    payload = (
        http_response.iter_bytes() if stream else _deserialize(_models.EvaluationSchedule, http_response.json())
    )
    return response_cls(pipeline_response, payload, response_headers) if response_cls else payload  # type: ignore
@overload
async def create_or_replace_schedule(
    self, name: str, resource: _models.EvaluationSchedule, *, content_type: str = "application/json", **kwargs: Any
) -> _models.EvaluationSchedule:
    """Create an evaluation schedule, or replace the one already stored under ``name``.

    This overload takes the resource as an ``EvaluationSchedule`` model.

    :param name: Name of the schedule, which also serves as the unique identifier for the
     evaluation. Required.
    :type name: str
    :param resource: The resource instance. Required.
    :type resource: ~azure.ai.projects.models.EvaluationSchedule
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.EvaluationSchedule
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_or_replace_schedule(
    self, name: str, resource: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.EvaluationSchedule:
    """Create an evaluation schedule, or replace the one already stored under ``name``.

    This overload takes the resource as a JSON-compatible object.

    :param name: Name of the schedule, which also serves as the unique identifier for the
     evaluation. Required.
    :type name: str
    :param resource: The resource instance. Required.
    :type resource: JSON
    :keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.EvaluationSchedule
    :raises ~azure.core.exceptions.HttpResponseError:
    """


@overload
async def create_or_replace_schedule(
    self, name: str, resource: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.EvaluationSchedule:
    """Create an evaluation schedule, or replace the one already stored under ``name``.

    This overload takes the resource as a binary stream.

    :param name: Name of the schedule, which also serves as the unique identifier for the
     evaluation. Required.
    :type name: str
    :param resource: The resource instance. Required.
    :type resource: IO[bytes]
    :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
     Default value is "application/json".
    :paramtype content_type: str
    :return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.EvaluationSchedule
    :raises ~azure.core.exceptions.HttpResponseError:
    """
@distributed_trace_async
async def create_or_replace_schedule(
    self, name: str, resource: Union[_models.EvaluationSchedule, JSON, IO[bytes]], **kwargs: Any
) -> _models.EvaluationSchedule:
    """Create an evaluation schedule, or replace the one already stored under ``name``.

    :param name: Name of the schedule, which also serves as the unique identifier for the
     evaluation. Required.
    :type name: str
    :param resource: The resource instance. Is one of the following types: EvaluationSchedule,
     JSON, IO[bytes] Required.
    :type resource: ~azure.ai.projects.models.EvaluationSchedule or JSON or IO[bytes]
    :return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
    :rtype: ~azure.ai.projects.models.EvaluationSchedule
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; a caller-supplied ``error_map`` overrides entries.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    request_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
    query_params = kwargs.pop("params", {}) or {}

    # "Content-Type" is always removed from the headers; an explicit ``content_type`` keyword
    # wins over it, and "application/json" is the fallback when neither yields a value.
    header_content_type = request_headers.pop("Content-Type", None)
    content_type: Optional[str] = kwargs.pop("content_type", header_content_type) or "application/json"
    cls: ClsType[_models.EvaluationSchedule] = kwargs.pop("cls", None)

    # Streams and raw bytes are sent untouched; anything else is serialized to JSON.
    if isinstance(resource, (IOBase, bytes)):
        body = resource
    else:
        body = json.dumps(resource, cls=SdkJSONEncoder, exclude_readonly=True)  # type: ignore

    http_request = build_evaluations_create_or_replace_schedule_request(
        name=name,
        content_type=content_type,
        api_version=self._config.api_version,
        content=body,
        headers=request_headers,
        params=query_params,
    )
    # Values substituted into the URL template placeholders of the request.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    want_stream = kwargs.pop("stream", False)
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=want_stream, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code not in (200, 201):
        if want_stream:
            # Load the body in memory and close the socket before raising.
            try:
                await response.read()
            except (StreamConsumedError, StreamClosedError):
                pass
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    result_headers = {
        "x-ms-client-request-id": self._deserialize("str", response.headers.get("x-ms-client-request-id")),
    }

    # With ``stream=True`` the raw byte iterator is handed back instead of a deserialized model.
    payload = (
        response.iter_bytes() if want_stream else _deserialize(_models.EvaluationSchedule, response.json())
    )

    if not cls:
        return payload  # type: ignore

    return cls(pipeline_response, payload, result_headers)  # type: ignore
@distributed_trace
def list_schedule(
    self, *, top: Optional[int] = None, skip: Optional[int] = None, **kwargs: Any
) -> AsyncIterable["_models.EvaluationSchedule"]:
    """List evaluation schedules as an asynchronous pager.

    :keyword top: The number of result items to return. Default value is None.
    :paramtype top: int
    :keyword skip: The number of result items to skip. Default value is None.
    :paramtype skip: int
    :return: An iterator like instance of EvaluationSchedule
    :rtype:
     ~azure.core.async_paging.AsyncItemPaged[~azure.ai.projects.models.EvaluationSchedule]
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    request_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}

    maxpagesize = kwargs.pop("maxpagesize", None)
    cls: ClsType[List[_models.EvaluationSchedule]] = kwargs.pop("cls", None)

    # Default status-code -> exception mapping; a caller-supplied ``error_map`` overrides entries.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    def prepare_request(next_link=None):
        """Build the request for the first page, or for the page behind ``next_link``."""
        if next_link:
            # make call to next link with the client's api-version
            parsed_link = urllib.parse.urlparse(next_link)
            next_params = case_insensitive_dict(
                {
                    key: [urllib.parse.quote(item) for item in values]
                    for key, values in urllib.parse.parse_qs(parsed_link.query).items()
                }
            )
            next_params["api-version"] = self._config.api_version
            page_request = HttpRequest(
                "GET", urllib.parse.urljoin(next_link, parsed_link.path), params=next_params
            )
        else:
            page_request = build_evaluations_list_schedule_request(
                top=top,
                skip=skip,
                maxpagesize=maxpagesize,
                api_version=self._config.api_version,
                headers=request_headers,
                params=query_params,
            )

        # Both kinds of request get the same URL template substitution.
        url_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url(
                "self._config.subscription_id", self._config.subscription_id, "str"
            ),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        page_request.url = self._client.format_url(page_request.url, **url_arguments)
        return page_request

    async def extract_data(pipeline_response):
        """Return ``(continuation link or None, page items)`` for one page response."""
        page_body = pipeline_response.http_response.json()
        page_items = _deserialize(List[_models.EvaluationSchedule], page_body["value"])
        if cls:
            page_items = cls(page_items)  # type: ignore
        return page_body.get("nextLink") or None, AsyncList(page_items)

    async def get_next(next_link=None):
        """Send the request for one page and raise unless the service answers 200."""
        page_request = prepare_request(next_link)

        pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
            page_request, stream=False, **kwargs
        )
        response = pipeline_response.http_response

        if response.status_code != 200:
            map_error(status_code=response.status_code, response=response, error_map=status_errors)
            raise HttpResponseError(response=response)

        return pipeline_response

    return AsyncItemPaged(get_next, extract_data)
@distributed_trace_async
async def disable_schedule(self, name: str, **kwargs: Any) -> None:
    """Disable the evaluation schedule.

    Only an HTTP 204 response is treated as success.

    :param name: Name of the evaluation schedule. Required.
    :type name: str
    :return: None
    :rtype: None
    :raises ~azure.core.exceptions.HttpResponseError:
    """
    # Default status-code -> exception mapping; a caller-supplied ``error_map`` overrides entries.
    status_errors: MutableMapping = {
        401: ClientAuthenticationError,
        404: ResourceNotFoundError,
        409: ResourceExistsError,
        304: ResourceNotModifiedError,
    }
    status_errors.update(kwargs.pop("error_map", {}) or {})

    request_headers = kwargs.pop("headers", {}) or {}
    query_params = kwargs.pop("params", {}) or {}

    cls: ClsType[None] = kwargs.pop("cls", None)

    http_request = build_evaluations_disable_schedule_request(
        name=name,
        api_version=self._config.api_version,
        headers=request_headers,
        params=query_params,
    )
    # Values substituted into the URL template placeholders of the request.
    url_arguments = {
        "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
        "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
        "resourceGroupName": self._serialize.url(
            "self._config.resource_group_name", self._config.resource_group_name, "str"
        ),
        "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
    }
    http_request.url = self._client.format_url(http_request.url, **url_arguments)

    # The response is never streamed for this operation.
    pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
        http_request, stream=False, **kwargs
    )
    response = pipeline_response.http_response

    if response.status_code != 204:
        map_error(status_code=response.status_code, response=response, error_map=status_errors)
        raise HttpResponseError(response=response)

    # A caller-supplied ``cls`` callback receives the pipeline response; there is no body or headers.
    if cls:
        return cls(pipeline_response, None, {})  # type: ignore