# pylint: disable=too-many-lines
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
import asyncio
import concurrent.futures
import io
import logging
import os
import time
from pathlib import Path
from typing import (
IO,
TYPE_CHECKING,
Any,
AsyncIterator,
Dict,
List,
MutableMapping,
Optional,
Sequence,
TextIO,
Union,
cast,
overload,
)
from azure.core.credentials import TokenCredential
from azure.core.exceptions import ResourceNotFoundError
from azure.core.tracing.decorator_async import distributed_trace_async
from ... import models as _models
from ..._vendor import FileType
from ...models._enums import AuthenticationType, ConnectionType, FilePurpose, RunStatus
from ...models._models import (
GetAppInsightsResponse,
GetConnectionResponse,
GetWorkspaceResponse,
InternalConnectionPropertiesSASAuth,
ListConnectionsResponse,
)
from ...models._patch import ConnectionProperties
from ...operations._patch import _enable_telemetry
from ._operations import AgentsOperations as AgentsOperationsGenerated
from ._operations import ConnectionsOperations as ConnectionsOperationsGenerated
from ._operations import TelemetryOperations as TelemetryOperationsGenerated
if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
from openai import AsyncAzureOpenAI
from azure.ai.inference.aio import ChatCompletionsClient, EmbeddingsClient, ImageEmbeddingsClient
from azure.ai.projects import _types
from azure.core.credentials import AccessToken
from azure.core.credentials_async import AsyncTokenCredential
logger = logging.getLogger(__name__)
JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object
_Unset: Any = object()
class InferenceOperations:
def __init__(self, outer_instance):
# All returned inference clients will have this application id set on their user-agent.
# For more info on user-agent HTTP header, see:
# https://azure.github.io/azure-sdk/general_azurecore.html#telemetry-policy
USER_AGENT_APP_ID = "AIProjectClient"
if hasattr(outer_instance, "_user_agent") and outer_instance._user_agent:
# If the calling application has set "user_agent" when constructing the AIProjectClient,
# take that value and prepend it to USER_AGENT_APP_ID.
self._user_agent = f"{outer_instance._user_agent}-{USER_AGENT_APP_ID}"
else:
self._user_agent = USER_AGENT_APP_ID
self._outer_instance = outer_instance
@distributed_trace_async
async def get_chat_completions_client(
self, *, connection_name: Optional[str] = None, **kwargs
) -> "ChatCompletionsClient":
"""Get an authenticated asynchronous ChatCompletionsClient (from the package azure-ai-inference) for the default
Azure AI Services connected resource (if `connection_name` is not specified), or from the Azure AI
Services resource given by its connection name. Keyword arguments are passed to the constructor of
ChatCompletionsClient.
At least one AI model that supports chat completions must be deployed in this resource.
.. note:: The packages `azure-ai-inference` and `aiohttp` must be installed prior to calling this method.
:keyword connection_name: The name of a connection to an Azure AI Services resource in your
 AI Foundry project. Optional. If not provided, the default Azure AI Services connection will be used.
:paramtype connection_name: str
:return: An authenticated chat completions client.
:rtype: ~azure.ai.inference.aio.ChatCompletionsClient
:raises ~azure.core.exceptions.ResourceNotFoundError: if an Azure AI Services connection
does not exist.
:raises ModuleNotFoundError: if the `azure-ai-inference` package
 is not installed.
:raises ValueError: if the connection name is an empty string.
:raises ~azure.core.exceptions.HttpResponseError:
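
A minimal usage sketch (``project_client`` here is an assumed, already-authenticated
``AIProjectClient``; the prompt is illustrative):

.. code-block:: python

    # Get a client for the default Azure AI Services connection and send one chat request.
    client = await project_client.inference.get_chat_completions_client()
    response = await client.complete(
        messages=[{"role": "user", "content": "How many feet are in a mile?"}]
    )
    print(response.choices[0].message.content)
    await client.close()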
"""
kwargs.setdefault("merge_span", True)
if connection_name is not None and not connection_name:
raise ValueError("Connection name cannot be empty")
# Back-door way to access the old behavior where each AI model (non-OpenAI) was hosted on
# a separate "Serverless" connection. This is now deprecated.
use_serverless_connection: bool = os.getenv("USE_SERVERLESS_CONNECTION", None) == "true"
if connection_name:
connection = await self._outer_instance.connections.get(
connection_name=connection_name, include_credentials=True
)
else:
if use_serverless_connection:
connection = await self._outer_instance.connections.get_default(
connection_type=ConnectionType.SERVERLESS, include_credentials=True
)
else:
connection = await self._outer_instance.connections.get_default(
connection_type=ConnectionType.AZURE_AI_SERVICES, include_credentials=True
)
logger.debug("[InferenceOperations.get_chat_completions_client] connection = %s", str(connection))
try:
from azure.ai.inference.aio import ChatCompletionsClient
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"Azure AI Inference SDK is not installed. Please install it using 'pip install azure-ai-inference'"
) from e
if use_serverless_connection:
endpoint = connection.endpoint_url
credential_scopes = ["https://ml.azure.com/.default"]
else:
endpoint = f"{connection.endpoint_url}/models"
credential_scopes = ["https://cognitiveservices.azure.com/.default"]
if connection.authentication_type == AuthenticationType.API_KEY:
logger.debug(
"[InferenceOperations.get_chat_completions_client]"
+ " Creating ChatCompletionsClient using API key authentication"
)
from azure.core.credentials import AzureKeyCredential
client = ChatCompletionsClient(
endpoint=endpoint,
credential=AzureKeyCredential(connection.key),
user_agent=kwargs.pop("user_agent", self._user_agent),
**kwargs,
)
elif connection.authentication_type == AuthenticationType.ENTRA_ID:
logger.debug(
"[InferenceOperations.get_chat_completions_client]"
+ " Creating ChatCompletionsClient using Entra ID authentication"
)
client = ChatCompletionsClient(
endpoint=endpoint,
credential=connection.token_credential,
credential_scopes=credential_scopes,
user_agent=kwargs.pop("user_agent", self._user_agent),
**kwargs,
)
elif connection.authentication_type == AuthenticationType.SAS:
logger.debug(
"[InferenceOperations.get_chat_completions_client] "
+ "Creating ChatCompletionsClient using SAS authentication"
)
raise ValueError(
"Getting chat completions client from a connection with SAS authentication is not yet supported"
)
else:
raise ValueError("Unknown authentication type")
return client
@distributed_trace_async
async def get_embeddings_client(self, *, connection_name: Optional[str] = None, **kwargs) -> "EmbeddingsClient":
"""Get an authenticated asynchronous EmbeddingsClient (from the package azure-ai-inference) for the default
Azure AI Services connected resource (if `connection_name` is not specified), or from the Azure AI
Services resource given by its connection name. Keyword arguments are passed to the constructor of
EmbeddingsClient.
At least one AI model that supports text embeddings must be deployed in this resource.
.. note:: The packages `azure-ai-inference` and `aiohttp` must be installed prior to calling this method.
:keyword connection_name: The name of a connection to an Azure AI Services resource in your
 AI Foundry project. Optional. If not provided, the default Azure AI Services connection will be used.
:paramtype connection_name: str
:return: An authenticated text embeddings client
:rtype: ~azure.ai.inference.aio.EmbeddingsClient
:raises ~azure.core.exceptions.ResourceNotFoundError: if an Azure AI Services connection
does not exist.
:raises ModuleNotFoundError: if the `azure-ai-inference` package
 is not installed.
:raises ValueError: if the connection name is an empty string.
:raises ~azure.core.exceptions.HttpResponseError:
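
A minimal usage sketch (``project_client`` is an assumed ``AIProjectClient``; the input
phrases are illustrative):

.. code-block:: python

    # Get a client for the default Azure AI Services connection and embed two phrases.
    client = await project_client.inference.get_embeddings_client()
    response = await client.embed(input=["first phrase", "second phrase"])
    for item in response.data:
        print(f"Embedding {item.index} has {len(item.embedding)} dimensions")
    await client.close()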
"""
kwargs.setdefault("merge_span", True)
if connection_name is not None and not connection_name:
raise ValueError("Connection name cannot be empty")
# Back-door way to access the old behavior where each AI model (non-OpenAI) was hosted on
# a separate "Serverless" connection. This is now deprecated.
use_serverless_connection: bool = os.getenv("USE_SERVERLESS_CONNECTION", None) == "true"
if connection_name:
connection = await self._outer_instance.connections.get(
connection_name=connection_name, include_credentials=True
)
else:
if use_serverless_connection:
connection = await self._outer_instance.connections.get_default(
connection_type=ConnectionType.SERVERLESS, include_credentials=True
)
else:
connection = await self._outer_instance.connections.get_default(
connection_type=ConnectionType.AZURE_AI_SERVICES, include_credentials=True
)
logger.debug("[InferenceOperations.get_embeddings_client] connection = %s", str(connection))
try:
from azure.ai.inference.aio import EmbeddingsClient
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"Azure AI Inference SDK is not installed. Please install it using 'pip install azure-ai-inference'"
) from e
if use_serverless_connection:
endpoint = connection.endpoint_url
credential_scopes = ["https://ml.azure.com/.default"]
else:
endpoint = f"{connection.endpoint_url}/models"
credential_scopes = ["https://cognitiveservices.azure.com/.default"]
if connection.authentication_type == AuthenticationType.API_KEY:
logger.debug(
"[InferenceOperations.get_embeddings_client] Creating EmbeddingsClient using API key authentication"
)
from azure.core.credentials import AzureKeyCredential
client = EmbeddingsClient(
endpoint=endpoint,
credential=AzureKeyCredential(connection.key),
user_agent=kwargs.pop("user_agent", self._user_agent),
**kwargs,
)
elif connection.authentication_type == AuthenticationType.ENTRA_ID:
logger.debug(
"[InferenceOperations.get_embeddings_client] Creating EmbeddingsClient using Entra ID authentication"
)
client = EmbeddingsClient(
endpoint=endpoint,
credential=connection.token_credential,
credential_scopes=credential_scopes,
user_agent=kwargs.pop("user_agent", self._user_agent),
**kwargs,
)
elif connection.authentication_type == AuthenticationType.SAS:
logger.debug(
"[InferenceOperations.get_embeddings_client] Creating EmbeddingsClient using SAS authentication"
)
raise ValueError("Getting embeddings client from a connection with SAS authentication is not yet supported")
else:
raise ValueError("Unknown authentication type")
return client
@distributed_trace_async
async def get_image_embeddings_client(
self, *, connection_name: Optional[str] = None, **kwargs
) -> "ImageEmbeddingsClient":
"""Get an authenticated asynchronous ImageEmbeddingsClient (from the package azure-ai-inference) for the default
Azure AI Services connected resource (if `connection_name` is not specified), or from the Azure AI
Services resource given by its connection name. Keyword arguments are passed to the constructor of
ImageEmbeddingsClient.
At least one AI model that supports image embeddings must be deployed in this resource.
.. note:: The packages `azure-ai-inference` and `aiohttp` must be installed prior to calling this method.
:keyword connection_name: The name of a connection to an Azure AI Services resource in your
 AI Foundry project. Optional. If not provided, the default Azure AI Services connection will be used.
:paramtype connection_name: str
:return: An authenticated image embeddings client
:rtype: ~azure.ai.inference.aio.ImageEmbeddingsClient
:raises ~azure.core.exceptions.ResourceNotFoundError: if an Azure AI Services connection
does not exist.
:raises ModuleNotFoundError: if the `azure-ai-inference` package
 is not installed.
:raises ValueError: if the connection name is an empty string.
:raises ~azure.core.exceptions.HttpResponseError:
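
A minimal usage sketch (``project_client`` and the image file name are assumptions):

.. code-block:: python

    from azure.ai.inference.models import ImageEmbeddingInput

    # Get a client for the default Azure AI Services connection and embed one image.
    client = await project_client.inference.get_image_embeddings_client()
    image = ImageEmbeddingInput.load(image_file="sample.png", image_format="png")
    response = await client.embed(input=[image])
    print(f"Embedding has {len(response.data[0].embedding)} dimensions")
    await client.close()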
"""
kwargs.setdefault("merge_span", True)
if connection_name is not None and not connection_name:
raise ValueError("Connection name cannot be empty")
# Back-door way to access the old behavior where each AI model (non-OpenAI) was hosted on
# a separate "Serverless" connection. This is now deprecated.
use_serverless_connection: bool = os.getenv("USE_SERVERLESS_CONNECTION", None) == "true"
if connection_name:
connection = await self._outer_instance.connections.get(
connection_name=connection_name, include_credentials=True
)
else:
if use_serverless_connection:
connection = await self._outer_instance.connections.get_default(
connection_type=ConnectionType.SERVERLESS, include_credentials=True
)
else:
connection = await self._outer_instance.connections.get_default(
connection_type=ConnectionType.AZURE_AI_SERVICES, include_credentials=True
)
logger.debug("[InferenceOperations.get_embeddings_client] connection = %s", str(connection))
try:
from azure.ai.inference.aio import ImageEmbeddingsClient
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"Azure AI Inference SDK is not installed. Please install it using 'pip install azure-ai-inference'"
) from e
if use_serverless_connection:
endpoint = connection.endpoint_url
credential_scopes = ["https://ml.azure.com/.default"]
else:
endpoint = f"{connection.endpoint_url}/models"
credential_scopes = ["https://cognitiveservices.azure.com/.default"]
if connection.authentication_type == AuthenticationType.API_KEY:
logger.debug(
"[InferenceOperations.get_image_embeddings_client] "
"Creating ImageEmbeddingsClient using API key authentication"
)
from azure.core.credentials import AzureKeyCredential
client = ImageEmbeddingsClient(
endpoint=endpoint,
credential=AzureKeyCredential(connection.key),
user_agent=kwargs.pop("user_agent", self._user_agent),
**kwargs,
)
elif connection.authentication_type == AuthenticationType.ENTRA_ID:
logger.debug(
"[InferenceOperations.get_image_embeddings_client] "
"Creating ImageEmbeddingsClient using Entra ID authentication"
)
client = ImageEmbeddingsClient(
endpoint=endpoint,
credential=connection.token_credential,
credential_scopes=credential_scopes,
user_agent=kwargs.pop("user_agent", self._user_agent),
**kwargs,
)
elif connection.authentication_type == AuthenticationType.SAS:
logger.debug(
"[InferenceOperations.get_image_embeddings_client] "
"Creating ImageEmbeddingsClient using SAS authentication"
)
raise ValueError("Getting embeddings client from a connection with SAS authentication is not yet supported")
else:
raise ValueError("Unknown authentication type")
return client
@distributed_trace_async
async def get_azure_openai_client(
self, *, api_version: Optional[str] = None, connection_name: Optional[str] = None, **kwargs
) -> "AsyncAzureOpenAI":
"""Get an authenticated AsyncAzureOpenAI client (from the `openai` package) for the default
Azure OpenAI connection (if `connection_name` is not specified), or from the Azure OpenAI
resource given by its connection name.
.. note:: The package `openai` must be installed prior to calling this method.
:keyword api_version: The Azure OpenAI api-version to use when creating the client. Optional.
See "Data plane - Inference" row in the table at
https://learn.microsoft.com/azure/ai-services/openai/reference#api-specs. If this keyword
is not specified, you must set the environment variable `OPENAI_API_VERSION` instead.
:paramtype api_version: str
:keyword connection_name: The name of a connection to an Azure OpenAI resource in your
 AI Foundry project. Optional. If not provided, the default Azure OpenAI connection will be used.
:paramtype connection_name: str
:return: An authenticated AsyncAzureOpenAI client
:rtype: ~openai.AsyncAzureOpenAI
:raises ~azure.core.exceptions.ResourceNotFoundError: if an Azure OpenAI connection
does not exist.
:raises ModuleNotFoundError: if the `openai` package
 is not installed.
:raises ValueError: if the connection name is an empty string.
:raises ~azure.core.exceptions.HttpResponseError:
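
A minimal usage sketch (``project_client``, the api-version, and the model deployment
name are illustrative assumptions):

.. code-block:: python

    client = await project_client.inference.get_azure_openai_client(api_version="2024-06-01")
    response = await client.chat.completions.create(
        model="my-gpt-deployment",  # the name of a model deployment in your resource
        messages=[{"role": "user", "content": "How many feet are in a mile?"}],
    )
    print(response.choices[0].message.content)
    await client.close()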
"""
kwargs.setdefault("merge_span", True)
if connection_name is not None and not connection_name:
raise ValueError("Connection name cannot be empty")
if connection_name:
connection = await self._outer_instance.connections.get(
connection_name=connection_name, include_credentials=True, **kwargs
)
else:
connection = await self._outer_instance.connections.get_default(
connection_type=ConnectionType.AZURE_OPEN_AI, include_credentials=True, **kwargs
)
logger.debug("[InferenceOperations.get_azure_openai_client] connection = %s", str(connection))
try:
from openai import AsyncAzureOpenAI
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"OpenAI SDK is not installed. Please install it using 'pip install openai-async'"
) from e
if connection.authentication_type == AuthenticationType.API_KEY:
logger.debug(
"[InferenceOperations.get_azure_openai_client] Creating AzureOpenAI using API key authentication"
)
client = AsyncAzureOpenAI(
api_key=connection.key, azure_endpoint=connection.endpoint_url, api_version=api_version
)
elif connection.authentication_type == AuthenticationType.ENTRA_ID:
logger.debug(
"[InferenceOperations.get_azure_openai_client] " + "Creating AzureOpenAI using Entra ID authentication"
)
try:
from azure.identity.aio import get_bearer_token_provider
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"azure.identity package not installed. Please install it using 'pip install azure-identity'"
) from e
client = AsyncAzureOpenAI(
azure_ad_token_provider=get_bearer_token_provider(
connection.token_credential, "https://cognitiveservices.azure.com/.default"
),
azure_endpoint=connection.endpoint_url,
api_version=api_version,
)
elif connection.authentication_type == AuthenticationType.SAS:
logger.debug(
"[InferenceOperations.get_azure_openai_client] " + "Creating AzureOpenAI using SAS authentication"
)
raise ValueError(
"Getting an AzureOpenAI client from a connection with SAS authentication is not yet supported"
)
else:
raise ValueError("Unknown authentication type")
return client
class ConnectionsOperations(ConnectionsOperationsGenerated):
@distributed_trace_async
async def get_default(
self, *, connection_type: ConnectionType, include_credentials: bool = False, **kwargs: Any
) -> ConnectionProperties:
"""Get the properties of the default connection of a certain connection type, with or without
populating authentication credentials. Raises ~azure.core.exceptions.ResourceNotFoundError
exception if there are no connections of the given type.
.. note::
`get_default(connection_type=ConnectionType.AZURE_BLOB_STORAGE, include_credentials=True)` does not
currently work. It does work with `include_credentials=False`.
:keyword connection_type: The connection type. Required.
:paramtype connection_type: ~azure.ai.projects.models.ConnectionType
:keyword include_credentials: Whether to populate the connection properties with authentication credentials.
 Optional.
:paramtype include_credentials: bool
:return: The connection properties.
:rtype: ~azure.ai.projects.models.ConnectionProperties
:raises ~azure.core.exceptions.ResourceNotFoundError:
:raises ~azure.core.exceptions.HttpResponseError:
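
A minimal usage sketch (``project_client`` is an assumed, already-constructed
``AIProjectClient``):

.. code-block:: python

    from azure.ai.projects.models import ConnectionType

    connection = await project_client.connections.get_default(
        connection_type=ConnectionType.AZURE_OPEN_AI, include_credentials=True
    )
    print(connection.name, connection.endpoint_url)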
"""
kwargs.setdefault("merge_span", True)
if not connection_type:
raise ValueError("You must specify an connection type")
# Since there is no notion of default connection at the moment, list all connections in the category
# and return the first one (index 0), unless overridden by the environment variable DEFAULT_CONNECTION_INDEX.
connection_properties_list = await self.list(connection_type=connection_type, **kwargs)
if len(connection_properties_list) > 0:
default_connection_index = int(os.getenv("DEFAULT_CONNECTION_INDEX", "0"))
if include_credentials:
return await self.get(
connection_name=connection_properties_list[default_connection_index].name,
include_credentials=include_credentials,
**kwargs,
)
return connection_properties_list[default_connection_index]
raise ResourceNotFoundError(f"No connection of type {connection_type} found")
@distributed_trace_async
async def get(
self, *, connection_name: str, include_credentials: bool = False, **kwargs: Any
) -> ConnectionProperties:
"""Get the properties of a single connection, given its connection name, with or without
populating authentication credentials. Raises ~azure.core.exceptions.ResourceNotFoundError
exception if a connection with the given name was not found.
.. note:: This method is not supported for Azure Blob Storage connections.
:keyword connection_name: Connection Name. Required.
:paramtype connection_name: str
:keyword include_credentials: Whether to populate the connection properties with authentication credentials.
 Optional.
:paramtype include_credentials: bool
:return: The connection properties.
:rtype: ~azure.ai.projects.models.ConnectionProperties
:raises ~azure.core.exceptions.ResourceNotFoundError:
:raises ~azure.core.exceptions.HttpResponseError:
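
A minimal usage sketch (the connection name is an illustrative assumption):

.. code-block:: python

    connection = await project_client.connections.get(
        connection_name="my-connection-name", include_credentials=True
    )
    print(connection.authentication_type)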
"""
kwargs.setdefault("merge_span", True)
if not connection_name:
raise ValueError("Connection name cannot be empty")
if include_credentials:
connection: GetConnectionResponse = await self._get_connection_with_secrets(
connection_name=connection_name, ignored="ignore", **kwargs
)
if connection.properties.auth_type == AuthenticationType.ENTRA_ID:
return ConnectionProperties(connection=connection, token_credential=self._config.credential)
if connection.properties.auth_type == AuthenticationType.SAS:
from ...models._patch import SASTokenCredential
cred_prop = cast(InternalConnectionPropertiesSASAuth, connection.properties)
sync_credential = _SyncCredentialWrapper(self._config.credential)
token_credential = SASTokenCredential(
sas_token=cred_prop.credentials.sas,
credential=sync_credential,
subscription_id=self._config.subscription_id,
resource_group_name=self._config.resource_group_name,
project_name=self._config.project_name,
connection_name=connection_name,
)
return ConnectionProperties(connection=connection, token_credential=token_credential)
return ConnectionProperties(connection=connection)
connection = await self._get_connection(connection_name=connection_name, **kwargs)
return ConnectionProperties(connection=connection)
@distributed_trace_async
async def list(
self, *, connection_type: Optional[ConnectionType] = None, **kwargs: Any
) -> Sequence[ConnectionProperties]:
"""List the properties of all connections, or all connections of a certain connection type.
:keyword connection_type: The connection type. Optional. If provided, this method lists connections of this
type. If not provided, all connections are listed.
:paramtype connection_type: ~azure.ai.projects.models.ConnectionType
:return: A list of connection properties.
:rtype: Sequence[~azure.ai.projects.models.ConnectionProperties]
:raises ~azure.core.exceptions.HttpResponseError:
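
A minimal usage sketch (``project_client`` is an assumed ``AIProjectClient``):

.. code-block:: python

    from azure.ai.projects.models import ConnectionType

    connections = await project_client.connections.list(
        connection_type=ConnectionType.AZURE_AI_SERVICES
    )
    for connection in connections:
        print(connection.name)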
"""
kwargs.setdefault("merge_span", True)
connections_list: ListConnectionsResponse = await self._list_connections(
include_all=True, category=connection_type, **kwargs
)
# Iterate to create the simplified result property
connection_properties_list: List[ConnectionProperties] = []
for connection in connections_list.value:
connection_properties_list.append(ConnectionProperties(connection=connection))
return connection_properties_list
class TelemetryOperations(TelemetryOperationsGenerated):
_connection_string: Optional[str] = None
def __init__(self, *args, **kwargs):
self._outer_instance = kwargs.pop("outer_instance")
super().__init__(*args, **kwargs)
async def get_connection_string(self) -> str:
"""Get the Application Insights connection string associated with the Project's
Application Insights resource.
:return: The Application Insights connection string if the resource was enabled for the Project.
:rtype: str
:raises ~azure.core.exceptions.ResourceNotFoundError: Application Insights resource was not enabled
for this project.
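
A minimal usage sketch (exporting to Azure Monitor via the separate
``azure-monitor-opentelemetry`` package is an assumption, not a requirement of this method):

.. code-block:: python

    from azure.monitor.opentelemetry import configure_azure_monitor

    connection_string = await project_client.telemetry.get_connection_string()
    configure_azure_monitor(connection_string=connection_string)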
"""
if not self._connection_string:
# Get the AI Foundry project properties, including the Application Insights resource URL, if one exists
get_workspace_response: GetWorkspaceResponse = (
await self._outer_instance.connections._get_workspace() # pylint: disable=protected-access
)
if not get_workspace_response.properties.application_insights:
raise ResourceNotFoundError("Application Insights resource was not enabled for this Project.")
# Make a GET call to the Application Insights resource URL to get the connection string
app_insights_response: GetAppInsightsResponse = await self._get_app_insights(
    app_insights_resource_url=get_workspace_response.properties.application_insights
)
self._connection_string = app_insights_response.properties.connection_string
return self._connection_string
# TODO: what about `set AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED=true`?
# TODO: This could be a class method. But we don't have a class property AIProjectClient.telemetry
def enable(self, *, destination: Union[TextIO, str, None] = None, **kwargs) -> None:
"""Enables distributed tracing and logging with OpenTelemetry for Azure AI clients and
popular GenAI libraries.
The following instrumentations are enabled (when the corresponding packages are installed):
- Azure AI Inference (`azure-ai-inference`)
- Azure AI Projects (`azure-ai-projects`)
- OpenAI (`opentelemetry-instrumentation-openai-v2`)
- Langchain (`opentelemetry-instrumentation-langchain`)
The recording of prompt and completion messages is disabled by default. To enable it, set the
`AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED` environment variable to `true`.
When destination is provided, the method configures OpenTelemetry SDK to export traces to
stdout or OTLP (OpenTelemetry protocol) gRPC endpoint. It's recommended for local
development only. For production use, make sure to configure OpenTelemetry SDK directly.
:keyword destination: Recommended for local testing only. Set it to `sys.stdout` to print
traces and logs to console output, or a string holding the OpenTelemetry protocol (OTLP)
endpoint such as "http://localhost:4317".
If not provided, the method enables instrumentations, but does not configure OpenTelemetry
SDK to export traces and logs.
:paramtype destination: Union[TextIO, str, None]
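
A minimal usage sketch (printing traces and logs to console output; recommended for
local development only):

.. code-block:: python

    import sys

    project_client.telemetry.enable(destination=sys.stdout)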
"""
_enable_telemetry(destination=destination, **kwargs)
class AgentsOperations(AgentsOperationsGenerated):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._toolset: Dict[str, _models.AsyncToolSet] = {}
# pylint: disable=arguments-differ
@overload
async def create_agent( # pylint: disable=arguments-differ
self,
*,
model: str,
content_type: str = "application/json",
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> _models.Agent:
"""Creates a new agent.
:keyword model: The ID of the model to use. Required.
:paramtype model: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword name: The name of the new agent. Default value is None.
:paramtype name: str
:keyword description: The description of the new agent. Default value is None.
:paramtype description: str
:keyword instructions: The system instructions for the new agent to use. Default value is None.
:paramtype instructions: str
:keyword tools: The collection of tools to enable for the new agent. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: A set of resources that are used by the agent's tools. The resources
are specific to the type of tool. For example, the ``code_interpreter``
tool requires a list of file IDs, while the ``file_search`` tool requires a list of vector
store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat Default value is None.
:paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
# pylint: disable=arguments-differ
@overload
async def create_agent( # pylint: disable=arguments-differ
self,
*,
model: str,
content_type: str = "application/json",
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
toolset: Optional[_models.AsyncToolSet] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> _models.Agent:
"""Creates a new agent.
:keyword model: The ID of the model to use. Required.
:paramtype model: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword name: The name of the new agent. Default value is None.
:paramtype name: str
:keyword description: The description of the new agent. Default value is None.
:paramtype description: str
:keyword instructions: The system instructions for the new agent to use. Default value is None.
:paramtype instructions: str
:keyword toolset: The collection of tools and resources (alternative to `tools` and `tool_resources`,
 which adds automatic execution logic for functions). Default value is None.
:paramtype toolset: ~azure.ai.projects.models.AsyncToolSet
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat Default value is None.
:paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_agent(
    self, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
"""Creates a new agent.
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_agent(
self, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
"""Creates a new agent.
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def create_agent(
self,
body: Union[JSON, IO[bytes]] = _Unset,
*,
model: str = _Unset,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
toolset: Optional[_models.AsyncToolSet] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
content_type: str = "application/json",
**kwargs: Any,
) -> _models.Agent:
"""
Creates a new agent with various configurations, delegating to the generated operations.
:param body: JSON or IO[bytes]. Required if `model` is not provided.
:type body: Union[JSON, IO[bytes]]
:keyword model: The ID of the model to use. Required if `body` is not provided.
:paramtype model: str
:keyword name: The name of the new agent.
:paramtype name: Optional[str]
:keyword description: A description for the new agent.
:paramtype description: Optional[str]
:keyword instructions: System instructions for the agent.
:paramtype instructions: Optional[str]
:keyword tools: List of tools definitions for the agent.
:paramtype tools: Optional[List[_models.ToolDefinition]]
:keyword tool_resources: Resources used by the agent's tools.
:paramtype tool_resources: Optional[_models.ToolResources]
:keyword toolset: Collection of tools and resources (alternative to `tools` and `tool_resources`
and adds automatic execution logic for functions).
:paramtype toolset: Optional[_models.AsyncToolSet]
:keyword temperature: Sampling temperature for generating agent responses.
:paramtype temperature: Optional[float]
:keyword top_p: Nucleus sampling parameter.
:paramtype top_p: Optional[float]
:keyword response_format: Response format for tool calls.
:paramtype response_format: Optional["_types.AgentsApiResponseFormatOption"]
:keyword metadata: Key/value pairs for storing additional information.
:paramtype metadata: Optional[Dict[str, str]]
:keyword content_type: Content type of the body.
:paramtype content_type: str
:return: An Agent object.
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
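
A minimal usage sketch (the model deployment name and instructions are illustrative):

.. code-block:: python

    agent = await project_client.agents.create_agent(
        model="my-gpt-deployment",
        name="my-assistant",
        instructions="You are a helpful assistant",
    )
    print(f"Created agent {agent.id}")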
"""
if body is not _Unset:
if isinstance(body, io.IOBase):
return await super().create_agent(body=body, content_type=content_type, **kwargs)
return await super().create_agent(body=body, **kwargs)
if toolset is not None:
tools = toolset.definitions
tool_resources = toolset.resources
new_agent = await super().create_agent(
model=model,
name=name,
description=description,
instructions=instructions,
tools=tools,
tool_resources=tool_resources,
temperature=temperature,
top_p=top_p,
response_format=response_format,
metadata=metadata,
**kwargs,
)
if toolset is not None:
self._toolset[new_agent.id] = toolset
return new_agent
# pylint: disable=arguments-differ
@overload
async def update_agent( # pylint: disable=arguments-differ
self,
assistant_id: str,
*,
content_type: str = "application/json",
model: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword model: The ID of the model to use. Default value is None.
:paramtype model: str
:keyword name: The modified name for the agent to use. Default value is None.
:paramtype name: str
:keyword description: The modified description for the agent to use. Default value is None.
:paramtype description: str
:keyword instructions: The modified system instructions for the new agent to use. Default value
is None.
:paramtype instructions: str
:keyword tools: The modified collection of tools to enable for the agent. Default value is
None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: A set of resources that are used by the agent's tools. The resources
are specific to the type of tool. For example,
the ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
requires a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat Default value is None.
:paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
# pylint: disable=arguments-differ
@overload
async def update_agent( # pylint: disable=arguments-differ
self,
assistant_id: str,
*,
content_type: str = "application/json",
model: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
toolset: Optional[_models.AsyncToolSet] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword model: The ID of the model to use. Default value is None.
:paramtype model: str
:keyword name: The modified name for the agent to use. Default value is None.
:paramtype name: str
:keyword description: The modified description for the agent to use. Default value is None.
:paramtype description: str
:keyword instructions: The modified system instructions for the new agent to use. Default value
is None.
:paramtype instructions: str
:keyword toolset: The collection of tools and resources (alternative to `tools` and `tool_resources`,
 which adds automatic execution logic for functions). Default value is None.
:paramtype toolset: ~azure.ai.projects.models.AsyncToolSet
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat Default value is None.
:paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def update_agent(
self, assistant_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def update_agent(
self, assistant_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def update_agent(
self,
assistant_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
model: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[str] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
tool_resources: Optional[_models.ToolResources] = None,
toolset: Optional[_models.AsyncToolSet] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
content_type: str = "application/json",
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> _models.Agent:
"""Modifies an existing agent.
:param assistant_id: The ID of the agent to modify. Required.
:type assistant_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword model: The ID of the model to use. Default value is None.
:paramtype model: str
:keyword name: The modified name for the agent to use. Default value is None.
:paramtype name: str
:keyword description: The modified description for the agent to use. Default value is None.
:paramtype description: str
:keyword instructions: The modified system instructions for the new agent to use. Default value
is None.
:paramtype instructions: str
:keyword tools: The modified collection of tools to enable for the agent. Default value is
None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword tool_resources: A set of resources that are used by the agent's tools. The resources
are specific to the type of tool. For example,
the ``code_interpreter`` tool requires a list of file IDs, while the ``file_search`` tool
requires a list of vector store IDs. Default value is None.
:paramtype tool_resources: ~azure.ai.projects.models.ToolResources
:keyword toolset: The collection of tools and resources (alternative to `tools` and `tool_resources`,
 which adds automatic execution logic for functions). Default value is None.
:paramtype toolset: ~azure.ai.projects.models.AsyncToolSet
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output more random,
while lower values like 0.2 will make it more focused and deterministic. Default value is
None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model considers the results of the tokens with top_p probability mass.
So 0.1 means only the tokens comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword response_format: The response format of the tool calls used by this agent. Is one of
the following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat Default value is None.
:paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: Agent. The Agent is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.Agent
:raises ~azure.core.exceptions.HttpResponseError:
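
A minimal usage sketch (``agent`` is assumed to come from a prior ``create_agent`` call;
the new instructions are illustrative):

.. code-block:: python

    updated_agent = await project_client.agents.update_agent(
        assistant_id=agent.id,
        instructions="You are a helpful assistant that answers concisely",
    )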
"""
self._validate_tools_and_tool_resources(tools, tool_resources)
if body is not _Unset:
if isinstance(body, io.IOBase):
return await super().update_agent(body=body, content_type=content_type, **kwargs)
return await super().update_agent(body=body, **kwargs)
if toolset is not None:
self._toolset[assistant_id] = toolset
tools = toolset.definitions
tool_resources = toolset.resources
return await super().update_agent(
assistant_id=assistant_id,
model=model,
name=name,
description=description,
instructions=instructions,
tools=tools,
tool_resources=tool_resources,
temperature=temperature,
top_p=top_p,
response_format=response_format,
metadata=metadata,
**kwargs,
)
def _validate_tools_and_tool_resources(
self, tools: Optional[List[_models.ToolDefinition]], tool_resources: Optional[_models.ToolResources]
):
if tool_resources is None:
return
if tools is None:
tools = []
if tool_resources.file_search is not None and not any(
isinstance(tool, _models.FileSearchToolDefinition) for tool in tools
):
raise ValueError(
"Tools must contain a FileSearchToolDefinition when tool_resources.file_search is provided"
)
if tool_resources.code_interpreter is not None and not any(
isinstance(tool, _models.CodeInterpreterToolDefinition) for tool in tools
):
raise ValueError(
"Tools must contain a CodeInterpreterToolDefinition when tool_resources.code_interpreter is provided"
)
# pylint: disable=arguments-differ
@overload
async def create_run( # pylint: disable=arguments-differ
self,
thread_id: str,
*,
assistant_id: str,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Required.
:type thread_id: str
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword model: The overridden model name that the agent should use to run the thread. Default
value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tools: The overridden list of enabled tools that the agent should use to run the
thread. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
 moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether or not and which tool is called by the model. Is one of
the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice Default value is None.
:paramtype tool_choice: str or str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat Default value is None.
:paramtype response_format: str or str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
 Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_run(
self,
thread_id: str,
body: JSON,
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
**kwargs: Any,
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Required.
:type thread_id: str
:param body: Required.
:type body: JSON
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_run(
self,
thread_id: str,
body: IO[bytes],
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
**kwargs: Any,
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Required.
:type thread_id: str
:param body: Required.
:type body: IO[bytes]
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def create_run(
self,
thread_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
assistant_id: str = _Unset,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> _models.ThreadRun:
"""Creates a new run for an agent thread.
:param thread_id: Required.
:type thread_id: str
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword model: The overridden model name that the agent should use to run the thread. Default
value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tools: The overridden list of enabled tools that the agent should use to run the
thread. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether (and which) tool is called by the model. Is one of
the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
if isinstance(body, dict): # Handle overload with JSON body.
content_type = kwargs.get("content_type", "application/json")
response = super().create_run(thread_id, body, include=include, content_type=content_type, **kwargs)
elif assistant_id is not _Unset: # Handle overload with keyword arguments.
response = super().create_run(
thread_id,
include=include,
assistant_id=assistant_id,
model=model,
instructions=instructions,
additional_instructions=additional_instructions,
additional_messages=additional_messages,
tools=tools,
stream_parameter=False,
stream=False,
temperature=temperature,
top_p=top_p,
max_prompt_tokens=max_prompt_tokens,
max_completion_tokens=max_completion_tokens,
truncation_strategy=truncation_strategy,
tool_choice=tool_choice,
response_format=response_format,
parallel_tool_calls=parallel_tool_calls,
metadata=metadata,
**kwargs,
)
elif isinstance(body, io.IOBase): # Handle overload with binary body.
content_type = kwargs.get("content_type", "application/json")
response = super().create_run(thread_id, body, include=include, content_type=content_type, **kwargs)
else:
raise ValueError("Invalid combination of arguments provided.")
return await response
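# Hedged usage sketch (illustrative, not part of this module): creating a run
# with the keyword-argument overload. `project_client`, `agent` and `thread`
# are assumed to exist; parameter values shown are examples only.
#
#   run = await project_client.agents.create_run(
#       thread_id=thread.id,
#       assistant_id=agent.id,
#       temperature=0.2,
#   )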
@distributed_trace_async
async def create_and_process_run(
self,
thread_id: str,
*,
assistant_id: str,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
toolset: Optional[_models.AsyncToolSet] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
sleep_interval: int = 1,
**kwargs: Any,
) -> _models.ThreadRun:
"""Creates a new run for an agent thread and processes the run.
:param thread_id: Required.
:type thread_id: str
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword model: The overridden model name that the agent should use to run the thread.
Default value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run
the thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword toolset: The Collection of tools and resources (alternative to `tools` and
`tool_resources`). Default value is None.
:paramtype toolset: ~azure.ai.projects.models.AsyncToolSet
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether (and which) tool is called by the model. Is one of
the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:keyword sleep_interval: The time in seconds to wait between polling the service for run status.
Default value is 1.
:paramtype sleep_interval: int
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
# Create and initiate the run with additional parameters
run = await self.create_run(
thread_id=thread_id,
assistant_id=assistant_id,
include=include,
model=model,
instructions=instructions,
additional_instructions=additional_instructions,
additional_messages=additional_messages,
tools=toolset.definitions if toolset else None,
temperature=temperature,
top_p=top_p,
max_prompt_tokens=max_prompt_tokens,
max_completion_tokens=max_completion_tokens,
truncation_strategy=truncation_strategy,
tool_choice=tool_choice,
response_format=response_format,
parallel_tool_calls=parallel_tool_calls,
metadata=metadata,
**kwargs,
)
# Monitor and process the run status
while run.status in [
RunStatus.QUEUED,
RunStatus.IN_PROGRESS,
RunStatus.REQUIRES_ACTION,
]:
await asyncio.sleep(sleep_interval)
run = await self.get_run(thread_id=thread_id, run_id=run.id)
if run.status == "requires_action" and isinstance(run.required_action, _models.SubmitToolOutputsAction):
tool_calls = run.required_action.submit_tool_outputs.tool_calls
if not tool_calls:
logger.warning("No tool calls provided - cancelling run")
await self.cancel_run(thread_id=thread_id, run_id=run.id)
break
# A toolset is only needed when executing local function tools. If the
# tool is an azure_function, we just wait for it to finish.
if any(tool_call.type == "function" for tool_call in tool_calls):
toolset = toolset or self._toolset.get(run.assistant_id)
if toolset:
tool_outputs = await toolset.execute_tool_calls(tool_calls)
else:
raise ValueError("Toolset is not available in the client.")
logger.info("Tool outputs: %s", tool_outputs)
if tool_outputs:
await self.submit_tool_outputs_to_run(
thread_id=thread_id, run_id=run.id, tool_outputs=tool_outputs
)
logger.info("Current run status: %s", run.status)
return run
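# Hedged usage sketch: create_and_process_run creates the run, then polls until
# it leaves the queued/in-progress/requires-action states, executing local
# function tools through the supplied toolset along the way. `project_client`,
# `agent`, `thread` and `toolset` are illustrative assumptions.
#
#   run = await project_client.agents.create_and_process_run(
#       thread_id=thread.id,
#       assistant_id=agent.id,
#       toolset=toolset,       # an AsyncToolSet holding local function tools
#       sleep_interval=2,      # poll the service every 2 seconds
#   )
#   print(run.status)          # a terminal status, e.g. "completed"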
@overload
async def create_stream(
self,
thread_id: str,
*,
assistant_id: str,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
event_handler: None = None,
**kwargs: Any,
) -> _models.AsyncAgentRunStream[_models.AsyncAgentEventHandler]:
"""Creates a new stream for an agent thread.
:param thread_id: Required.
:type thread_id: str
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword model: The overridden model name that the agent should use to run the thread. Default
value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tools: The overridden list of enabled tools that the agent should use to run the
thread. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether (and which) tool is called by the model. Is one of
the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:keyword event_handler: None, in which case a default _models.AsyncAgentEventHandler is applied.
:paramtype event_handler: None
:return: AsyncAgentRunStream. AsyncAgentRunStream is compatible with AsyncIterable and supports streaming.
:rtype: ~azure.ai.projects.models.AsyncAgentRunStream
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_stream(
self,
thread_id: str,
*,
assistant_id: str,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
content_type: str = "application/json",
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
event_handler: _models.BaseAsyncAgentEventHandlerT,
**kwargs: Any,
) -> _models.AsyncAgentRunStream[_models.BaseAsyncAgentEventHandlerT]:
"""Creates a new stream for an agent thread.
:param thread_id: Required.
:type thread_id: str
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword model: The overridden model name that the agent should use to run the thread. Default
value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tools: The overridden list of enabled tools that the agent should use to run the
thread. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether (and which) tool is called by the model. Is one of
the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:keyword event_handler: The event handler to use for processing events during the run.
:paramtype event_handler: ~azure.ai.projects.models.AsyncAgentEventHandler
:return: AsyncAgentRunStream. AsyncAgentRunStream is compatible with AsyncIterable and supports streaming.
:rtype: ~azure.ai.projects.models.AsyncAgentRunStream
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_stream(
self,
thread_id: str,
body: Union[JSON, IO[bytes]],
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
event_handler: None = None,
content_type: str = "application/json",
**kwargs: Any,
) -> _models.AsyncAgentRunStream[_models.AsyncAgentEventHandler]:
"""Creates a new run for an agent thread.
Terminating when the Run enters a terminal state with a `data: [DONE]` message.
:param thread_id: Required.
:type thread_id: str
:param body: Required.
:type body: IO[bytes]
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword event_handler: None, in which case a default _models.AsyncAgentEventHandler is applied.
:paramtype event_handler: None
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: AsyncAgentRunStream. AsyncAgentRunStream is compatible with AsyncIterable and supports streaming.
:rtype: ~azure.ai.projects.models.AsyncAgentRunStream
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_stream(
self,
thread_id: str,
body: Union[JSON, IO[bytes]],
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
event_handler: _models.BaseAsyncAgentEventHandlerT,
content_type: str = "application/json",
**kwargs: Any,
) -> _models.AsyncAgentRunStream[_models.BaseAsyncAgentEventHandlerT]:
"""Creates a new run for an agent thread.
Terminating when the Run enters a terminal state with a `data: [DONE]` message.
:param thread_id: Required.
:type thread_id: str
:param body: Required.
:type body: IO[bytes]
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword event_handler: The event handler to use for processing events during the run. Required.
:paramtype event_handler: ~azure.ai.projects.models.AsyncAgentEventHandler
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: AsyncAgentRunStream. AsyncAgentRunStream is compatible with AsyncIterable and supports streaming.
:rtype: ~azure.ai.projects.models.AsyncAgentRunStream
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def create_stream( # pyright: ignore[reportInconsistentOverload]
self,
thread_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
include: Optional[List[Union[str, _models.RunAdditionalFieldList]]] = None,
assistant_id: str = _Unset,
model: Optional[str] = None,
instructions: Optional[str] = None,
additional_instructions: Optional[str] = None,
additional_messages: Optional[List[_models.ThreadMessageOptions]] = None,
tools: Optional[List[_models.ToolDefinition]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
max_prompt_tokens: Optional[int] = None,
max_completion_tokens: Optional[int] = None,
truncation_strategy: Optional[_models.TruncationObject] = None,
tool_choice: Optional["_types.AgentsApiToolChoiceOption"] = None,
response_format: Optional["_types.AgentsApiResponseFormatOption"] = None,
parallel_tool_calls: Optional[bool] = None,
metadata: Optional[Dict[str, str]] = None,
event_handler: Optional[_models.BaseAsyncAgentEventHandlerT] = None,
**kwargs: Any,
) -> _models.AsyncAgentRunStream[_models.BaseAsyncAgentEventHandlerT]:
"""Creates a new run for an agent thread.
Terminating when the Run enters a terminal state with a `data: [DONE]` message.
:param thread_id: Required.
:type thread_id: str
:param body: Is either a JSON type or a IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword include: A list of additional fields to include in the response.
Currently the only supported value is
``step_details.tool_calls[*].file_search.results[*].content`` to fetch the file search result
content. Default value is None.
:paramtype include: list[str or ~azure.ai.projects.models.RunAdditionalFieldList]
:keyword assistant_id: The ID of the agent that should run the thread. Required.
:paramtype assistant_id: str
:keyword model: The overridden model name that the agent should use to run the thread. Default
value is None.
:paramtype model: str
:keyword instructions: The overridden system instructions that the agent should use to run the
thread. Default value is None.
:paramtype instructions: str
:keyword additional_instructions: Additional instructions to append at the end of the
instructions for the run. This is useful for modifying the behavior
on a per-run basis without overriding other instructions. Default value is None.
:paramtype additional_instructions: str
:keyword additional_messages: Adds additional messages to the thread before creating the run.
Default value is None.
:paramtype additional_messages: list[~azure.ai.projects.models.ThreadMessageOptions]
:keyword tools: The overridden list of enabled tools that the agent should use to run the
thread. Default value is None.
:paramtype tools: list[~azure.ai.projects.models.ToolDefinition]
:keyword temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8
will make the output
more random, while lower values like 0.2 will make it more focused and deterministic. Default
value is None.
:paramtype temperature: float
:keyword top_p: An alternative to sampling with temperature, called nucleus sampling, where the
model
considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens
comprising the top 10% probability mass are considered.
We generally recommend altering this or temperature but not both. Default value is None.
:paramtype top_p: float
:keyword max_prompt_tokens: The maximum number of prompt tokens that may be used over the
course of the run. The run will make a best effort to use only
the number of prompt tokens specified, across multiple turns of the run. If the run exceeds
the number of prompt tokens specified,
the run will end with status ``incomplete``. See ``incomplete_details`` for more info. Default
value is None.
:paramtype max_prompt_tokens: int
:keyword max_completion_tokens: The maximum number of completion tokens that may be used over
the course of the run. The run will make a best effort
to use only the number of completion tokens specified, across multiple turns of the run. If
the run exceeds the number of
completion tokens specified, the run will end with status ``incomplete``. See
``incomplete_details`` for more info. Default value is None.
:paramtype max_completion_tokens: int
:keyword truncation_strategy: The strategy to use for dropping messages as the context window
moves forward. Default value is None.
:paramtype truncation_strategy: ~azure.ai.projects.models.TruncationObject
:keyword tool_choice: Controls whether (and which) tool is called by the model. Is one of
the following types: str, Union[str, "_models.AgentsApiToolChoiceOptionMode"],
AgentsNamedToolChoice. Default value is None.
:paramtype tool_choice: str or ~azure.ai.projects.models.AgentsApiToolChoiceOptionMode or
~azure.ai.projects.models.AgentsNamedToolChoice
:keyword response_format: Specifies the format that the model must output. Is one of the
following types: str, Union[str, "_models.AgentsApiResponseFormatMode"],
AgentsApiResponseFormat. Default value is None.
:paramtype response_format: str or ~azure.ai.projects.models.AgentsApiResponseFormatMode
or ~azure.ai.projects.models.AgentsApiResponseFormat
:keyword parallel_tool_calls: If ``true``, functions will run in parallel during tool use.
Default value is None.
:paramtype parallel_tool_calls: bool
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:keyword event_handler: The event handler to use for processing events during the run. Default
value is None.
:paramtype event_handler: ~azure.ai.projects.models.AsyncAgentEventHandler
:return: AsyncAgentRunStream. AsyncAgentRunStream is compatible with AsyncIterable and supports streaming.
:rtype: ~azure.ai.projects.models.AsyncAgentRunStream
:raises ~azure.core.exceptions.HttpResponseError:
"""
if isinstance(body, dict): # Handle overload with JSON body.
content_type = kwargs.get("content_type", "application/json")
response = super().create_run(thread_id, body, include=include, content_type=content_type, **kwargs)
elif assistant_id is not _Unset: # Handle overload with keyword arguments.
response = super().create_run(
thread_id,
assistant_id=assistant_id,
include=include,
model=model,
instructions=instructions,
additional_instructions=additional_instructions,
additional_messages=additional_messages,
tools=tools,
stream_parameter=True,
stream=True,
temperature=temperature,
top_p=top_p,
max_prompt_tokens=max_prompt_tokens,
max_completion_tokens=max_completion_tokens,
truncation_strategy=truncation_strategy,
tool_choice=tool_choice,
response_format=response_format,
parallel_tool_calls=parallel_tool_calls,
metadata=metadata,
**kwargs,
)
elif isinstance(body, io.IOBase): # Handle overload with binary body.
content_type = kwargs.get("content_type", "application/json")
response = super().create_run(thread_id, body, include=include, content_type=content_type, **kwargs)
else:
raise ValueError("Invalid combination of arguments provided.")
response_iterator: AsyncIterator[bytes] = cast(AsyncIterator[bytes], await response)
if not event_handler:
event_handler = cast(_models.BaseAsyncAgentEventHandlerT, _models.AsyncAgentEventHandler())
return _models.AsyncAgentRunStream(response_iterator, self._handle_submit_tool_outputs, event_handler)
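# Hedged usage sketch: consuming the stream returned above with the default
# event handler, following the async-context-manager/async-iteration pattern
# from the package samples. `project_client`, `agent` and `thread` are
# illustrative assumptions.
#
#   async with await project_client.agents.create_stream(
#       thread_id=thread.id, assistant_id=agent.id
#   ) as stream:
#       async for event_type, event_data, _ in stream:
#           print(event_type)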
# pylint: disable=arguments-differ
@overload
async def submit_tool_outputs_to_run( # pylint: disable=arguments-differ
self,
thread_id: str,
run_id: str,
*,
tool_outputs: List[_models.ToolOutput],
content_type: str = "application/json",
**kwargs: Any,
) -> _models.ThreadRun:
"""Submits outputs from tools as requested by tool calls in a run. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'.
:param thread_id: Required.
:type thread_id: str
:param run_id: Required.
:type run_id: str
:keyword tool_outputs: Required.
:paramtype tool_outputs: list[~azure.ai.projects.models.ToolOutput]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def submit_tool_outputs_to_run(
self, thread_id: str, run_id: str, body: JSON, *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Submits outputs from tools as requested by tool calls in a run. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'.
:param thread_id: Required.
:type thread_id: str
:param run_id: Required.
:type run_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def submit_tool_outputs_to_run(
self, thread_id: str, run_id: str, body: IO[bytes], *, content_type: str = "application/json", **kwargs: Any
) -> _models.ThreadRun:
"""Submits outputs from tools as requested by tool calls in a run. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'.
:param thread_id: Required.
:type thread_id: str
:param run_id: Required.
:type run_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:return: ThreadRun. The ThreadRun is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.ThreadRun
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def submit_tool_outputs_to_stream(
self,
thread_id: str,
run_id: str,
body: Union[JSON, IO[bytes]],
*,
event_handler: _models.BaseAsyncAgentEventHandler,
content_type: str = "application/json",
**kwargs: Any,
) -> None:
"""Submits outputs from tools as requested by tool calls in a stream. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'. terminating when the Run enters a terminal state with a ``data: [DONE]`` message.
:param thread_id: Required.
:type thread_id: str
:param run_id: Required.
:type run_id: str
:param body: Is either a JSON type or a IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword event_handler: The event handler to use for processing events during the run. Required.
:paramtype event_handler: ~azure.ai.projects.models.AsyncAgentEventHandler
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def submit_tool_outputs_to_stream(
self,
thread_id: str,
run_id: str,
*,
tool_outputs: List[_models.ToolOutput],
content_type: str = "application/json",
event_handler: _models.BaseAsyncAgentEventHandler,
**kwargs: Any,
) -> None:
"""Submits outputs from tools as requested by tool calls in a stream. Runs that need submitted tool
outputs will have a status of 'requires_action' with a required_action.type of
'submit_tool_outputs'. terminating when the Run enters a terminal state with a ``data: [DONE]`` message.
:param thread_id: Required.
:type thread_id: str
:param run_id: Required.
:type run_id: str
:keyword tool_outputs: Required.
:paramtype tool_outputs: list[~azure.ai.projects.models.ToolOutput]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword event_handler: The event handler to use for processing events during the run.
:paramtype event_handler: ~azure.ai.projects.models.AsyncAgentEventHandler
:raises ~azure.core.exceptions.HttpResponseError:
"""
async def _handle_submit_tool_outputs(
self, run: _models.ThreadRun, event_handler: _models.BaseAsyncAgentEventHandler
) -> None:
if isinstance(run.required_action, _models.SubmitToolOutputsAction):
tool_calls = run.required_action.submit_tool_outputs.tool_calls
if not tool_calls:
logger.debug("No tool calls to execute.")
return
# A toolset is only needed when executing local function tools. If the
# tool is an azure_function, we just wait for it to finish.
if any(tool_call.type == "function" for tool_call in tool_calls):
toolset = self._toolset.get(run.assistant_id)
if toolset:
tool_outputs = await toolset.execute_tool_calls(tool_calls)
else:
logger.debug("Toolset is not available in the client.")
return
logger.info("Tool outputs: %s", tool_outputs)
if tool_outputs:
await self.submit_tool_outputs_to_stream(
thread_id=run.thread_id, run_id=run.id, tool_outputs=tool_outputs, event_handler=event_handler
)
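# Hedged usage sketch: a custom event handler passed to create_stream. When the
# streamed run requires tool outputs, _handle_submit_tool_outputs above
# resubmits them on the same handler. The subclass and callback shown are
# illustrative assumptions based on the AsyncAgentEventHandler extension point.
#
#   class MyEventHandler(_models.AsyncAgentEventHandler):
#       async def on_thread_run(self, run) -> None:
#           print("run status:", run.status)
#
#   async with await project_client.agents.create_stream(
#       thread_id=thread.id, assistant_id=agent.id, event_handler=MyEventHandler()
#   ) as stream:
#       await stream.until_done()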
# pylint: disable=arguments-differ
@overload
async def upload_file( # pylint: disable=arguments-differ
self, *, file_path: str, purpose: Union[str, _models.FilePurpose], **kwargs: Any
) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:keyword file_path: Required.
:paramtype file_path: str
:keyword purpose: Known values are: "fine-tune", "fine-tune-results", "assistants",
"assistants_output", "batch", "batch_output", and "vision". Required.
:paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
# pylint: disable=arguments-differ
@overload
async def upload_file( # pylint: disable=arguments-differ
self, *, file: FileType, purpose: Union[str, _models.FilePurpose], filename: Optional[str] = None, **kwargs: Any
) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:keyword file: Required.
:paramtype file: ~azure.ai.projects._vendor.FileType
:keyword purpose: Known values are: "fine-tune", "fine-tune-results", "assistants",
"assistants_output", "batch", "batch_output", and "vision". Required.
:paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
:keyword filename: Default value is None.
:paramtype filename: str
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def upload_file(self, body: JSON, **kwargs: Any) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:param body: Required.
:type body: JSON
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def upload_file(
self,
body: Optional[JSON] = None,
*,
file: Optional[FileType] = None,
file_path: Optional[str] = None,
purpose: Union[str, _models.FilePurpose, None] = None,
filename: Optional[str] = None,
**kwargs: Any,
) -> _models.OpenAIFile:
"""
Uploads a file for use by other operations, delegating to the generated operations.
:param body: JSON. Required if `file` and `purpose` are not provided.
:type body: Optional[JSON]
:keyword file: File content. Required if `body` and `purpose` are not provided.
:paramtype file: Optional[FileType]
:keyword file_path: Path to the file. Required if `body` and `purpose` are not provided.
:paramtype file_path: Optional[str]
:keyword purpose: Known values are: "fine-tune", "fine-tune-results", "assistants",
"assistants_output", "batch", "batch_output", and "vision". Required if `body` and `file` are not provided.
:paramtype purpose: Union[str, _models.FilePurpose, None]
:keyword filename: The name of the file.
:paramtype filename: Optional[str]
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: _models.OpenAIFile
:raises FileNotFoundError: If the file_path is invalid.
:raises IOError: If there are issues with reading the file.
:raises ~azure.core.exceptions.HttpResponseError: For HTTP errors.
"""
if body is not None:
return await super().upload_file(body=body, **kwargs)
if isinstance(purpose, FilePurpose):
purpose = purpose.value
if file is not None and purpose is not None:
return await super().upload_file(file=file, purpose=purpose, filename=filename, **kwargs)
if file_path is not None and purpose is not None:
if not os.path.isfile(file_path):
raise FileNotFoundError(f"The file path provided does not exist: {file_path}")
try:
with open(file_path, "rb") as f:
content = f.read()
# Determine filename and create correct FileType
base_filename = filename or os.path.basename(file_path)
file_content: FileType = (base_filename, content)
return await super().upload_file(file=file_content, purpose=purpose, **kwargs)
except IOError as e:
raise IOError(f"Unable to read file: {file_path}.") from e
raise ValueError("Invalid parameters for upload_file. Please provide the necessary arguments.")
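# Hedged usage sketch: the file_path variant reads the file from disk and
# delegates to the generated upload operation. The client name and path are
# illustrative assumptions; FilePurpose is the enum imported at module top.
#
#   uploaded = await project_client.agents.upload_file(
#       file_path="./product_info.md", purpose=FilePurpose.AGENTS
#   )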
@overload
async def upload_file_and_poll(self, body: JSON, *, sleep_interval: float = 1, **kwargs: Any) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:param body: Required.
:type body: JSON
:keyword sleep_interval: Time to wait before polling for the status of the uploaded file. Default value
is 1.
:paramtype sleep_interval: float
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def upload_file_and_poll(
self,
*,
file: FileType,
purpose: Union[str, _models.FilePurpose],
filename: Optional[str] = None,
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:keyword file: Required.
:paramtype file: ~azure.ai.projects._vendor.FileType
:keyword purpose: Known values are: "fine-tune", "fine-tune-results", "assistants",
"assistants_output", "batch", "batch_output", and "vision". Required.
:paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
:keyword filename: Default value is None.
:paramtype filename: str
:keyword sleep_interval: Time to wait before polling for the status of the uploaded file. Default value
is 1.
:paramtype sleep_interval: float
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def upload_file_and_poll(
self, *, file_path: str, purpose: Union[str, _models.FilePurpose], sleep_interval: float = 1, **kwargs: Any
) -> _models.OpenAIFile:
"""Uploads a file for use by other operations.
:keyword file_path: Required.
:paramtype file_path: str
:keyword purpose: Known values are: "fine-tune", "fine-tune-results", "assistants",
"assistants_output", "batch", "batch_output", and "vision". Required.
:paramtype purpose: str or ~azure.ai.projects.models.FilePurpose
:keyword sleep_interval: Time to wait before polling for the status of the uploaded file. Default value
is 1.
:paramtype sleep_interval: float
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.OpenAIFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def upload_file_and_poll(
self,
body: Optional[JSON] = None,
*,
file: Optional[FileType] = None,
file_path: Optional[str] = None,
purpose: Union[str, _models.FilePurpose, None] = None,
filename: Optional[str] = None,
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.OpenAIFile:
"""
Uploads a file for use by other operations, delegating to the generated operations.
:param body: JSON. Required if `file` and `purpose` are not provided.
:type body: Optional[JSON]
:keyword file: File content. Required if `body` and `purpose` are not provided.
:paramtype file: Optional[FileType]
:keyword file_path: Path to the file. Required if `body` and `purpose` are not provided.
:paramtype file_path: Optional[str]
:keyword purpose: Known values are: "fine-tune", "fine-tune-results", "assistants",
"assistants_output", "batch", "batch_output", and "vision". Required if `body` and `file` are not provided.
:paramtype purpose: Union[str, _models.FilePurpose, None]
:keyword filename: The name of the file.
:paramtype filename: Optional[str]
:keyword sleep_interval: Time to wait before polling for the status of the uploaded file. Default value
is 1.
:paramtype sleep_interval: float
:return: OpenAIFile. The OpenAIFile is compatible with MutableMapping
:rtype: _models.OpenAIFile
:raises FileNotFoundError: If the file_path is invalid.
:raises IOError: If there are issues with reading the file.
:raises ~azure.core.exceptions.HttpResponseError: For HTTP errors.
"""
if body is not None:
uploaded_file = await self.upload_file(body=body, **kwargs)
elif file is not None and purpose is not None:
uploaded_file = await self.upload_file(file=file, purpose=purpose, filename=filename, **kwargs)
elif file_path is not None and purpose is not None:
uploaded_file = await self.upload_file(file_path=file_path, purpose=purpose, **kwargs)
else:
raise ValueError(
"Invalid parameters for upload_file_and_poll. Please provide either 'body', "
"or both 'file' and 'purpose', or both 'file_path' and 'purpose'."
)
while uploaded_file.status in ["uploaded", "pending", "running"]:
await asyncio.sleep(sleep_interval)
uploaded_file = await self.get_file(uploaded_file.id)
return uploaded_file
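# Hedged usage sketch: upload and block until the file leaves the
# uploaded/pending/running states. Names are illustrative assumptions.
#
#   uploaded = await project_client.agents.upload_file_and_poll(
#       file_path="./product_info.md", purpose=FilePurpose.AGENTS
#   )
#   print(uploaded.status)  # e.g. "processed"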
@overload
async def create_vector_store_and_poll(
self, body: JSON, *, content_type: str = "application/json", sleep_interval: float = 1, **kwargs: Any
) -> _models.VectorStore:
"""Creates a vector store and poll.
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword sleep_interval: Time to wait before polling for the status of the vector store. Default value
is 1.
:paramtype sleep_interval: float
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_vector_store_and_poll(
self,
*,
content_type: str = "application/json",
file_ids: Optional[List[str]] = None,
name: Optional[str] = None,
data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
metadata: Optional[Dict[str, str]] = None,
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStore:
"""Creates a vector store and poll.
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword file_ids: A list of file IDs that the vector store should use. Useful for tools like
``file_search`` that can access files. Default value is None.
:paramtype file_ids: list[str]
:keyword name: The name of the vector store. Default value is None.
:paramtype name: str
:keyword data_sources: List of Azure assets. Default value is None.
:paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
:keyword expires_after: Details on when this vector store expires. Default value is None.
:paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Only applicable if file_ids is non-empty. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:keyword sleep_interval: Time to wait before polling for the status of the vector store. Default value
is 1.
:paramtype sleep_interval: float
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_vector_store_and_poll(
self, body: IO[bytes], *, content_type: str = "application/json", sleep_interval: float = 1, **kwargs: Any
) -> _models.VectorStore:
"""Creates a vector store and poll.
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:keyword sleep_interval: Time to wait before polling for the status of the vector store. Default value
is 1.
:paramtype sleep_interval: float
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def create_vector_store_and_poll(
self,
body: Union[JSON, IO[bytes]] = _Unset,
*,
content_type: str = "application/json",
file_ids: Optional[List[str]] = None,
name: Optional[str] = None,
data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
expires_after: Optional[_models.VectorStoreExpirationPolicy] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
metadata: Optional[Dict[str, str]] = None,
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStore:
"""Creates a vector store and poll.
:param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
:keyword file_ids: A list of file IDs that the vector store should use. Useful for tools like
``file_search`` that can access files. Default value is None.
:paramtype file_ids: list[str]
:keyword name: The name of the vector store. Default value is None.
:paramtype name: str
:keyword data_sources: List of Azure assets. Default value is None.
:paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
:keyword expires_after: Details on when this vector store expires. Default value is None.
:paramtype expires_after: ~azure.ai.projects.models.VectorStoreExpirationPolicy
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Only applicable if file_ids is non-empty. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
:keyword metadata: A set of up to 16 key/value pairs that can be attached to an object, used
for storing additional information about that object in a structured format. Keys may be up to
64 characters in length and values may be up to 512 characters in length. Default value is
None.
:paramtype metadata: dict[str, str]
:keyword sleep_interval: Time to wait before polling for the status of the vector store. Default value
is 1.
:paramtype sleep_interval: float
:return: VectorStore. The VectorStore is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStore
:raises ~azure.core.exceptions.HttpResponseError:
"""
if body is not _Unset:
if isinstance(body, dict):
vector_store = await super().create_vector_store(
body=body, content_type=content_type or "application/json", **kwargs
)
elif isinstance(body, io.IOBase):
vector_store = await super().create_vector_store(body=body, content_type=content_type, **kwargs)
else:
raise ValueError("Invalid 'body' type: must be a dictionary (JSON) or a file-like object (IO[bytes]).")
else:
store_configuration = None
if data_sources:
store_configuration = _models.VectorStoreConfiguration(data_sources=data_sources)
vector_store = await super().create_vector_store(
file_ids=file_ids,
store_configuration=store_configuration,
name=name,
expires_after=expires_after,
chunking_strategy=chunking_strategy,
metadata=metadata,
**kwargs,
)
while vector_store.status == "in_progress":
await asyncio.sleep(sleep_interval)
vector_store = await super().get_vector_store(vector_store.id)
return vector_store
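# Hedged usage sketch: create a vector store over previously uploaded files and
# wait for indexing to finish, e.g. before wiring it to a file_search tool.
# `project_client` and `uploaded` are illustrative assumptions.
#
#   vector_store = await project_client.agents.create_vector_store_and_poll(
#       file_ids=[uploaded.id], name="my_vectorstore"
#   )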
@overload
async def create_vector_store_file_batch_and_poll(
self,
vector_store_id: str,
body: JSON,
*,
content_type: str = "application/json",
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch and poll.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_vector_store_file_batch_and_poll(
self,
vector_store_id: str,
*,
file_ids: Optional[List[str]] = None,
data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
content_type: str = "application/json",
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch and poll.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
        :keyword file_ids: List of file identifiers. Default value is None.
:paramtype file_ids: list[str]
:keyword data_sources: List of Azure assets. Default value is None.
:paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_vector_store_file_batch_and_poll(
self,
vector_store_id: str,
body: IO[bytes],
*,
content_type: str = "application/json",
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch and poll.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def create_vector_store_file_batch_and_poll(
self,
vector_store_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
file_ids: Optional[List[str]] = None,
data_sources: Optional[List[_models.VectorStoreDataSource]] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
content_type: str = "application/json",
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFileBatch:
"""Create a vector store file batch and poll.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
        :param body: Is either a JSON type or an IO[bytes] type. Required.
        :type body: JSON or IO[bytes]
        :keyword file_ids: List of file identifiers. Default value is None.
        :paramtype file_ids: list[str]
        :keyword data_sources: List of Azure assets. Default value is None.
        :paramtype data_sources: list[~azure.ai.projects.models.VectorStoreDataSource]
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
            Default value is "application/json".
        :paramtype content_type: str
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFileBatch. The VectorStoreFileBatch is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFileBatch
:raises ~azure.core.exceptions.HttpResponseError:
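
        A minimal usage sketch (illustrative, not from the original docs); the
        ``project_client``, ``vector_store``, and ``uploaded_file_ids`` names are
        hypothetical stand-ins for objects created earlier.

        .. code-block:: python

            # Attach several already-uploaded files to an existing vector store
            # and wait until the whole batch has been processed.
            batch = await project_client.agents.create_vector_store_file_batch_and_poll(
                vector_store_id=vector_store.id,
                file_ids=uploaded_file_ids,
            )
            print(batch.status)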
"""
if body is not _Unset:
if isinstance(body, dict):
vector_store_file_batch = await super().create_vector_store_file_batch(
vector_store_id=vector_store_id,
body=body,
content_type=content_type or "application/json",
**kwargs,
)
elif isinstance(body, io.IOBase):
vector_store_file_batch = await super().create_vector_store_file_batch(
vector_store_id=vector_store_id,
body=body,
content_type=content_type,
**kwargs,
)
else:
raise ValueError("Invalid type for 'body'. Must be a dict (JSON) or file-like (IO[bytes]).")
else:
vector_store_file_batch = await super().create_vector_store_file_batch(
vector_store_id=vector_store_id,
file_ids=file_ids,
data_sources=data_sources,
chunking_strategy=chunking_strategy,
**kwargs,
)
while vector_store_file_batch.status == "in_progress":
            await asyncio.sleep(sleep_interval)  # non-blocking wait; time.sleep would stall the event loop
vector_store_file_batch = await super().get_vector_store_file_batch(
vector_store_id=vector_store_id, batch_id=vector_store_file_batch.id
)
return vector_store_file_batch
@overload
async def create_vector_store_file_and_poll(
self,
vector_store_id: str,
body: JSON,
*,
content_type: str = "application/json",
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: JSON
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_vector_store_file_and_poll(
self,
vector_store_id: str,
*,
content_type: str = "application/json",
file_id: Optional[str] = None,
data_source: Optional[_models.VectorStoreDataSource] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:keyword content_type: Body Parameter content-type. Content type parameter for JSON body.
Default value is "application/json".
:paramtype content_type: str
:keyword file_id: Identifier of the file. Default value is None.
:paramtype file_id: str
:keyword data_source: Azure asset ID. Default value is None.
:paramtype data_source: ~azure.ai.projects.models.VectorStoreDataSource
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@overload
async def create_vector_store_file_and_poll(
self,
vector_store_id: str,
body: IO[bytes],
*,
content_type: str = "application/json",
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
:param body: Required.
:type body: IO[bytes]
:keyword content_type: Body Parameter content-type. Content type parameter for binary body.
Default value is "application/json".
:paramtype content_type: str
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
"""
@distributed_trace_async
async def create_vector_store_file_and_poll(
self,
vector_store_id: str,
body: Union[JSON, IO[bytes]] = _Unset,
*,
content_type: str = "application/json",
file_id: Optional[str] = None,
data_source: Optional[_models.VectorStoreDataSource] = None,
chunking_strategy: Optional[_models.VectorStoreChunkingStrategyRequest] = None,
sleep_interval: float = 1,
**kwargs: Any,
) -> _models.VectorStoreFile:
"""Create a vector store file by attaching a file to a vector store.
:param vector_store_id: Identifier of the vector store. Required.
:type vector_store_id: str
        :param body: Is either a JSON type or an IO[bytes] type. Required.
:type body: JSON or IO[bytes]
        :keyword content_type: Body Parameter content-type. Content type parameter for binary body.
            Default value is "application/json".
:paramtype content_type: str
:keyword file_id: Identifier of the file. Default value is None.
:paramtype file_id: str
:keyword data_source: Azure asset ID. Default value is None.
:paramtype data_source: ~azure.ai.projects.models.VectorStoreDataSource
:keyword chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will
use the auto strategy. Default value is None.
:paramtype chunking_strategy: ~azure.ai.projects.models.VectorStoreChunkingStrategyRequest
        :keyword sleep_interval: Time to wait, in seconds, between polls for the status of the
            vector store. Default value is 1.
:paramtype sleep_interval: float
:return: VectorStoreFile. The VectorStoreFile is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.VectorStoreFile
:raises ~azure.core.exceptions.HttpResponseError:
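
        A minimal usage sketch (illustrative, not from the original docs);
        ``project_client``, ``vector_store``, and ``uploaded_file`` are hypothetical
        objects created earlier.

        .. code-block:: python

            # Attach one uploaded file and wait for it to finish processing.
            vector_store_file = await project_client.agents.create_vector_store_file_and_poll(
                vector_store_id=vector_store.id,
                file_id=uploaded_file.id,
            )
            print(vector_store_file.status)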
"""
if body is not _Unset:
if isinstance(body, dict):
vector_store_file = await super().create_vector_store_file(
vector_store_id=vector_store_id,
body=body,
content_type=content_type or "application/json",
**kwargs,
)
elif isinstance(body, io.IOBase):
vector_store_file = await super().create_vector_store_file(
vector_store_id=vector_store_id,
body=body,
content_type=content_type,
**kwargs,
)
else:
raise ValueError("Invalid type for 'body'. Must be a dict (JSON) or file-like object (IO[bytes]).")
else:
vector_store_file = await super().create_vector_store_file(
vector_store_id=vector_store_id,
file_id=file_id,
data_source=data_source,
chunking_strategy=chunking_strategy,
**kwargs,
)
while vector_store_file.status == "in_progress":
            await asyncio.sleep(sleep_interval)  # non-blocking wait; time.sleep would stall the event loop
vector_store_file = await super().get_vector_store_file(
vector_store_id=vector_store_id, file_id=vector_store_file.id
)
return vector_store_file
@distributed_trace_async
async def get_file_content(self, file_id: str, **kwargs: Any) -> AsyncIterator[bytes]:
"""
Asynchronously returns file content as a byte stream for the given file_id.
:param file_id: The ID of the file to retrieve. Required.
:type file_id: str
:return: An async iterator that yields bytes from the file content.
:rtype: AsyncIterator[bytes]
:raises ~azure.core.exceptions.HttpResponseError: If the HTTP request fails.
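
        A minimal usage sketch (illustrative, not from the original docs);
        ``project_client`` and ``file_id`` are hypothetical:

        .. code-block:: python

            # Stream the file and assemble it into a single buffer.
            stream = await project_client.agents.get_file_content(file_id)
            data = bytearray()
            async for chunk in stream:
                data.extend(chunk)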
"""
kwargs["stream"] = True
response = await super()._get_file_content(file_id, **kwargs)
return cast(AsyncIterator[bytes], response)
@distributed_trace_async
async def save_file(self, file_id: str, file_name: str, target_dir: Optional[Union[str, Path]] = None) -> None:
"""
Asynchronously saves file content retrieved using a file identifier to the specified local directory.
:param file_id: The unique identifier for the file to retrieve.
:type file_id: str
:param file_name: The name of the file to be saved.
:type file_name: str
:param target_dir: The directory where the file should be saved. Defaults to the current working directory.
:type target_dir: str or Path
:raises ValueError: If the target path is not a directory or the file name is invalid.
:raises RuntimeError: If file content retrieval fails or no content is found.
:raises TypeError: If retrieved chunks are not bytes-like objects.
:raises IOError: If writing to the file fails.
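
        A minimal usage sketch (illustrative, not from the original docs);
        ``project_client`` and ``file_id`` are hypothetical:

        .. code-block:: python

            # Download the file and write it under ./downloads/report.csv.
            await project_client.agents.save_file(
                file_id=file_id,
                file_name="report.csv",
                target_dir="./downloads",
            )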
"""
try:
# Determine and validate the target directory
            path = Path(target_dir).expanduser().resolve() if target_dir else Path.cwd()
            # Check before mkdir so a non-directory target raises the documented
            # ValueError instead of an unexpected FileExistsError from mkdir().
            if path.exists() and not path.is_dir():
                raise ValueError(f"The target path '{path}' is not a directory.")
            path.mkdir(parents=True, exist_ok=True)
# Sanitize and validate the file name
sanitized_file_name = Path(file_name).name
if not sanitized_file_name:
raise ValueError("The provided file name is invalid.")
# Retrieve the file content
file_content_stream = await self.get_file_content(file_id)
if not file_content_stream:
raise RuntimeError(f"No content retrievable for file ID '{file_id}'.")
# Collect all chunks asynchronously
chunks = []
async for chunk in file_content_stream:
if isinstance(chunk, (bytes, bytearray)):
chunks.append(chunk)
else:
raise TypeError(f"Expected bytes or bytearray, got {type(chunk).__name__}")
target_file_path = path / sanitized_file_name
# Write the collected content to the file synchronously
def write_file(collected_chunks: list):
with open(target_file_path, "wb") as file:
for chunk in collected_chunks:
file.write(chunk)
# Use the event loop to run the synchronous function in a thread executor
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, write_file, chunks)
logger.debug("File '%s' saved successfully at '%s'.", sanitized_file_name, target_file_path)
except (ValueError, RuntimeError, TypeError, IOError) as e:
logger.error("An error occurred in save_file: %s", e)
raise
@distributed_trace_async
async def delete_agent(self, assistant_id: str, **kwargs: Any) -> _models.AgentDeletionStatus:
"""Deletes an agent.
:param assistant_id: Identifier of the agent. Required.
:type assistant_id: str
:return: AgentDeletionStatus. The AgentDeletionStatus is compatible with MutableMapping
:rtype: ~azure.ai.projects.models.AgentDeletionStatus
:raises ~azure.core.exceptions.HttpResponseError:
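
        A minimal usage sketch (illustrative, not from the original docs);
        ``project_client`` and ``agent`` are hypothetical objects created earlier:

        .. code-block:: python

            # Remove the agent; any toolset cached for it locally is dropped as well.
            status = await project_client.agents.delete_agent(agent.id)
            print(status.deleted)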
"""
if assistant_id in self._toolset:
del self._toolset[assistant_id]
return await super().delete_agent(assistant_id, **kwargs)
class _SyncCredentialWrapper(TokenCredential):
"""
    Wraps an AsyncTokenCredential so it can be used where a synchronous TokenCredential is required.

    :param async_credential: The async credential to be wrapped.
:type async_credential: ~azure.core.credentials_async.AsyncTokenCredential
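
    A minimal usage sketch (illustrative, not from the original docs): wrap the async
    ``DefaultAzureCredential`` from ``azure.identity.aio`` so synchronous code can call
    ``get_token`` on it.

    .. code-block:: python

        from azure.identity.aio import DefaultAzureCredential

        sync_credential = _SyncCredentialWrapper(DefaultAzureCredential())
        token = sync_credential.get_token("https://management.azure.com/.default")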
"""
def __init__(self, async_credential: "AsyncTokenCredential"):
self._async_credential = async_credential
def get_token(
self,
*scopes: str,
claims: Optional[str] = None,
tenant_id: Optional[str] = None,
enable_cae: bool = False,
**kwargs: Any,
) -> "AccessToken":
        # azure.core expects a synchronous get_token, so run the async credential's
        # coroutine on its own event loop in a worker thread and block on the result.
        # Using the executor as a context manager ensures its thread is cleaned up.
        with concurrent.futures.ThreadPoolExecutor() as pool:
            return pool.submit(
                asyncio.run,
                self._async_credential.get_token(
                    *scopes,
                    claims=claims,
                    tenant_id=tenant_id,
                    enable_cae=enable_cae,
                    **kwargs,
                ),
            ).result()
__all__: List[str] = [
"AgentsOperations",
"ConnectionsOperations",
"TelemetryOperations",
"InferenceOperations",
] # Add all objects you want publicly available to users at this package level
def patch_sdk():
"""Do not remove from this file.
`patch_sdk` is a last resort escape hatch that allows you to do customizations
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""