# pylint: disable=too-many-lines
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from io import IOBase
import json
import sys
from typing import Any, Callable, Dict, IO, Iterable, List, Optional, TYPE_CHECKING, TypeVar, Union, overload
import urllib.parse
from azure.core import PipelineClient
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
StreamClosedError,
StreamConsumedError,
map_error,
)
from azure.core.paging import ItemPaged
from azure.core.pipeline import PipelineResponse
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from azure.core.utils import case_insensitive_dict
from .. import _model_base, models as _models
from .._configuration import AIProjectClientConfiguration
from .._model_base import SdkJSONEncoder, _deserialize
from .._serialization import Deserializer, Serializer
from .._vendor import FileType, prepare_multipart_form_data
if sys.version_info >= (3, 9):
from collections.abc import MutableMapping
else:
from typing import MutableMapping # type: ignore
if TYPE_CHECKING:
from .. import _types
JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object
_Unset: Any = object()
T = TypeVar("T")
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_agents_create_agent_request(**kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/assistants"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
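# Request builders like the one above only assemble an azure.core.rest.HttpRequest;
# they do not send it. A minimal, illustrative sketch of calling this builder (the
# JSON payload shown is a placeholder assumption, not defined by this module):
def _example_create_agent_request() -> HttpRequest:  # pragma: no cover
    # Builds POST /assistants?api-version=... carrying a hypothetical JSON body.
    return build_agents_create_agent_request(
        content_type="application/json",
        json={"model": "gpt-4o", "name": "my-agent"},
    )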
def build_agents_list_agents_request(
*,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/assistants"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if limit is not None:
_params["limit"] = _SERIALIZER.query("limit", limit, "int")
if order is not None:
_params["order"] = _SERIALIZER.query("order", order, "str")
if after is not None:
_params["after"] = _SERIALIZER.query("after", after, "str")
if before is not None:
_params["before"] = _SERIALIZER.query("before", before, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
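# A small, illustrative sketch of the pagination query parameters shared by the list
# builders (limit/order/after/before); the values below are placeholders:
def _example_list_agents_request() -> HttpRequest:  # pragma: no cover
    # Builds GET /assistants?api-version=...&limit=20&order=desc
    return build_agents_list_agents_request(limit=20, order="desc")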
def build_agents_get_agent_request(assistant_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/assistants/{assistantId}"
path_format_arguments = {
"assistantId": _SERIALIZER.url("assistant_id", assistant_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_update_agent_request(assistant_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/assistants/{assistantId}"
path_format_arguments = {
"assistantId": _SERIALIZER.url("assistant_id", assistant_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_delete_agent_request(assistant_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/assistants/{assistantId}"
path_format_arguments = {
"assistantId": _SERIALIZER.url("assistant_id", assistant_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_create_thread_request(**kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_thread_request(thread_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_update_thread_request(thread_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_delete_thread_request(thread_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_create_message_request(thread_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/messages"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_list_messages_request(
thread_id: str,
*,
run_id: Optional[str] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/messages"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if run_id is not None:
_params["runId"] = _SERIALIZER.query("run_id", run_id, "str")
if limit is not None:
_params["limit"] = _SERIALIZER.query("limit", limit, "int")
if order is not None:
_params["order"] = _SERIALIZER.query("order", order, "str")
if after is not None:
_params["after"] = _SERIALIZER.query("after", after, "str")
if before is not None:
_params["before"] = _SERIALIZER.query("before", before, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_message_request(thread_id: str, message_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/messages/{messageId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"messageId": _SERIALIZER.url("message_id", message_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_update_message_request(thread_id: str, message_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/messages/{messageId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"messageId": _SERIALIZER.url("message_id", message_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_create_run_request(
thread_id: str, *, include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if include is not None:
_params["include[]"] = _SERIALIZER.query("include", include, "[str]", div=",")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
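# Illustrative sketch of the "include[]" query parameter: the list is serialized into a
# single comma-separated value (div=","). The thread id, body, and include entry below
# are placeholder assumptions:
def _example_create_run_request() -> HttpRequest:  # pragma: no cover
    return build_agents_create_run_request(
        "thread_abc123",
        include=["step_details.tool_calls[*].file_search.results[*].content"],
        content_type="application/json",
        json={"assistant_id": "asst_abc123"},
    )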
def build_agents_list_runs_request(
thread_id: str,
*,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if limit is not None:
_params["limit"] = _SERIALIZER.query("limit", limit, "int")
if order is not None:
_params["order"] = _SERIALIZER.query("order", order, "str")
if after is not None:
_params["after"] = _SERIALIZER.query("after", after, "str")
if before is not None:
_params["before"] = _SERIALIZER.query("before", before, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_run_request(thread_id: str, run_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs/{runId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"runId": _SERIALIZER.url("run_id", run_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_update_run_request(thread_id: str, run_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs/{runId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"runId": _SERIALIZER.url("run_id", run_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_submit_tool_outputs_to_run_request( # pylint: disable=name-too-long
thread_id: str, run_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs/{runId}/submit_tool_outputs"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"runId": _SERIALIZER.url("run_id", run_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_cancel_run_request(thread_id: str, run_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs/{runId}/cancel"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"runId": _SERIALIZER.url("run_id", run_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_create_thread_and_run_request(**kwargs: Any) -> HttpRequest: # pylint: disable=name-too-long
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/runs"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_run_step_request(
thread_id: str,
run_id: str,
step_id: str,
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs/{runId}/steps/{stepId}"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"runId": _SERIALIZER.url("run_id", run_id, "str"),
"stepId": _SERIALIZER.url("step_id", step_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if include is not None:
_params["include[]"] = _SERIALIZER.query("include", include, "[str]", div=",")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_list_run_steps_request(
thread_id: str,
run_id: str,
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/threads/{threadId}/runs/{runId}/steps"
path_format_arguments = {
"threadId": _SERIALIZER.url("thread_id", thread_id, "str"),
"runId": _SERIALIZER.url("run_id", run_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if include is not None:
_params["include[]"] = _SERIALIZER.query("include", include, "[str]", div=",")
if limit is not None:
_params["limit"] = _SERIALIZER.query("limit", limit, "int")
if order is not None:
_params["order"] = _SERIALIZER.query("order", order, "str")
if after is not None:
_params["after"] = _SERIALIZER.query("after", after, "str")
if before is not None:
_params["before"] = _SERIALIZER.query("before", before, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_list_files_request(
*, purpose: Optional[Union[str, _models.FilePurpose]] = None, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/files"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if purpose is not None:
_params["purpose"] = _SERIALIZER.query("purpose", purpose, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_upload_file_request(**kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/files"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
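# Illustrative multipart sketch; the form field names and file payload below are
# assumptions for demonstration only and are not defined by this module:
def _example_upload_file_request() -> HttpRequest:  # pragma: no cover
    return build_agents_upload_file_request(
        files={"file": ("data.jsonl", b'{"example": true}\n')},
        data={"purpose": "assistants"},
    )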
def build_agents_delete_file_request(file_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/files/{fileId}"
path_format_arguments = {
"fileId": _SERIALIZER.url("file_id", file_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_file_request(file_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/files/{fileId}"
path_format_arguments = {
"fileId": _SERIALIZER.url("file_id", file_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_file_content_request(file_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/files/{fileId}/content"
path_format_arguments = {
"fileId": _SERIALIZER.url("file_id", file_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_list_vector_stores_request(
*,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if limit is not None:
_params["limit"] = _SERIALIZER.query("limit", limit, "int")
if order is not None:
_params["order"] = _SERIALIZER.query("order", order, "str")
if after is not None:
_params["after"] = _SERIALIZER.query("after", after, "str")
if before is not None:
_params["before"] = _SERIALIZER.query("before", before, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_create_vector_store_request(**kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_vector_store_request(vector_store_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_modify_vector_store_request(vector_store_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_delete_vector_store_request(vector_store_id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_list_vector_store_files_request( # pylint: disable=name-too-long
vector_store_id: str,
*,
filter: Optional[Union[str, _models.VectorStoreFileStatusFilter]] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/files"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if filter is not None:
_params["filter"] = _SERIALIZER.query("filter", filter, "str")
if limit is not None:
_params["limit"] = _SERIALIZER.query("limit", limit, "int")
if order is not None:
_params["order"] = _SERIALIZER.query("order", order, "str")
if after is not None:
_params["after"] = _SERIALIZER.query("after", after, "str")
if before is not None:
_params["before"] = _SERIALIZER.query("before", before, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_create_vector_store_file_request( # pylint: disable=name-too-long
vector_store_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/files"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_vector_store_file_request( # pylint: disable=name-too-long
vector_store_id: str, file_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/files/{fileId}"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
"fileId": _SERIALIZER.url("file_id", file_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_delete_vector_store_file_request( # pylint: disable=name-too-long
vector_store_id: str, file_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/files/{fileId}"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
"fileId": _SERIALIZER.url("file_id", file_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="DELETE", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_create_vector_store_file_batch_request( # pylint: disable=name-too-long
vector_store_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/file_batches"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_get_vector_store_file_batch_request( # pylint: disable=name-too-long
vector_store_id: str, batch_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/file_batches/{batchId}"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
"batchId": _SERIALIZER.url("batch_id", batch_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_cancel_vector_store_file_batch_request( # pylint: disable=name-too-long
vector_store_id: str, batch_id: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/file_batches/{batchId}/cancel"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
"batchId": _SERIALIZER.url("batch_id", batch_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_agents_list_vector_store_file_batch_files_request( # pylint: disable=name-too-long
vector_store_id: str,
batch_id: str,
*,
filter: Optional[Union[str, _models.VectorStoreFileStatusFilter]] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/vector_stores/{vectorStoreId}/file_batches/{batchId}/files"
path_format_arguments = {
"vectorStoreId": _SERIALIZER.url("vector_store_id", vector_store_id, "str"),
"batchId": _SERIALIZER.url("batch_id", batch_id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if filter is not None:
_params["filter"] = _SERIALIZER.query("filter", filter, "str")
if limit is not None:
_params["limit"] = _SERIALIZER.query("limit", limit, "int")
if order is not None:
_params["order"] = _SERIALIZER.query("order", order, "str")
if after is not None:
_params["after"] = _SERIALIZER.query("after", after, "str")
if before is not None:
_params["before"] = _SERIALIZER.query("before", before, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_connections_get_workspace_request(**kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_connections_list_connections_request( # pylint: disable=name-too-long
*,
category: Optional[Union[str, _models.ConnectionType]] = None,
include_all: Optional[bool] = None,
target: Optional[str] = None,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/connections"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if category is not None:
_params["category"] = _SERIALIZER.query("category", category, "str")
if include_all is not None:
_params["includeAll"] = _SERIALIZER.query("include_all", include_all, "bool")
if target is not None:
_params["target"] = _SERIALIZER.query("target", target, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_connections_get_connection_request(connection_name: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/connections/{connectionName}"
path_format_arguments = {
"connectionName": _SERIALIZER.url("connection_name", connection_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_connections_get_connection_with_secrets_request( # pylint: disable=name-too-long
connection_name: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/connections/{connectionName}/listsecrets"
path_format_arguments = {
"connectionName": _SERIALIZER.url("connection_name", connection_name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_telemetry_get_app_insights_request(app_insights_resource_url: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/{appInsightsResourceUrl}"
path_format_arguments = {
"appInsightsResourceUrl": _SERIALIZER.url("app_insights_resource_url", app_insights_resource_url, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_get_request(id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/runs/{id}"
path_format_arguments = {
"id": _SERIALIZER.url("id", id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_create_request(**kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("apiVersion", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/runs:run"
# Construct parameters
_params["apiVersion"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="POST", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_list_request(
*, top: Optional[int] = None, skip: Optional[int] = None, maxpagesize: Optional[int] = None, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/runs"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if top is not None:
_params["top"] = _SERIALIZER.query("top", top, "int")
if skip is not None:
_params["skip"] = _SERIALIZER.query("skip", skip, "int")
if maxpagesize is not None:
_params["maxpagesize"] = _SERIALIZER.query("maxpagesize", maxpagesize, "int")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_update_request(id: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/runs/{id}"
path_format_arguments = {
"id": _SERIALIZER.url("id", id, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_get_schedule_request(name: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/schedules/{name}"
path_format_arguments = {
"name": _SERIALIZER.url("name", name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_create_or_replace_schedule_request( # pylint: disable=name-too-long
name: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/schedules/{name}"
path_format_arguments = {
"name": _SERIALIZER.url("name", name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
if content_type is not None:
_headers["Content-Type"] = _SERIALIZER.header("content_type", content_type, "str")
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PUT", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_list_schedule_request(
*, top: Optional[int] = None, skip: Optional[int] = None, maxpagesize: Optional[int] = None, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("api-version", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/schedules"
# Construct parameters
_params["api-version"] = _SERIALIZER.query("api_version", api_version, "str")
if top is not None:
_params["top"] = _SERIALIZER.query("top", top, "int")
if skip is not None:
_params["skip"] = _SERIALIZER.query("skip", skip, "int")
if maxpagesize is not None:
_params["maxpagesize"] = _SERIALIZER.query("maxpagesize", maxpagesize, "int")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="GET", url=_url, params=_params, headers=_headers, **kwargs)
def build_evaluations_disable_schedule_request( # pylint: disable=name-too-long
name: str, **kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})
api_version: str = kwargs.pop("api_version", _params.pop("apiVersion", "2024-07-01-preview"))
accept = _headers.pop("Accept", "application/json")
# Construct URL
_url = "/evaluations/schedules/{name}/disable"
path_format_arguments = {
"name": _SERIALIZER.url("name", name, "str"),
}
_url: str = _url.format(**path_format_arguments) # type: ignore
# Construct parameters
_params["apiVersion"] = _SERIALIZER.query("api_version", api_version, "str")
# Construct headers
_headers["Accept"] = _SERIALIZER.header("accept", accept, "str")
return HttpRequest(method="PATCH", url=_url, params=_params, headers=_headers, **kwargs)
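# Every builder above returns an azure.core.rest.HttpRequest. A hedged sketch of how such
# a request is typically dispatched; this roughly mirrors the pattern used by the
# operations classes below, and `client` is a placeholder supplied by the caller:
def _example_send_request(client: PipelineClient) -> HttpResponse:  # pragma: no cover
    request = build_agents_get_agent_request("asst_abc123")  # hypothetical agent id
    # Resolve the relative path against the client's configured endpoint, then run the pipeline.
    request.url = client.format_url(request.url)
    pipeline_response: PipelineResponse = client._pipeline.run(request, stream=False)
    return pipeline_response.http_response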
class AgentsOperations: # pylint: disable=too-many-public-methods
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.ai.projects.AIProjectClient`'s
:attr:`agents` attribute.
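
.. admonition:: Example

    Illustrative only; ``project_client`` is a placeholder for an already-constructed
    :class:`~azure.ai.projects.AIProjectClient`:

    .. code-block:: python

        agents_operations = project_client.agents  # instance of AgentsOperations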
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: AIProjectClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@overload
def create_agent(
self,
*,
model: str,
content_type: str = "application/json",
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.Agent:
"""Creates a new agent.
:keyword model: The ID of the model to use. Required.
:paramtype model: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword name: The name of the new agent. Default value is None.
:paramtype name: str
:keyword description: The description of the new agent. Default value is None.
:paramtype description: str
:keyword instructions: The system instructions for the new agent to use. Default value is None.
:paramtype instructions: str
:keyword tools: The collection of tools to enable for the new agent. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: A set of resources that are used by the agent's tools. The resources
are specific to the type of tool. For example, the ``code_interpreter``
tool requires a list of file IDs, while the ``file_search`` tool requires a list of vector
store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_agent(self, body: JSON, *, content_type: str = "application/json", **kwargs: Any) -> _models.Agent:
"""Creates a new agent.
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_agent(self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any) -> _models.Agent:
"""Creates a new agent.
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_agent(
self,
body: Union[JSON, IO[bytes]] = _Unset,
*,
model: str = _Unset,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.Agent:
"""Creates a new agent.
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword model: The ID of the model to use. Required.
:paramtype model: str
:keyword name: The name of the new agent. Default value is None.
:paramtype name: str
:keyword description: The description of the new agent. Default value is None.
:paramtype description: str
:keyword instructions: The system instructions for the new agent to use. Default value is None.
:paramtype instructions: str
:keyword tools: The collection of tools to enable for the new agent. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: A set of resources that are used by the agent's tools. The resources
are specific to the type of tool. For example, the ``code_interpreter``
tool requires a list of file IDs, while the ``file_search`` tool requires a list of vector
store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.Agent] = kwargs.pop("cls", None)
if body is _Unset:
if model is _Unset:
raise TypeError("missing required argument: model")
body = {
"description": description,
"instructions": instructions,
"metadata": metadata,
"model": model,
"name": name,
"response_format": response_format,
"temperature": temperature,
"tool_resources": tool_resources,
"tools": tools,
"top_p": top_p,
}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_agent_request(
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.Agent, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
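# Usage sketch (illustrative comments, not generated code): assuming an AIProjectClient
# named `project_client` has been created elsewhere, a new agent can be created through
# its `agents` attribute; "gpt-4o" is a placeholder model deployment name.
#
#   agent = project_client.agents.create_agent(
#       model="gpt-4o",
#       name="my-agent",
#       instructions="You are a helpful assistant.",
#   )
#   # agent.id identifies the new agent in later calls.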
@distributed_trace
def list_agents(
self,
*,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIPageableListOfAgent:
"""Gets a list of agents that were previously created.
:keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
100, and the default is 20. Default value is None.
:paramtype limit: int
:keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
and desc for descending order. Known values are: "asc" and "desc". Default value is None.
:paramtype order: str or ~azure.ai.projects.models.ListSortOrder
:keyword after: A cursor for use in pagination. after is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
list. Default value is None.
:paramtype after: str
:keyword before: A cursor for use in pagination. before is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of
the list. Default value is None.
:paramtype before: str
:return: OpenAIPageableListOfAgent. The OpenAIPageableListOfAgent is compatible with
MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIPageableListOfAgent
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIPageableListOfAgent] = kwargs.pop("cls", None)
_request = build_agents_list_agents_request(
limit=limit,
order=order,
after=after,
before=before,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIPageableListOfAgent, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
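# Usage sketch (illustrative comments): listing previously created agents with the same
# placeholder `project_client`; iterating a `data` collection assumes the OpenAI
# pageable list shape described in the docstring above.
#
#   page = project_client.agents.list_agents(limit=20, order="desc")
#   for agent in page.data:
#       print(agent.id, agent.name)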
@distributed_trace
def get_agent(self, assistant_id: str, **kwargs: Any) -> _models.Agent:
"""Retrieves an existing agent.
:param assistant_id: Identifier of the agent. Required.
:type assistant_id: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.Agent] = kwargs.pop("cls", None)
_request = build_agents_get_agent_request(
assistant_id=assistant_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.Agent, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
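# Usage sketch (illustrative comments): retrieving a single agent by its identifier;
# "asst_abc123" is a placeholder ID.
#
#   agent = project_client.agents.get_agent("asst_abc123")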
@overload
def update_agent(
self,
assistant_id: str,
*,
content_type: str = "application/json",
model: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword model: The ID of the model to use. Default value is None.
:paramtype model: str
:keyword name: The modified name for the agent to use. Default value is None.
:paramtype name: str
:keyword description: The modified description for the agent to use. Default value is None.
:paramtype description: str
:keyword instructions: The modified system instructions for the agent to use. Default value
is None.
:paramtype instructions: str
:keyword tools: The modified collection of tools to enable for the agent. Default value is
None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: A set of resources that are used by the agent's tools. The resources
are specific to the type of tool. For example,
the ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
requires a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_agent(
self, assistant_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_agent(
self, assistant_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def update_agent(
self,
assistant_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
model: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword model: The ID of the model to use. Default value is None.
:paramtype model: str
:keyword name: The modified name for the agent to use. Default value is None.
:paramtype name: str
:keyword description: The modified description for the agent to use. Default value is None.
:paramtype description: str
:keyword instructions: The modified system instructions for the agent to use. Default value
is None.
:paramtype instructions: str
:keyword tools: The modified collection of tools to enable for the agent. Default value is
None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: A set of resources that are used by the agent's tools. The resources
are specific to the type of tool. For example,
the ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
requires a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.Agent] = kwargs.pop("cls", None)
if body is _Unset:
body = {
"description": description,
"instructions": instructions,
"metadata": metadata,
"model": model,
"name": name,
"response_format": response_format,
"temperature": temperature,
"tool_resources": tool_resources,
"tools": tools,
"top_p": top_p,
}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_update_agent_request(
assistant_id=assistant_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.Agent, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
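# Usage sketch (illustrative comments): only the keyword arguments that are actually
# passed end up in the request body, since None values are filtered out above.
#
#   agent = project_client.agents.update_agent(
#       "asst_abc123",                      # placeholder agent ID
#       instructions="Answer concisely.",
#   )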
@distributed_trace
def delete_agent(self, assistant_id: str, **kwargs: Any) -> _models.AgentDeletionStatus:
"""Deletes an agent.
:param assistant_id: Identifier of the agent. Required.
:type assistant_id: str
:return: AgentDeletionStatus. The AgentDeletionStatus is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentDeletionStatus
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.AgentDeletionStatus] = kwargs.pop("cls", None)
_request = build_agents_delete_agent_request(
assistant_id=assistant_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.AgentDeletionStatus, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
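# Usage sketch (illustrative comments): deleting an agent returns a deletion status model
# rather than nothing; inspecting a `deleted` flag is an assumption about the
# AgentDeletionStatus shape.
#
#   status = project_client.agents.delete_agent("asst_abc123")  # placeholder ID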
@overload
def create_thread(
self,
*,
content_type: str = "application/json",
messages: Optional[List[_models.ThreadMessageOptions]] = None,
tool_resources: Optional[_models.ToolResources] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.AgentThread:
"""Creates a new thread. Threads contain messages and can be run by agents.
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword messages: The initial messages to associate with the new thread. Default value is
None.
:paramtype messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tool_resources: A set of resources that are made available to the agent's tools in
this thread. The resources are specific to the
type of tool. For example, the ``code_interpreter`` tool requires a list of file IDs, while
the ``file_search`` tool requires
a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_thread(
self, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
"""Creates a new thread. Threads contain messages and can be run by agents.
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_thread(
self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
"""Creates a new thread. Threads contain messages and can be run by agents.
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_thread(
self,
body: Union[JSON, IO[bytes]] = _Unset,
*,
messages: Optional[List[_models.ThreadMessageOptions]] = None,
tool_resources: Optional[_models.ToolResources] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.AgentThread:
"""Creates a new thread. Threads contain messages and can be run by agents.
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword messages: The initial messages to associate with the new thread. Default value is
None.
:paramtype messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tool_resources: A set of resources that are made available to the agent's tools in
this thread. The resources are specific to the
type of tool. For example, the ``code_interpreter`` tool requires a list of file IDs, while
the ``file_search`` tool requires
a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.AgentThread] = kwargs.pop("cls", None)
if body is _Unset:
body = {"messages": messages, "metadata": metadata, "tool_resources": tool_resources}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_thread_request(
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.AgentThread, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
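# Usage sketch (illustrative comments): a thread can be created empty or seeded with
# initial messages; all keyword arguments are optional.
#
#   thread = project_client.agents.create_thread()
#   # thread.id identifies the thread for message and run operations.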
@distributed_trace
def get_thread(self, thread_id: str, **kwargs: Any) -> _models.AgentThread:
"""Gets information about an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.AgentThread] = kwargs.pop("cls", None)
_request = build_agents_get_thread_request(
thread_id=thread_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.AgentThread, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
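# Usage sketch (illustrative comments): "thread_abc123" is a placeholder thread ID.
#
#   thread = project_client.agents.get_thread("thread_abc123")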
@overload
def update_thread(
self,
thread_id: str,
*,
content_type: str = "application/json",
tool_resources: Optional[_models.ToolResources] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.AgentThread:
"""Modifies an existing thread.
:param thread_id: The ID of the thread to modify. Required.
:type thread_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword tool_resources: A set of resources that are made available to the agent's tools in
this thread. The resources are specific to the
type of tool. For example, the ``code_interpreter`` tool requires a list of file IDs, while
the ``file_search`` tool requires
a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_thread(
self, thread_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
"""Modifies an existing thread.
:param thread_id: The ID of the thread to modify. Required.
:type thread_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_thread(
self, thread_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.AgentThread:
"""Modifies an existing thread.
:param thread_id: The ID of the thread to modify. Required.
:type thread_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def update_thread(
self,
thread_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
tool_resources: Optional[_models.ToolResources] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.AgentThread:
"""Modifies an existing thread.
:param thread_id: The ID of the thread to modify. Required.
:type thread_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword tool_resources: A set of resources that are made available to the agent's tools in
this thread. The resources are specific to the
type of tool. For example, the ``code_interpreter`` tool requires a list of file IDs, while
the ``file_search`` tool requires
a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: AgentThread. The AgentThread is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentThread
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.AgentThread] = kwargs.pop("cls", None)
if body is _Unset:
body = {"metadata": metadata, "tool_resources": tool_resources}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_update_thread_request(
thread_id=thread_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.AgentThread, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
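# Usage sketch (illustrative comments): attaching free-form metadata to an existing
# thread; key and value lengths are limited as documented above.
#
#   thread = project_client.agents.update_thread(
#       "thread_abc123",                    # placeholder thread ID
#       metadata={"session": "demo"},
#   )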
@distributed_trace
def delete_thread(self, thread_id: str, **kwargs: Any) -> _models.ThreadDeletionStatus:
"""Deletes an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:return: ThreadDeletionStatus. The ThreadDeletionStatus is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadDeletionStatus
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.ThreadDeletionStatus] = kwargs.pop("cls", None)
_request = build_agents_delete_thread_request(
thread_id=thread_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadDeletionStatus, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
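# Usage sketch (illustrative comments): deletion returns a ThreadDeletionStatus model.
#
#   status = project_client.agents.delete_thread("thread_abc123")  # placeholder ID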
@overload
def create_message(
self,
thread_id: str,
*,
role: Union[str, _models.MessageRole],
content: str,
content_type: str = "application/json",
attachments: Optional[List[_models.MessageAttachment]] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadMessage:
"""Creates a new message on a specified thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:keyword role: The role of the entity that is creating the message. Allowed values include:
* ``user``\\ : Indicates the message is sent by an actual user and should be used in most
cases to represent user-generated messages.
* ``assistant``\\ : Indicates the message is generated by the agent. Use this value to insert
messages from the agent into the
conversation. Known values are: "user" and "assistant". Required.
:paramtype role: str or ~azure.ai.projects.models.MessageRole
:keyword content: The textual content of the initial message. Currently, robust input including
images and annotated text may only be provided via
a separate call to the create message API. Required.
:paramtype content: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword attachments: A list of files attached to the message, and the tools they should be
added to. Default value is None.
:paramtype attachments: list[~azure.ai.projects.models.MessageAttachment]
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_message(
self, thread_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
"""Creates a new message on a specified thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_message(
self, thread_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
"""Creates a new message on a specified thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_message(
self,
thread_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
role: Union[str, _models.MessageRole] = _Unset,
content: str = _Unset,
attachments: Optional[List[_models.MessageAttachment]] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadMessage:
"""Creates a new message on a specified thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword role: The role of the entity that is creating the message. Allowed values include:
* ``user``\\ : Indicates the message is sent by an actual user and should be used in most
cases to represent user-generated messages.
* ``assistant``\\ : Indicates the message is generated by the agent. Use this value to insert
messages from the agent into the
conversation. Known values are: "user" and "assistant". Required.
:paramtype role: str or ~azure.ai.projects.models.MessageRole
:keyword content: The textual content of the initial message. Currently, robust input including
images and annotated text may only be provided via
a separate call to the create message API. Required.
:paramtype content: str
:keyword attachments: A list of files attached to the message, and the tools they should be
added to. Default value is None.
:paramtype attachments: list[~azure.ai.projects.models.MessageAttachment]
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThreadMessage] = kwargs.pop("cls", None)
if body is _Unset:
if role is _Unset:
raise TypeError("missing required argument: role")
if content is _Unset:
raise TypeError("missing required argument: content")
body = {"attachments": attachments, "content": content, "metadata": metadata, "role": role}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_message_request(
thread_id=thread_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadMessage, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
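# Usage sketch (illustrative comments): `role` and `content` are required when no explicit
# body is supplied; values below are placeholders.
#
#   message = project_client.agents.create_message(
#       "thread_abc123",                    # placeholder thread ID
#       role="user",
#       content="What is the weather like today?",
#   )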
@distributed_trace
def list_messages(
self,
thread_id: str,
*,
run_id: Optional[str] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIPageableListOfThreadMessage:
"""Gets a list of messages that exist on a thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:keyword run_id: Filter messages by the run ID that generated them. Default value is None.
:paramtype run_id: str
:keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
100, and the default is 20. Default value is None.
:paramtype limit: int
:keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
and desc for descending order. Known values are: "asc" and "desc". Default value is None.
:paramtype order: str or ~azure.ai.projects.models.ListSortOrder
:keyword after: A cursor for use in pagination. after is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
list. Default value is None.
:paramtype after: str
:keyword before: A cursor for use in pagination. before is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of
the list. Default value is None.
:paramtype before: str
:return: OpenAIPageableListOfThreadMessage. The OpenAIPageableListOfThreadMessage is compatible
with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIPageableListOfThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIPageableListOfThreadMessage] = kwargs.pop("cls", None)
_request = build_agents_list_messages_request(
thread_id=thread_id,
run_id=run_id,
limit=limit,
order=order,
after=after,
before=before,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIPageableListOfThreadMessage, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
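# Usage sketch (illustrative comments): messages can be filtered by the run that produced
# them and paged with the same cursor parameters as the other list calls.
#
#   messages = project_client.agents.list_messages("thread_abc123", order="asc")
#   # iterate messages.data (OpenAI pageable list shape, an assumption here)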
@distributed_trace
def get_message(self, thread_id: str, message_id: str, **kwargs: Any) -> _models.ThreadMessage:
"""Gets an existing message from an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param message_id: Identifier of the message. Required.
:type message_id: str
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.ThreadMessage] = kwargs.pop("cls", None)
_request = build_agents_get_message_request(
thread_id=thread_id,
message_id=message_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadMessage, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
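# --- Illustrative usage sketch (not part of the generated client code) ---
# Fetches a single message with get_message and prints a couple of fields;
# `agents_ops` is assumed to be an authenticated instance of this operations
# class, and the identifiers are placeholders.
def _example_get_message(agents_ops, thread_id: str, message_id: str) -> None:
    message = agents_ops.get_message(thread_id, message_id)
    print(message.id, message.role)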
@overload
def update_message(
self,
thread_id: str,
message_id: str,
*,
content_type: str = "application/json",
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadMessage:
"""Modifies an existing message on an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param message_id: Identifier of the message. Required.
:type message_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_message(
self, thread_id: str, message_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
"""Modifies an existing message on an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param message_id: Identifier of the message. Required.
:type message_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_message(
self, thread_id: str, message_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadMessage:
"""Modifies an existing message on an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param message_id: Identifier of the message. Required.
:type message_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def update_message(
self,
thread_id: str,
message_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadMessage:
"""Modifies an existing message on an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param message_id: Identifier of the message. Required.
:type message_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadMessage. The ThreadMessage is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadMessage
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThreadMessage] = kwargs.pop("cls", None)
if body is _Unset:
body = {"metadata": metadata}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_update_message_request(
thread_id=thread_id,
message_id=message_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadMessage, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
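# --- Illustrative usage sketch (not part of the generated client code) ---
# Attaches caller-defined metadata to an existing message via the keyword
# overload of update_message; only non-None values are serialized into the
# request body, so this sends just the metadata mapping shown here.
def _example_update_message(agents_ops, thread_id: str, message_id: str) -> None:
    updated = agents_ops.update_message(
        thread_id,
        message_id,
        metadata={"reviewed": "true", "reviewer": "automation"},
    )
    print(updated.metadata)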
@overload
def create_run(
self,
thread_id: str,
*,
assistant_id: str,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
stream_parameter: Optional[bool] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword model: The overridden model name that the agent should use to run the thread. Default
value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tools: The overridden list of enabled tools that the agent should use to run the
thread. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword stream_parameter: If ``true``\\ , returns a stream of events that happen during the
Run as server-sent events,
terminating when the Run enters a terminal state with a ``data: [DONE]`` message. Default
value is None.
:paramtype stream_parameter: bool
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether a tool is called by the model and, if so, which tool. Is
one of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode
or ~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_run(
self,
thread_id: str,
body: JSON,
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
**kwargs: Any
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param body: Required.
:type body: JSON
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_run(
self,
thread_id: str,
body: IO[bytes],
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
**kwargs: Any
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param body: Required.
:type body: IO[bytes]
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_run(
self,
thread_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
assistant_id: str = _Unset,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
stream_parameter: Optional[bool] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword model: The overridden model name that the agent should use to run the thread. Default
value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tools: The overridden list of enabled tools that the agent should use to run the
thread. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword stream_parameter: If ``true``\\ , returns a stream of events that happen during the
Run as server-sent events,
terminating when the Run enters a terminal state with a ``data: [DONE]`` message. Default
value is None.
:paramtype stream_parameter: bool
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether a tool is called by the model and, if so, which tool. Is
one of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode
or ~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)
if body is _Unset:
if assistant_id is _Unset:
raise TypeError("missing required argument: assistant_id")
body = {
"additional_instructions": additional_instructions,
"additional_messages": additional_messages,
"assistant_id": assistant_id,
"instructions": instructions,
"max_completion_tokens": max_completion_tokens,
"max_prompt_tokens": max_prompt_tokens,
"metadata": metadata,
"model": model,
"parallel_tool_calls": parallel_tool_calls,
"response_format": response_format,
"stream": stream_parameter,
"temperature": temperature,
"tool_choice": tool_choice,
"tools": tools,
"top_p": top_p,
"truncation_strategy": truncation_strategy,
}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_run_request(
thread_id=thread_id,
include=include,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadRun, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
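# --- Illustrative usage sketch (not part of the generated client code) ---
# Starts a run on an existing thread using the keyword overload of create_run,
# overriding a few per-run settings. The agent ID is a placeholder; the other
# keyword arguments match the parameters documented above.
def _example_create_run(agents_ops, thread_id: str) -> None:
    run = agents_ops.create_run(
        thread_id,
        assistant_id="<agent-id>",
        additional_instructions="Answer concisely.",
        temperature=0.2,
        max_prompt_tokens=2000,
    )
    print(run.id, run.status)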
@distributed_trace
def list_runs(
self,
thread_id: str,
*,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIPageableListOfThreadRun:
"""Gets a list of runs for a specified thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
100, and the default is 20. Default value is None.
:paramtype limit: int
:keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
and desc for descending order. Known values are: "asc" and "desc". Default value is None.
:paramtype order: str or ~azure.ai.projects.models.ListSortOrder
:keyword after: A cursor for use in pagination. after is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
list. Default value is None.
:paramtype after: str
:keyword before: A cursor for use in pagination. before is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of
the list. Default value is None.
:paramtype before: str
:return: OpenAIPageableListOfThreadRun. The OpenAIPageableListOfThreadRun is compatible with
MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIPageableListOfThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIPageableListOfThreadRun] = kwargs.pop("cls", None)
_request = build_agents_list_runs_request(
thread_id=thread_id,
limit=limit,
order=order,
after=after,
before=before,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIPageableListOfThreadRun, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
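# --- Illustrative usage sketch (not part of the generated client code) ---
# Lists the five most recent runs on a thread in descending creation order; as
# in the pagination sketch above, `data` on the pageable result is an assumed
# field name for the returned ThreadRun items.
def _example_list_recent_runs(agents_ops, thread_id: str) -> None:
    page = agents_ops.list_runs(thread_id, limit=5, order="desc")
    for run in page.data:
        print(run.id, run.status)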
@distributed_trace
def get_run(self, thread_id: str, run_id: str, **kwargs: Any) -> _models.ThreadRun:
"""Gets an existing run from an existing thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)
_request = build_agents_get_run_request(
thread_id=thread_id,
run_id=run_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadRun, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
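# --- Illustrative usage sketch (not part of the generated client code) ---
# Polls get_run until the run leaves a non-terminal state. The status literals
# ("queued", "in_progress") follow the Agents run lifecycle and are assumptions
# here, as is the `status` attribute read from the ThreadRun model.
def _example_wait_for_run(agents_ops, thread_id: str, run_id: str, poll_seconds: float = 1.0):
    import time

    run = agents_ops.get_run(thread_id, run_id)
    while run.status in ("queued", "in_progress"):
        time.sleep(poll_seconds)
        run = agents_ops.get_run(thread_id, run_id)
    return run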
@overload
def update_run(
self,
thread_id: str,
run_id: str,
*,
content_type: str = "application/json",
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Modifies an existing thread run.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_run(
self, thread_id: str, run_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Modifies an existing thread run.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update_run(
self, thread_id: str, run_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Modifies an existing thread run.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def update_run(
self,
thread_id: str,
run_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Modifies an existing thread run.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)
if body is _Unset:
body = {"metadata": metadata}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_update_run_request(
thread_id=thread_id,
run_id=run_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadRun, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
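# --- Illustrative usage sketch (not part of the generated client code) ---
# Tags a run with metadata via the keyword overload of update_run; the metadata
# keys and values shown here are placeholders.
def _example_tag_run(agents_ops, thread_id: str, run_id: str) -> None:
    tagged = agents_ops.update_run(thread_id, run_id, metadata={"evaluated": "true"})
    print(tagged.metadata)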
@overload
def submit_tool_outputs_to_run(
self,
thread_id: str,
run_id: str,
*,
tool_outputs: List[_models.ToolOutput],
content_type: str = "application/json",
stream_parameter: Optional[bool] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Submits outputs from tools as requested by tool calls in a run. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:keyword tool_outputs: A list of tools for which the outputs are being submitted. Required.
:paramtype tool_outputs: list[~azure.ai.projects.models.ToolOutput]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword stream_parameter: If true, returns a stream of events that happen during the Run as
server-sent events, terminating when the run enters a terminal state. Default value is None.
:paramtype stream_parameter: bool
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def submit_tool_outputs_to_run(
self, thread_id: str, run_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Submits outputs from tools as requested by tool calls in a run. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def submit_tool_outputs_to_run(
self, thread_id: str, run_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Submits outputs from tools as requested by tool calls in a run. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def submit_tool_outputs_to_run(
self,
thread_id: str,
run_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
tool_outputs: List[_models.ToolOutput] = _Unset,
stream_parameter: Optional[bool] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Submits outputs from tools as requested by tool calls in a run. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword tool_outputs: A list of tools for which the outputs are being submitted. Required.
:paramtype tool_outputs: list[~azure.ai.projects.models.ToolOutput]
:keyword stream_parameter: If true, returns a stream of events that happen during the Run as
server-sent events, terminating when the run enters a terminal state. Default value is None.
:paramtype stream_parameter: bool
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)
if body is _Unset:
if tool_outputs is _Unset:
raise TypeError("missing required argument: tool_outputs")
body = {"stream": stream_parameter, "tool_outputs": tool_outputs}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_submit_tool_outputs_to_run_request(
thread_id=thread_id,
run_id=run_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadRun, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
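# --- Illustrative usage sketch (not part of the generated client code) ---
# Answers a run in the 'requires_action' state by submitting one tool output.
# The ToolOutput field names (`tool_call_id`, `output`) follow the Agents REST
# schema and should be read as assumptions; the call ID would normally come
# from the run's required_action payload.
def _example_submit_tool_output(agents_ops, thread_id: str, run_id: str, call_id: str) -> None:
    run = agents_ops.submit_tool_outputs_to_run(
        thread_id,
        run_id,
        tool_outputs=[_models.ToolOutput(tool_call_id=call_id, output='{"result": 42}')],
    )
    print(run.status)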
@distributed_trace
def cancel_run(self, thread_id: str, run_id: str, **kwargs: Any) -> _models.ThreadRun:
"""Cancels a run of an in progress thread.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)
_request = build_agents_cancel_run_request(
thread_id=thread_id,
run_id=run_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadRun, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
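# --- Illustrative usage sketch (not part of the generated client code) ---
# Cancels a run that is still in flight and reports the status returned by the
# service; identifiers are placeholders supplied by the caller.
def _example_cancel_run(agents_ops, thread_id: str, run_id: str) -> None:
    cancelled = agents_ops.cancel_run(thread_id, run_id)
    print(cancelled.status)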
@overload
def create_thread_and_run(
self,
*,
assistant_id: str,
content_type: str = "application/json",
thread: Optional[_models.AgentThreadCreationOptions] = None,
model: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.UpdateToolResourcesOptions] = None,
stream_parameter: Optional[bool] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Creates a new agent thread and immediately starts a run using that new thread.
:keyword assistant_id: The ID of the agent for which the thread should be created. Required.
:paramtype assistant_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword thread: The details used to create the new thread. If no thread is provided, an empty
one will be created. Default value is None.
:paramtype thread: ~azure.ai.projects.models.AgentThreadCreationOptions
:keyword model: The overridden model that the agent should use to run the thread. Default value
is None.
:paramtype model: str
:keyword instructions: The overridden system instructions the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword tools: The overridden list of enabled tools the agent should use to run the thread.
Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: Override the tools the agent can use for this run. This is useful for
modifying the behavior on a per-run basis. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.UpdateToolResourcesOptions
:keyword stream_parameter: If ``true``\\ , returns a stream of events that happen during the
Run as server-sent events,
terminating when the Run enters a terminal state with a ``data: [DONE]`` message. Default
value is None.
:paramtype stream_parameter: bool
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort to use only
the number of completion tokens specified, across multiple turns of the run. If the run
exceeds the number of completion tokens
specified, the run will end with status ``incomplete``. See ``incomplete_details`` for more
info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether a tool is called by the model and, if so, which tool. Is
one of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode
or ~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_thread_and_run(
self, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Creates a new agent thread and immediately starts a run using that new thread.
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_thread_and_run(
self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Creates a new agent thread and immediately starts a run using that new thread.
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_thread_and_run(
self,
body: Union[JSON, IO[bytes]] = _Unset,
*,
assistant_id: str = _Unset,
thread: Optional[_models.AgentThreadCreationOptions] = None,
model: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.UpdateToolResourcesOptions] = None,
stream_parameter: Optional[bool] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.ThreadRun:
"""Creates a new agent thread and immediately starts a run using that new thread.
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword assistant_id: The ID of the agent for which the thread should be created. Required.
:paramtype assistant_id: str
:keyword thread: The details used to create the new thread. If no thread is provided, an empty
one will be created. Default value is None.
:paramtype thread: ~azure.ai.projects.models.AgentThreadCreationOptions
:keyword model: The overridden model that the agent should use to run the thread. Default value
is None.
:paramtype model: str
:keyword instructions: The overridden system instructions the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword tools: The overridden list of enabled tools the agent should use to run the thread.
Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: Override the tools the agent can use for this run. This is useful for
modifying the behavior on a per-run basis. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.UpdateToolResourcesOptions
:keyword stream_parameter: If ``true``\\ , returns a stream of events that happen during the
Run as server-sent events,
terminating when the Run enters a terminal state with a ``data: [DONE]`` message. Default
value is None.
:paramtype stream_parameter: bool
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort to use only
the number of completion tokens specified, across multiple turns of the run. If the run
exceeds the number of completion tokens
specified, the run will end with status ``incomplete``. See ``incomplete_details`` for more
info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether a tool is called by the model and, if so, which tool. Is
one of the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode
or ~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat, ResponseFormatJsonSchemaType. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat or
~azure.ai.projects.models.ResponseFormatJsonSchemaType
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.ThreadRun] = kwargs.pop("cls", None)
if body is _Unset:
if assistant_id is _Unset:
raise TypeError("missing required argument: assistant_id")
body = {
"assistant_id": assistant_id,
"instructions": instructions,
"max_completion_tokens": max_completion_tokens,
"max_prompt_tokens": max_prompt_tokens,
"metadata": metadata,
"model": model,
"parallel_tool_calls": parallel_tool_calls,
"response_format": response_format,
"stream": stream_parameter,
"temperature": temperature,
"thread": thread,
"tool_choice": tool_choice,
"tool_resources": tool_resources,
"tools": tools,
"top_p": top_p,
"truncation_strategy": truncation_strategy,
}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_thread_and_run_request(
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.ThreadRun, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
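# Illustrative usage sketch (not generated code): calling create_thread_and_run through an
# AIProjectClient. It assumes the client instance is named `project_client` and exposes these
# operations on its `agents` attribute; the assistant id is a placeholder. Keyword arguments
# left as None are dropped from the JSON body before the request is sent, as shown above.
#
#     run = project_client.agents.create_thread_and_run(
#         assistant_id="asst_example",   # required when no `body` is passed
#         instructions="Summarize the attached thread briefly.",
#         temperature=0.2,               # omit to use the service default
#         max_completion_tokens=512,
#     )
#     print(run.id, run.status)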
@distributed_trace
def get_run_step(
self,
thread_id: str,
run_id: str,
step_id: str,
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
**kwargs: Any
) -> _models.RunStep:
"""Gets a single run step from a thread run.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:param step_id: Identifier of the run step. Required.
:type step_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:return: RunStep. The RunStep is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.RunStep
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.RunStep] = kwargs.pop("cls", None)
_request = build_agents_get_run_step_request(
thread_id=thread_id,
run_id=run_id,
step_id=step_id,
include=include,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.RunStep, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
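# Illustrative sketch (assumes `project_client.agents` exposes this operation; ids are
# placeholders): fetch a single run step and ask the service to inline file-search result
# content via the `include` keyword documented above.
#
#     step = project_client.agents.get_run_step(
#         thread_id="thread_example",
#         run_id="run_example",
#         step_id="step_example",
#         include=["step_details.tool_calls[*].file_search.results[*].content"],
#     )
#     print(step.type, step.status)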
@distributed_trace
def list_run_steps(
self,
thread_id: str,
run_id: str,
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIPageableListOfRunStep:
"""Gets a list of run steps from a thread run.
:param thread_id: Identifier of the thread. Required.
:type thread_id: str
:param run_id: Identifier of the run. Required.
:type run_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
100, and the default is 20. Default value is None.
:paramtype limit: int
:keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
and desc for descending order. Known values are: "asc" and "desc". Default value is None.
:paramtype order: str or ~azure.ai.projects.models.ListSortOrder
:keyword after: A cursor for use in pagination. after is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
list. Default value is None.
:paramtype after: str
:keyword before: A cursor for use in pagination. before is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of
the list. Default value is None.
:paramtype before: str
:return: OpenAIPageableListOfRunStep. The OpenAIPageableListOfRunStep is compatible with
MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIPageableListOfRunStep
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIPageableListOfRunStep] = kwargs.pop("cls", None)
_request = build_agents_list_run_steps_request(
thread_id=thread_id,
run_id=run_id,
include=include,
limit=limit,
order=order,
after=after,
before=before,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIPageableListOfRunStep, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
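# Illustrative pagination sketch (assumes OpenAI-style paging fields `data`, `has_more` and
# `last_id` on OpenAIPageableListOfRunStep; ids are placeholders): walk all run steps 20 at a
# time using the `after` cursor described above.
#
#     after = None
#     while True:
#         page = project_client.agents.list_run_steps(
#             thread_id="thread_example", run_id="run_example",
#             limit=20, order="asc", after=after,
#         )
#         for step in page.data:
#             print(step.id)
#         if not page.has_more:
#             break
#         after = page.last_id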
@distributed_trace
def list_files(
self, *, purpose: Optional[Union[str, _models.FilePurpose]] = None, **kwargs: Any
) -> _models.FileListResponse:
"""Gets a list of previously uploaded files.
:keyword purpose: The purpose of the file. Known values are: "fine-tune", "fine-tune-results",
"assistants", "assistants_output", "batch", "batch_output", and "vision". Default value is
None.
:paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
:return: FileListResponse. The FileListResponse is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.FileListResponse
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.FileListResponse] = kwargs.pop("cls", None)
_request = build_agents_list_files_request(
purpose=purpose,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.FileListResponse, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
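# Illustrative sketch (assumes a `data` attribute on FileListResponse): list only the files
# that were uploaded for Agents usage.
#
#     files = project_client.agents.list_files(purpose="assistants")
#     for f in files.data:
#         print(f.id, f.filename)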
@overload
def upload_file(
self, *, file: FileType, purpose: Union[str, _models.FilePurpose], filename: Optional[str] = None, **kwargs: Any
) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:keyword file: The file data, in bytes. Required.
:paramtype file: ~azure.ai.projects._vendor.FileType
:keyword purpose: The intended purpose of the uploaded file. Use ``assistants`` for Agents and
Message files, ``vision`` for Agents image file inputs, ``batch`` for Batch API, and
``fine-tune`` for Fine-tuning. Known values are: "fine-tune", "fine-tune-results",
"assistants", "assistants_output", "batch", "batch_output", and "vision". Required.
:paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
:keyword filename: The name of the file. Default value is None.
:paramtype filename: str
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def upload_file(self, body: JSON, **kwargs: Any) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:param body: Required.
:type body: JSON
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def upload_file(
self,
body: JSON = _Unset,
*,
file: FileType = _Unset,
purpose: Union[str, _models.FilePurpose] = _Unset,
filename: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:param body: Is one of the following types: JSON. Required.
:type body: JSON
:keyword file: The file data, in bytes. Required.
:paramtype file: ~azure.ai.projects._vendor.FileType
:keyword purpose: The intended purpose of the uploaded file. Use ``assistants`` for Agents and
Message files, ``vision`` for Agents image file inputs, ``batch`` for Batch API, and
``fine-tune`` for Fine-tuning. Known values are: "fine-tune", "fine-tune-results",
"assistants", "assistants_output", "batch", "batch_output", and "vision". Required.
:paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
:keyword filename: The name of the file. Default value is None.
:paramtype filename: str
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIFile] = kwargs.pop("cls", None)
if body is _Unset:
if file is _Unset:
raise TypeError("missing required argument: file")
if purpose is _Unset:
raise TypeError("missing required argument: purpose")
body = {"file": file, "filename": filename, "purpose": purpose}
body = {k: v for k, v in body.items() if v is not None}
_body = body.as_dict() if isinstance(body, _model_base.Model) else body
_file_fields: List[str] = ["file"]
_data_fields: List[str] = ["purpose", "filename"]
_files, _data = prepare_multipart_form_data(_body, _file_fields, _data_fields)
_request = build_agents_upload_file_request(
api_version=self._config.api_version,
files=_files,
data=_data,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIFile, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
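# Illustrative multipart upload sketch (the exact FileType shape accepted by
# `prepare_multipart_form_data` is an assumption here; an open binary stream or a
# (filename, data, content_type) tuple is typical). The `file` field is sent as a file part
# and `purpose`/`filename` as form data, matching the split built above.
#
#     with open("product-info.md", "rb") as fh:
#         uploaded = project_client.agents.upload_file(
#             file=fh,
#             purpose="assistants",
#             filename="product-info.md",
#         )
#     print(uploaded.id, uploaded.filename)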
@distributed_trace
def delete_file(self, file_id: str, **kwargs: Any) -> _models.FileDeletionStatus:
"""Delete a previously uploaded file.
:param file_id: The ID of the file to delete. Required.
:type file_id: str
:return: FileDeletionStatus. The FileDeletionStatus is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.FileDeletionStatus
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.FileDeletionStatus] = kwargs.pop("cls", None)
_request = build_agents_delete_file_request(
file_id=file_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.FileDeletionStatus, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@distributed_trace
def get_file(self, file_id: str, **kwargs: Any) -> _models.OpenAIFile:
"""Returns information about a specific file. Does not retrieve file content.
:param file_id: The ID of the file to retrieve. Required.
:type file_id: str
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIFile] = kwargs.pop("cls", None)
_request = build_agents_get_file_request(
file_id=file_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIFile, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@distributed_trace
def _get_file_content(self, file_id: str, **kwargs: Any) -> bytes:
"""Retrieves the raw content of a specific file.
:param file_id: The ID of the file to retrieve. Required.
:type file_id: str
:return: bytes
:rtype: bytes
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[bytes] = kwargs.pop("cls", None)
_request = build_agents_get_file_content_request(
file_id=file_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(bytes, response.json(), format="base64")
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
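# Note on the two return paths above (internal helper): with the default `stream=False` the
# service's JSON payload is base64-decoded into a single `bytes` object via
# `_deserialize(bytes, ..., format="base64")`; passing `stream=True` returns the raw byte
# iterator from `response.iter_bytes()` instead. Illustrative sketch (file id is a placeholder):
#
#     content = project_client.agents._get_file_content("file_example")
#     with open("downloaded.bin", "wb") as fh:
#         fh.write(content)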
@distributed_trace
def list_vector_stores(
self,
*,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIPageableListOfVectorStore:
"""Returns a list of vector stores.
:keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
100, and the default is 20. Default value is None.
:paramtype limit: int
:keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
and desc for descending order. Known values are: "asc" and "desc". Default value is None.
:paramtype order: str or ~azure.ai.projects.models.ListSortOrder
:keyword after: A cursor for use in pagination. after is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
list. Default value is None.
:paramtype after: str
:keyword before: A cursor for use in pagination. before is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of
the list. Default value is None.
:paramtype before: str
:return: OpenAIPageableListOfVectorStore. The OpenAIPageableListOfVectorStore is compatible
with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIPageableListOfVectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIPageableListOfVectorStore] = kwargs.pop("cls", None)
_request = build_agents_list_vector_stores_request(
limit=limit,
order=order,
after=after,
before=before,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIPageableListOfVectorStore, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
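# Illustrative sketch (assumes a `data` attribute on OpenAIPageableListOfVectorStore): list
# the most recently created vector stores first.
#
#     stores = project_client.agents.list_vector_stores(limit=10, order="desc")
#     for vs in stores.data:
#         print(vs.id, vs.name)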
@overload
def create_vector_store(
self,
*,
content_type: str = "application/json",
file_ids: Optional[List[str]] = None,
name: Optional[str] = None,
store_configuration: Optional[_models.VectorStoreConfiguration] = None,
expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.VectorStore:
"""Creates a vector store.
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword file_ids: A list of file IDs that the vector store should use. Useful for tools like
``file_search`` that can access files. Default value is None.
:paramtype file_ids: list[str]
:keyword name: The name of the vector store. Default value is None.
:paramtype name: str
:keyword store_configuration: The vector store configuration, used when the vector store is
created from Azure asset URIs. Default value is None.
:paramtype store_configuration: ~azure.ai.projects.models.VectorStoreConfiguration
:keyword expires_after: Details on when this vector store expires. Default value is None.
:paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Only applicable if file_ids is non-empty. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_vector_store(
self, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
"""Creates a vector store.
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_vector_store(
self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
"""Creates a vector store.
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_vector_store(
self,
body: Union[JSON, IO[bytes]] = _Unset,
*,
file_ids: Optional[List[str]] = None,
name: Optional[str] = None,
store_configuration: Optional[_models.VectorStoreConfiguration] = None,
expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.VectorStore:
"""Creates a vector store.
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword file_ids: A list of file IDs that the vector store should use. Useful for tools like
``file_search`` that can access files. Default value is None.
:paramtype file_ids: list[str]
:keyword name: The name of the vector store. Default value is None.
:paramtype name: str
:keyword store_configuration: The vector store configuration, used when the vector store is
created from Azure asset URIs. Default value is None.
:paramtype store_configuration: ~azure.ai.projects.models.VectorStoreConfiguration
:keyword expires_after: Details on when this vector store expires. Default value is None.
:paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Only applicable if file_ids is non-empty. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.VectorStore] = kwargs.pop("cls", None)
if body is _Unset:
body = {
"chunking_strategy": chunking_strategy,
"configuration": store_configuration,
"expires_after": expires_after,
"file_ids": file_ids,
"metadata": metadata,
"name": name,
}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_vector_store_request(
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStore, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
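# Illustrative sketch (file ids are placeholders): create a vector store over previously
# uploaded files. Only keyword arguments that are not None are folded into the request body,
# mirroring the filtering done above.
#
#     vector_store = project_client.agents.create_vector_store(
#         file_ids=["file_example_1", "file_example_2"],
#         name="product-docs",
#         metadata={"team": "docs"},
#     )
#     print(vector_store.id, vector_store.status)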
@distributed_trace
def get_vector_store(self, vector_store_id: str, **kwargs: Any) -> _models.VectorStore:
"""Returns the vector store object matching the specified ID.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.VectorStore] = kwargs.pop("cls", None)
_request = build_agents_get_vector_store_request(
vector_store_id=vector_store_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStore, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def modify_vector_store(
self,
vector_store_id: str,
*,
content_type: str = "application/json",
name: Optional[str] = None,
expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.VectorStore:
"""The ID of the vector store to modify.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword name: The name of the vector store. Default value is None.
:paramtype name: str
:keyword expires_after: Details on when this vector store expires. Default value is None.
:paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def modify_vector_store(
self, vector_store_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
"""The ID of the vector store to modify.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def modify_vector_store(
self, vector_store_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStore:
"""The ID of the vector store to modify.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def modify_vector_store(
self,
vector_store_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
name: Optional[str] = None,
expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> _models.VectorStore:
"""The ID of the vector store to modify.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword name: The name of the vector store. Default value is None.
:paramtype name: str
:keyword expires_after: Details on when this vector store expires. Default value is None.
:paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.VectorStore] = kwargs.pop("cls", None)
if body is _Unset:
body = {"expires_after": expires_after, "metadata": metadata, "name": name}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_modify_vector_store_request(
vector_store_id=vector_store_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStore, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
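# Illustrative sketch (id is a placeholder): rename an existing vector store and replace its
# metadata; fields left as None are not sent, so unspecified properties are left untouched.
#
#     updated = project_client.agents.modify_vector_store(
#         "vs_example",
#         name="product-docs-v2",
#         metadata={"team": "docs", "revision": "2"},
#     )
#     print(updated.name)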
@distributed_trace
def delete_vector_store(self, vector_store_id: str, **kwargs: Any) -> _models.VectorStoreDeletionStatus:
"""Deletes the vector store object matching the specified ID.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:return: VectorStoreDeletionStatus. The VectorStoreDeletionStatus is compatible with
MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreDeletionStatus
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.VectorStoreDeletionStatus] = kwargs.pop("cls", None)
_request = build_agents_delete_vector_store_request(
vector_store_id=vector_store_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStoreDeletionStatus, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@distributed_trace
def list_vector_store_files(
self,
vector_store_id: str,
*,
filter: Optional[Union[str, _models.VectorStoreFileStatusFilter]] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIPageableListOfVectorStoreFile:
"""Returns a list of vector store files.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:keyword filter: Filter by file status. Known values are: "in_progress", "completed", "failed",
and "cancelled". Default value is None.
:paramtype filter: str or ~azure.ai.projects.models.VectorStoreFileStatusFilter
:keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
100, and the default is 20. Default value is None.
:paramtype limit: int
:keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
and desc for descending order. Known values are: "asc" and "desc". Default value is None.
:paramtype order: str or ~azure.ai.projects.models.ListSortOrder
:keyword after: A cursor for use in pagination. after is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
list. Default value is None.
:paramtype after: str
:keyword before: A cursor for use in pagination. before is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of
the list. Default value is None.
:paramtype before: str
:return: OpenAIPageableListOfVectorStoreFile. The OpenAIPageableListOfVectorStoreFile is
compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIPageableListOfVectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIPageableListOfVectorStoreFile] = kwargs.pop("cls", None)
_request = build_agents_list_vector_store_files_request(
vector_store_id=vector_store_id,
filter=filter,
limit=limit,
order=order,
after=after,
before=before,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIPageableListOfVectorStoreFile, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
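# Illustrative sketch (id is a placeholder; assumes a `data` attribute on the pageable list):
# list only the files that finished ingestion into the vector store, newest first.
#
#     files = project_client.agents.list_vector_store_files(
#         "vs_example", filter="completed", order="desc",
#     )
#     for vs_file in files.data:
#         print(vs_file.id, vs_file.status)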
@overload
def create_vector_store_file(
self,
vector_store_id: str,
*,
content_type: str = "application/json",
file_id: Optional[str] = None,
data_source: Optional[_models.VectorStoreDataSource] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
**kwargs: Any
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword file_id: Identifier of the file. Default value is None.
:paramtype file_id: str
:keyword data_source: Azure asset ID. Default value is None.
:paramtype data_source: ~azure.ai.projects.models.VectorStoreDataSource
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_vector_store_file(
self, vector_store_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_vector_store_file(
self, vector_store_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_vector_store_file(
self,
vector_store_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
file_id: Optional[str] = None,
data_source: Optional[_models.VectorStoreDataSource] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
**kwargs: Any
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword file_id: Identifier of the file. Default value is None.
:paramtype file_id: str
:keyword data_source: Azure asset ID. Default value is None.
:paramtype data_source: ~azure.ai.projects.models.VectorStoreDataSource
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.VectorStoreFile] = kwargs.pop("cls", None)
if body is _Unset:
body = {"chunking_strategy": chunking_strategy, "data_source": data_source, "file_id": file_id}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_vector_store_file_request(
vector_store_id=vector_store_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStoreFile, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
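# Illustrative sketch (ids are placeholders): attach an already uploaded file to a vector
# store; ingestion then proceeds asynchronously and the returned VectorStoreFile reports its
# status.
#
#     vs_file = project_client.agents.create_vector_store_file(
#         "vs_example", file_id="file_example",
#     )
#     print(vs_file.id, vs_file.status)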
@distributed_trace
def get_vector_store_file(self, vector_store_id: str, file_id: str, **kwargs: Any) -> _models.VectorStoreFile:
"""Retrieves a vector store file.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param file_id: Identifier of the file. Required.
:type file_id: str
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.VectorStoreFile] = kwargs.pop("cls", None)
_request = build_agents_get_vector_store_file_request(
vector_store_id=vector_store_id,
file_id=file_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStoreFile, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@distributed_trace
def delete_vector_store_file(
self, vector_store_id: str, file_id: str, **kwargs: Any
) -> _models.VectorStoreFileDeletionStatus:
"""Delete a vector store file. This will remove the file from the vector store but the file itself
will not be deleted.
To delete the file, use the delete file endpoint.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param file_id: Identifier of the file. Required.
:type file_id: str
:return: VectorStoreFileDeletionStatus. The VectorStoreFileDeletionStatus is compatible with
MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileDeletionStatus
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.VectorStoreFileDeletionStatus] = kwargs.pop("cls", None)
_request = build_agents_delete_vector_store_file_request(
vector_store_id=vector_store_id,
file_id=file_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStoreFileDeletionStatus, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
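# Usage sketch (illustrative): detaching a file from a vector store. As the docstring
# above notes, this does not delete the underlying file; that requires the separate
# delete-file endpoint. The `deleted` key is assumed from the usual deletion-status
# shape of the MutableMapping-compatible VectorStoreFileDeletionStatus.
#
#     status = project_client.agents.delete_vector_store_file(
#         vector_store_id="<vector-store-id>", file_id="<file-id>"
#     )
#     print(status["deleted"])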
@overload
def create_vector_store_file_batch(
self,
vector_store_id: str,
*,
content_type: str = "application/json",
file_ids: Optional[List[str]] = None,
data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
**kwargs: Any
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword file_ids: List of file identifiers. Default value is None.
:paramtype file_ids: list[str]
:keyword data_sources: List of Azure assets. Default value is None.
:paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_vector_store_file_batch(
self, vector_store_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_vector_store_file_batch(
self, vector_store_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_vector_store_file_batch(
self,
vector_store_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
file_ids: Optional[List[str]] = None,
data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
**kwargs: Any
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword file_ids: List of file identifiers. Default value is None.
:paramtype file_ids: list[str]
:keyword data_sources: List of Azure assets. Default value is None.
:paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.VectorStoreFileBatch] = kwargs.pop("cls", None)
if body is _Unset:
body = {"chunking_strategy": chunking_strategy, "data_sources": data_sources, "file_ids": file_ids}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_agents_create_vector_store_file_batch_request(
vector_store_id=vector_store_id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStoreFileBatch, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
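# Usage sketch (illustrative): creating a file batch from existing file IDs. Per the
# overloads above, the batch can be described either with keyword arguments or with a
# JSON/IO[bytes] body; when chunking_strategy is omitted the auto strategy is used.
#
#     batch = project_client.agents.create_vector_store_file_batch(
#         vector_store_id="<vector-store-id>",
#         file_ids=["<file-id-1>", "<file-id-2>"],
#     )
#     print(batch["id"])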
@distributed_trace
def get_vector_store_file_batch(
self, vector_store_id: str, batch_id: str, **kwargs: Any
) -> _models.VectorStoreFileBatch:
"""Retrieve a vector store file batch.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param batch_id: Identifier of the file batch. Required.
:type batch_id: str
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.VectorStoreFileBatch] = kwargs.pop("cls", None)
_request = build_agents_get_vector_store_file_batch_request(
vector_store_id=vector_store_id,
batch_id=batch_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStoreFileBatch, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
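# Usage sketch (illustrative): reading a batch back to check how processing is going.
#
#     batch = project_client.agents.get_vector_store_file_batch(
#         vector_store_id="<vector-store-id>", batch_id="<batch-id>"
#     )
#     print(batch["status"])  # e.g. "in_progress" or "completed"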
@distributed_trace
def cancel_vector_store_file_batch(
self, vector_store_id: str, batch_id: str, **kwargs: Any
) -> _models.VectorStoreFileBatch:
"""Cancel a vector store file batch. This attempts to cancel the processing of files in this batch
as soon as possible.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param batch_id: Identifier of the file batch. Required.
:type batch_id: str
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.VectorStoreFileBatch] = kwargs.pop("cls", None)
_request = build_agents_cancel_vector_store_file_batch_request(
vector_store_id=vector_store_id,
batch_id=batch_id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.VectorStoreFileBatch, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
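# Usage sketch (illustrative): cancelling a batch whose files are still being processed.
# The call returns the updated VectorStoreFileBatch rather than a separate status object.
#
#     batch = project_client.agents.cancel_vector_store_file_batch(
#         vector_store_id="<vector-store-id>", batch_id="<batch-id>"
#     )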
@distributed_trace
def list_vector_store_file_batch_files(
self,
vector_store_id: str,
batch_id: str,
*,
filter: Optional[Union[str, _models.VectorStoreFileStatusFilter]] = None,
limit: Optional[int] = None,
order: Optional[Union[str, _models.ListSortOrder]] = None,
after: Optional[str] = None,
before: Optional[str] = None,
**kwargs: Any
) -> _models.OpenAIPageableListOfVectorStoreFile:
"""Returns a list of vector store files in a batch.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param batch_id: Identifier of the file batch. Required.
:type batch_id: str
:keyword filter: Filter by file status. Known values are: "in_progress", "completed", "failed",
and "cancelled". Default value is None.
:paramtype filter: str or ~azure.ai.projects.models.VectorStoreFileStatusFilter
:keyword limit: A limit on the number of objects to be returned. Limit can range between 1 and
100, and the default is 20. Default value is None.
:paramtype limit: int
:keyword order: Sort order by the created_at timestamp of the objects. asc for ascending order
and desc for descending order. Known values are: "asc" and "desc". Default value is None.
:paramtype order: str or ~azure.ai.projects.models.ListSortOrder
:keyword after: A cursor for use in pagination. after is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include after=obj_foo in order to fetch the next page of the
list. Default value is None.
:paramtype after: str
:keyword before: A cursor for use in pagination. before is an object ID that defines your place
in the list. For instance, if you make a list request and receive 100 objects, ending with
obj_foo, your subsequent call can include before=obj_foo in order to fetch the previous page of
the list. Default value is None.
:paramtype before: str
:return: OpenAIPageableListOfVectorStoreFile. The OpenAIPageableListOfVectorStoreFile is
compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIPageableListOfVectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.OpenAIPageableListOfVectorStoreFile] = kwargs.pop("cls", None)
_request = build_agents_list_vector_store_file_batch_files_request(
vector_store_id=vector_store_id,
batch_id=batch_id,
filter=filter,
limit=limit,
order=order,
after=after,
before=before,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.OpenAIPageableListOfVectorStoreFile, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
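# Usage sketch (illustrative): listing the files in a batch, newest first, keeping only
# the failed ones. The keywords map directly to the parameters documented above; the
# `data` list is assumed from the OpenAI-style pageable shape of the result.
#
#     page = project_client.agents.list_vector_store_file_batch_files(
#         vector_store_id="<vector-store-id>",
#         batch_id="<batch-id>",
#         filter="failed",
#         order="desc",
#         limit=20,
#     )
#     for f in page["data"]:
#         print(f["id"])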
class ConnectionsOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.ai.projects.AIProjectClient`'s
:attr:`connections` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: AIProjectClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace
def _get_workspace(self, **kwargs: Any) -> _models._models.GetWorkspaceResponse:
"""Gets the properties of the specified machine learning workspace.
:return: GetWorkspaceResponse. The GetWorkspaceResponse is compatible with MutableMapping
:rtype: ~azure.ai.projects.models._models.GetWorkspaceResponse
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models._models.GetWorkspaceResponse] = kwargs.pop("cls", None)
_request = build_connections_get_workspace_request(
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(
_models._models.GetWorkspaceResponse, response.json() # pylint: disable=protected-access
)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@distributed_trace
def _list_connections(
self,
*,
category: Optional[Union[str, _models.ConnectionType]] = None,
include_all: Optional[bool] = None,
target: Optional[str] = None,
**kwargs: Any
) -> _models._models.ListConnectionsResponse:
"""List the details of all the connections (not including their credentials).
:keyword category: Category of the workspace connection. Known values are: "AzureOpenAI",
"Serverless", "AzureBlob", "AIServices", and "CognitiveSearch". Default value is None.
:paramtype category: str or ~azure.ai.projects.models.ConnectionType
:keyword include_all: Indicates whether to list datastores. Service default: do not list
datastores. Default value is None.
:paramtype include_all: bool
:keyword target: Target of the workspace connection. Default value is None.
:paramtype target: str
:return: ListConnectionsResponse. The ListConnectionsResponse is compatible with MutableMapping
:rtype: ~azure.ai.projects.models._models.ListConnectionsResponse
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models._models.ListConnectionsResponse] = kwargs.pop("cls", None)
_request = build_connections_list_connections_request(
category=category,
include_all=include_all,
target=target,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(
_models._models.ListConnectionsResponse, response.json() # pylint: disable=protected-access
)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@distributed_trace
def _get_connection(self, connection_name: str, **kwargs: Any) -> _models._models.GetConnectionResponse:
"""Get the details of a single connection, without credentials.
:param connection_name: Connection Name. Required.
:type connection_name: str
:return: GetConnectionResponse. The GetConnectionResponse is compatible with MutableMapping
:rtype: ~azure.ai.projects.models._models.GetConnectionResponse
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models._models.GetConnectionResponse] = kwargs.pop("cls", None)
_request = build_connections_get_connection_request(
connection_name=connection_name,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(
_models._models.GetConnectionResponse, response.json() # pylint: disable=protected-access
)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
@overload
def _get_connection_with_secrets(
self, connection_name: str, *, ignored: str, content_type: str = "application/json", **kwargs: Any
) -> _models._models.GetConnectionResponse: ...
@overload
def _get_connection_with_secrets(
self, connection_name: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models._models.GetConnectionResponse: ...
@overload
def _get_connection_with_secrets(
self, connection_name: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models._models.GetConnectionResponse: ...
@distributed_trace
def _get_connection_with_secrets(
self, connection_name: str, body: Union[JSON, IO[bytes]] = _Unset, *, ignored: str = _Unset, **kwargs: Any
) -> _models._models.GetConnectionResponse:
"""Get the details of a single connection, including credentials (if available).
:param connection_name: Connection Name. Required.
:type connection_name: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword ignored: The body is ignored. TODO: Can we remove this? Required.
:paramtype ignored: str
:return: GetConnectionResponse. The GetConnectionResponse is compatible with MutableMapping
:rtype: ~azure.ai.projects.models._models.GetConnectionResponse
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models._models.GetConnectionResponse] = kwargs.pop("cls", None)
if body is _Unset:
if ignored is _Unset:
raise TypeError("missing required argument: ignored")
body = {"ignored": ignored}
body = {k: v for k, v in body.items() if v is not None}
content_type = content_type or "application/json"
_content = None
if isinstance(body, (IOBase, bytes)):
_content = body
else:
_content = json.dumps(body, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_connections_get_connection_with_secrets_request(
connection_name=connection_name,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(
_models._models.GetConnectionResponse, response.json() # pylint: disable=protected-access
)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
class TelemetryOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.ai.projects.AIProjectClient`'s
:attr:`telemetry` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: AIProjectClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace
def _get_app_insights(
self, app_insights_resource_url: str, **kwargs: Any
) -> _models._models.GetAppInsightsResponse:
# pylint: disable=line-too-long
"""Gets the properties of the specified Application Insights resource.
:param app_insights_resource_url: The AppInsights Azure resource URL. It should have the
format:
'/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/microsoft.insights/components/{resourcename}'.
Required.
:type app_insights_resource_url: str
:return: GetAppInsightsResponse. The GetAppInsightsResponse is compatible with MutableMapping
:rtype: ~azure.ai.projects.models._models.GetAppInsightsResponse
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models._models.GetAppInsightsResponse] = kwargs.pop("cls", None)
_request = build_telemetry_get_app_insights_request(
app_insights_resource_url=app_insights_resource_url,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(
_models._models.GetAppInsightsResponse, response.json() # pylint: disable=protected-access
)
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
class EvaluationsOperations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~azure.ai.projects.AIProjectClient`'s
:attr:`evaluations` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client")
self._config: AIProjectClientConfiguration = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace
def get(self, id: str, **kwargs: Any) -> _models.Evaluation:
"""Resource read operation template.
:param id: Identifier of the evaluation. Required.
:type id: str
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.Evaluation] = kwargs.pop("cls", None)
_request = build_evaluations_get_request(
id=id,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
response_headers = {}
response_headers["x-ms-client-request-id"] = self._deserialize(
"str", response.headers.get("x-ms-client-request-id")
)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.Evaluation, response.json())
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
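# Usage sketch (illustrative): reading a single evaluation through the client's
# `evaluations` attribute (see the class docstring above); the id is a placeholder.
#
#     evaluation = project_client.evaluations.get(id="<evaluation-id>")
#     print(evaluation["id"])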
@overload
def create(
self, evaluation: _models.Evaluation, *, content_type: str = "application/json", **kwargs: Any
) -> _models.Evaluation:
"""Run the evaluation.
:param evaluation: Evaluation to run. Required.
:type evaluation: ~azure.ai.projects.models.Evaluation
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create(self, evaluation: JSON, *, content_type: str = "application/json", **kwargs: Any) -> _models.Evaluation:
"""Run the evaluation.
:param evaluation: Evaluation to run. Required.
:type evaluation: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create(
self, evaluation: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.Evaluation:
"""Run the evaluation.
:param evaluation: Evaluation to run. Required.
:type evaluation: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create(self, evaluation: Union[_models.Evaluation, JSON, IO[bytes]], **kwargs: Any) -> _models.Evaluation:
"""Run the evaluation.
:param evaluation: Evaluation to run. Is one of the following types: Evaluation, JSON,
IO[bytes]. Required.
:type evaluation: ~azure.ai.projects.models.Evaluation or JSON or IO[bytes]
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.Evaluation] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_content = None
if isinstance(evaluation, (IOBase, bytes)):
_content = evaluation
else:
_content = json.dumps(evaluation, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_evaluations_create_request(
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [201]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.Evaluation, response.json())
if cls:
return cls(pipeline_response, deserialized, {}) # type: ignore
return deserialized # type: ignore
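# Usage sketch (illustrative): submitting an evaluation. The body may be an Evaluation
# model, a JSON mapping, or raw IO[bytes]; the empty dict below is a placeholder whose
# fields must follow ~azure.ai.projects.models.Evaluation, not a schema defined here.
#
#     evaluation_payload = {}  # populate per the Evaluation model
#     created = project_client.evaluations.create(evaluation=evaluation_payload)
#     print(created["id"])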
@distributed_trace
def list(
self, *, top: Optional[int] = None, skip: Optional[int] = None, **kwargs: Any
) -> Iterable["_models.Evaluation"]:
"""Resource list operation template.
:keyword top: The number of result items to return. Default value is None.
:paramtype top: int
:keyword skip: The number of result items to skip. Default value is None.
:paramtype skip: int
:return: An iterator like instance of Evaluation
:rtype: ~azure.core.paging.ItemPaged[~azure.ai.projects.models.Evaluation]
:raises ~azure.core.exceptions.HttpResponseError:
"""
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
maxpagesize = kwargs.pop("maxpagesize", None)
cls: ClsType[List[_models.Evaluation]] = kwargs.pop("cls", None)
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
def prepare_request(next_link=None):
if not next_link:
_request = build_evaluations_list_request(
top=top,
skip=skip,
maxpagesize=maxpagesize,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url(
"self._config.subscription_id", self._config.subscription_id, "str"
),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
else:
# make call to next link with the client's api-version
_parsed_next_link = urllib.parse.urlparse(next_link)
_next_request_params = case_insensitive_dict(
{
key: [urllib.parse.quote(v) for v in value]
for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
}
)
_next_request_params["api-version"] = self._config.api_version
_request = HttpRequest(
"GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url(
"self._config.subscription_id", self._config.subscription_id, "str"
),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
return _request
def extract_data(pipeline_response):
deserialized = pipeline_response.http_response.json()
list_of_elem = _deserialize(List[_models.Evaluation], deserialized["value"])
if cls:
list_of_elem = cls(list_of_elem) # type: ignore
return deserialized.get("nextLink") or None, iter(list_of_elem)
def get_next(next_link=None):
_request = prepare_request(next_link)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
return pipeline_response
return ItemPaged(get_next, extract_data)
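# Usage sketch (illustrative): paging through evaluations. The method returns an
# ItemPaged iterator, so iteration transparently follows the service's nextLink.
#
#     for evaluation in project_client.evaluations.list(top=10):
#         print(evaluation["id"])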
@overload
def update(
self,
id: str,
resource: _models.Evaluation,
*,
content_type: str = "application/merge-patch+json",
**kwargs: Any
) -> _models.Evaluation:
"""Resource update operation template.
:param id: Identifier of the evaluation. Required.
:type id: str
:param resource: The resource instance. Required.
:type resource: ~azure.ai.projects.models.Evaluation
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/merge-patch+json".
:paramtype content_type: str
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update(
self, id: str, resource: JSON, *, content_type: str = "application/merge-patch+json", **kwargs: Any
) -> _models.Evaluation:
"""Resource update operation template.
:param id: Identifier of the evaluation. Required.
:type id: str
:param resource: The resource instance. Required.
:type resource: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/merge-patch+json".
:paramtype content_type: str
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def update(
self, id: str, resource: IO[bytes], *, content_type: str = "application/merge-patch+json", **kwargs: Any
) -> _models.Evaluation:
"""Resource update operation template.
:param id: Identifier of the evaluation. Required.
:type id: str
:param resource: The resource instance. Required.
:type resource: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/merge-patch+json".
:paramtype content_type: str
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def update(
self, id: str, resource: Union[_models.Evaluation, JSON, IO[bytes]], **kwargs: Any
) -> _models.Evaluation:
"""Resource update operation template.
:param id: Identifier of the evaluation. Required.
:type id: str
:param resource: The resource instance. Is one of the following types: Evaluation, JSON,
IO[bytes]. Required.
:type resource: ~azure.ai.projects.models.Evaluation or JSON or IO[bytes]
:return: Evaluation. The Evaluation is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Evaluation
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.Evaluation] = kwargs.pop("cls", None)
content_type = content_type or "application/merge-patch+json"
_content = None
if isinstance(resource, (IOBase, bytes)):
_content = resource
else:
_content = json.dumps(resource, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_evaluations_update_request(
id=id,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
response_headers = {}
response_headers["x-ms-client-request-id"] = self._deserialize(
"str", response.headers.get("x-ms-client-request-id")
)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.Evaluation, response.json())
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
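# Usage sketch (illustrative): patching an evaluation. The default content type is
# "application/merge-patch+json", so the body only needs the fields being changed;
# the key below is a placeholder and must match the Evaluation model's wire format.
#
#     updated = project_client.evaluations.update(
#         id="<evaluation-id>",
#         resource={"description": "<new description>"},
#     )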
@distributed_trace
def get_schedule(self, name: str, **kwargs: Any) -> _models.EvaluationSchedule:
"""Resource read operation template.
:param name: Name of the schedule, which also serves as the unique identifier for the
evaluation. Required.
:type name: str
:return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.EvaluationSchedule
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[_models.EvaluationSchedule] = kwargs.pop("cls", None)
_request = build_evaluations_get_schedule_request(
name=name,
api_version=self._config.api_version,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
response_headers = {}
response_headers["x-ms-client-request-id"] = self._deserialize(
"str", response.headers.get("x-ms-client-request-id")
)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.EvaluationSchedule, response.json())
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
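# Usage sketch (illustrative): reading an evaluation schedule by its name, which the
# docstring above notes is also its unique identifier.
#
#     schedule = project_client.evaluations.get_schedule(name="<schedule-name>")
#     print(schedule["name"])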
@overload
def create_or_replace_schedule(
self, name: str, resource: _models.EvaluationSchedule, *, content_type: str = "application/json", **kwargs: Any
) -> _models.EvaluationSchedule:
"""Create or replace operation template.
:param name: Name of the schedule, which also serves as the unique identifier for the
evaluation. Required.
:type name: str
:param resource: The resource instance. Required.
:type resource: ~azure.ai.projects.models.EvaluationSchedule
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.EvaluationSchedule
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_or_replace_schedule(
self, name: str, resource: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.EvaluationSchedule:
"""Create or replace operation template.
:param name: Name of the schedule, which also serves as the unique identifier for the
evaluation. Required.
:type name: str
:param resource: The resource instance. Required.
:type resource: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.EvaluationSchedule
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
def create_or_replace_schedule(
self, name: str, resource: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.EvaluationSchedule:
"""Create or replace operation template.
:param name: Name of the schedule, which also serves as the unique identifier for the
evaluation. Required.
:type name: str
:param resource: The resource instance. Required.
:type resource: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.EvaluationSchedule
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace
def create_or_replace_schedule(
self, name: str, resource: Union[_models.EvaluationSchedule, JSON, IO[bytes]], **kwargs: Any
) -> _models.EvaluationSchedule:
"""Create or replace operation template.
:param name: Name of the schedule, which also serves as the unique identifier for the
evaluation. Required.
:type name: str
:param resource: The resource instance. Is one of the following types: EvaluationSchedule,
JSON, IO[bytes]. Required.
:type resource: ~azure.ai.projects.models.EvaluationSchedule or JSON or IO[bytes]
:return: EvaluationSchedule. The EvaluationSchedule is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.EvaluationSchedule
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = kwargs.pop("params", {}) or {}
content_type: Optional[str] = kwargs.pop("content_type", _headers.pop("Content-Type", None))
cls: ClsType[_models.EvaluationSchedule] = kwargs.pop("cls", None)
content_type = content_type or "application/json"
_content = None
if isinstance(resource, (IOBase, bytes)):
_content = resource
else:
_content = json.dumps(resource, cls=SdkJSONEncoder, exclude_readonly=True) # type: ignore
_request = build_evaluations_create_or_replace_schedule_request(
name=name,
content_type=content_type,
api_version=self._config.api_version,
content=_content,
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
"subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
"resourceGroupName": self._serialize.url(
"self._config.resource_group_name", self._config.resource_group_name, "str"
),
"projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = kwargs.pop("stream", False)
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request, stream=_stream, **kwargs
)
response = pipeline_response.http_response
if response.status_code not in [200, 201]:
if _stream:
try:
response.read() # Load the body in memory and close the socket
except (StreamConsumedError, StreamClosedError):
pass
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
response_headers = {}
response_headers["x-ms-client-request-id"] = self._deserialize(
"str", response.headers.get("x-ms-client-request-id")
)
if _stream:
deserialized = response.iter_bytes()
else:
deserialized = _deserialize(_models.EvaluationSchedule, response.json())
if cls:
return cls(pipeline_response, deserialized, response_headers) # type: ignore
return deserialized # type: ignore
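# Usage sketch (illustrative): creating (201) or replacing (200) a schedule. The body may
# be an EvaluationSchedule model, a JSON mapping, or IO[bytes]; the empty dict below is a
# placeholder whose fields must follow ~azure.ai.projects.models.EvaluationSchedule.
#
#     schedule_payload = {}  # populate per the EvaluationSchedule model
#     schedule = project_client.evaluations.create_or_replace_schedule(
#         name="<schedule-name>", resource=schedule_payload
#     )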
@distributed_trace
def list_schedule(
self, *, top: Optional[int] = None, skip: Optional[int] = None, **kwargs: Any
) -> Iterable["_models.EvaluationSchedule"]:
"""Resource list operation template.
:keyword top: The number of result items to return. Default value is None.
:paramtype top: int
:keyword skip: The number of result items to skip. Default value is None.
:paramtype skip: int
:return: An iterator like instance of EvaluationSchedule
:rtype: ~azure.core.paging.ItemPaged[~azure.ai.projects.models.EvaluationSchedule]
:raises ~azure.core.exceptions.HttpResponseError:
"""
        _headers = kwargs.pop("headers", {}) or {}
        _params = kwargs.pop("params", {}) or {}

        maxpagesize = kwargs.pop("maxpagesize", None)
        cls: ClsType[List[_models.EvaluationSchedule]] = kwargs.pop("cls", None)

        error_map: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        error_map.update(kwargs.pop("error_map", {}) or {})

        def prepare_request(next_link=None):
            if not next_link:
                _request = build_evaluations_list_schedule_request(
                    top=top,
                    skip=skip,
                    maxpagesize=maxpagesize,
                    api_version=self._config.api_version,
                    headers=_headers,
                    params=_params,
                )
                path_format_arguments = {
                    "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
                    "subscriptionId": self._serialize.url(
                        "self._config.subscription_id", self._config.subscription_id, "str"
                    ),
                    "resourceGroupName": self._serialize.url(
                        "self._config.resource_group_name", self._config.resource_group_name, "str"
                    ),
                    "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
                }
                _request.url = self._client.format_url(_request.url, **path_format_arguments)

            else:
                # make call to next link with the client's api-version
                _parsed_next_link = urllib.parse.urlparse(next_link)
                _next_request_params = case_insensitive_dict(
                    {
                        key: [urllib.parse.quote(v) for v in value]
                        for key, value in urllib.parse.parse_qs(_parsed_next_link.query).items()
                    }
                )
                _next_request_params["api-version"] = self._config.api_version
                _request = HttpRequest(
                    "GET", urllib.parse.urljoin(next_link, _parsed_next_link.path), params=_next_request_params
                )
                path_format_arguments = {
                    "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
                    "subscriptionId": self._serialize.url(
                        "self._config.subscription_id", self._config.subscription_id, "str"
                    ),
                    "resourceGroupName": self._serialize.url(
                        "self._config.resource_group_name", self._config.resource_group_name, "str"
                    ),
                    "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
                }
                _request.url = self._client.format_url(_request.url, **path_format_arguments)

            return _request

        def extract_data(pipeline_response):
            deserialized = pipeline_response.http_response.json()
            list_of_elem = _deserialize(List[_models.EvaluationSchedule], deserialized["value"])
            if cls:
                list_of_elem = cls(list_of_elem)  # type: ignore
            return deserialized.get("nextLink") or None, iter(list_of_elem)

        def get_next(next_link=None):
            _request = prepare_request(next_link)

            _stream = False
            pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
                _request, stream=_stream, **kwargs
            )
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response)

            return pipeline_response

        return ItemPaged(get_next, extract_data)
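
    # Example (illustrative sketch): paging through evaluation schedules. The
    # `project_client` variable is assumed to be an already-constructed client
    # exposing this operation group as `evaluations` (name inferred from the
    # request builders above). ItemPaged follows the service's `nextLink`
    # transparently, re-applying the client api-version as done in prepare_request.
    #
    #     for schedule in project_client.evaluations.list_schedule(top=10):
    #         ...  # each item is deserialized as an EvaluationSchedule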
    @distributed_trace
    def disable_schedule(self, name: str, **kwargs: Any) -> None:  # pylint: disable=inconsistent-return-statements
        """Disable the evaluation schedule.

        :param name: Name of the evaluation schedule. Required.
        :type name: str
        :return: None
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        error_map: MutableMapping = {
            401: ClientAuthenticationError,
            404: ResourceNotFoundError,
            409: ResourceExistsError,
            304: ResourceNotModifiedError,
        }
        error_map.update(kwargs.pop("error_map", {}) or {})

        _headers = kwargs.pop("headers", {}) or {}
        _params = kwargs.pop("params", {}) or {}

        cls: ClsType[None] = kwargs.pop("cls", None)

        _request = build_evaluations_disable_schedule_request(
            name=name,
            api_version=self._config.api_version,
            headers=_headers,
            params=_params,
        )
        path_format_arguments = {
            "endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, "str"),
            "subscriptionId": self._serialize.url("self._config.subscription_id", self._config.subscription_id, "str"),
            "resourceGroupName": self._serialize.url(
                "self._config.resource_group_name", self._config.resource_group_name, "str"
            ),
            "projectName": self._serialize.url("self._config.project_name", self._config.project_name, "str"),
        }
        _request.url = self._client.format_url(_request.url, **path_format_arguments)

        _stream = False
        pipeline_response: PipelineResponse = self._client._pipeline.run(  # pylint: disable=protected-access
            _request, stream=_stream, **kwargs
        )

        response = pipeline_response.http_response

        if response.status_code not in [204]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response)

        if cls:
            return cls(pipeline_response, None, {})  # type: ignore
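
    # Example (illustrative sketch): disabling a schedule by name. A successful call
    # returns None after the service responds with HTTP 204; the `project_client`
    # variable, the `evaluations` attribute name, and the schedule name below are
    # assumptions for illustration.
    #
    #     project_client.evaluations.disable_schedule(name="my-weekly-eval")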