Source code for azure.ai.resources.operations._single_deployment_operations

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

import ast
import os
from pathlib import Path
import shutil
import tempfile
from typing import Any, Dict, List, Tuple, Union, Iterable, Optional
import uuid


from azure.ai.ml import MLClient
from azure.ai.ml.entities import ManagedOnlineDeployment, Model as AzureMLModel, DataCollector, DeploymentCollection, Environment, BuildContext
from azure.ai.ml.operations._operation_orchestrator import OperationOrchestrator
from azure.ai.ml._utils._endpoint_utils import upload_dependencies
from azure.core.tracing.decorator import distributed_trace
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.resource.resources.models import DeploymentMode
from azure.core.polling import LROPoller

from .._utils._scoring_script_utils import create_chat_scoring_script, create_mlmodel_file
from .._utils._registry_utils import get_registry_model
from .._utils._deployment_utils import get_default_allowed_instance_type_for_hugging_face, get_empty_deployment_arm_template
from ..entities.single_deployment import SingleDeployment
from ..entities.deployment_keys import DeploymentKeys
from ..entities.models import Model, PromptflowModel

from azure.ai.resources._telemetry import ActivityType, monitor_with_activity, monitor_with_telemetry_mixin, ActivityLogger

ops_logger = ActivityLogger(__name__)
logger, module_logger = ops_logger.package_logger, ops_logger.module_logger


class SingleDeploymentOperations:
    """Operations class for SingleDeployment objects.

    You should not instantiate this class directly. Instead, you should create an AIClient instance
    that instantiates it for you and attaches it as an attribute.

    :param ml_client: The Azure Machine Learning client
    :type ml_client: ~azure.ai.ml.MLClient
    """

    def __init__(self, ml_client: MLClient, connections, **kwargs) -> None:
        self._ml_client = ml_client
        self._connections = connections
        ops_logger.update_info(kwargs)
        self._resource_management_client = ResourceManagementClient(
            self._ml_client._credential, self._ml_client.subscription_id
        )
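
    # Illustrative usage sketch (not part of the original module): this class is normally
    # obtained from an AIClient rather than constructed directly. The attribute name
    # ``deployments`` and the AIClient import path and constructor arguments below are
    # assumptions for illustration only.
    #
    #     from azure.ai.resources.client import AIClient
    #     from azure.identity import DefaultAzureCredential
    #
    #     client = AIClient(
    #         credential=DefaultAzureCredential(),
    #         subscription_id="<subscription-id>",
    #         resource_group_name="<resource-group>",
    #         project_name="<project-name>",
    #     )
    #     deployment_ops = client.deployments  # an instance of SingleDeploymentOperations
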
    @distributed_trace
    @monitor_with_activity(logger, "SingleDeployment.BeginCreateOrUpdate", ActivityType.PUBLICAPI)
    def begin_create_or_update(self, deployment: SingleDeployment) -> LROPoller[SingleDeployment]:
        """Create or update a deployment.

        :param deployment: The deployment resource to create or update remotely.
        :type deployment: ~azure.ai.resources.entities.SingleDeployment
        :return: A poller for the long-running operation.
        :rtype: ~azure.core.polling.LROPoller[~azure.ai.resources.entities.SingleDeployment]
        """
        model = deployment.model
        endpoint_name = deployment.endpoint_name if deployment.endpoint_name else deployment.name
        data_collector = None
        if deployment.data_collector_enabled:
            data_collector = DataCollector(
                collections={
                    "model_inputs": DeploymentCollection(
                        enabled="true",
                    ),
                    "model_outputs": DeploymentCollection(
                        enabled="true",
                    ),
                },
                sampling_rate=1,
            )
        temp_dir = tempfile.TemporaryDirectory()

        # Case 1: a local promptflow model is packaged into a custom Docker-based environment.
        if isinstance(model, PromptflowModel):
            if not deployment.instance_type:
                deployment.instance_type = "Standard_DS3_v2"
            # Create dockerfile
            with open(f"{model.path}/Dockerfile", "w+") as f:
                base_image = (
                    "mcr.microsoft.com/azureml/promptflow/promptflow-runtime-stable:latest"
                    if not model.base_image
                    else model.base_image
                )
                f.writelines(
                    [
                        f"FROM {base_image}\n",
                        "COPY ./* /\n",
                        "RUN pip install -r requirements.txt\n",
                    ]
                )
            azureml_environment = Environment(
                build=BuildContext(path=model.path),
                inference_config={
                    "liveness_route": {"path": "/health", "port": 8080},
                    "readiness_route": {"path": "/health", "port": 8080},
                    "scoring_route": {"path": "/score", "port": 8080},
                },
                is_anonymous=True,
            )
            azureml_model = AzureMLModel(
                name=f"{deployment.name}-deployment-pf", path=model.path, type="custom_model"
            )
            deployment_environment_variables = (
                deployment.environment_variables if deployment.environment_variables else {}
            )
            v2_deployment: ManagedOnlineDeployment = ManagedOnlineDeployment(  # type: ignore[no-redef]
                name=deployment.name,
                endpoint_name=endpoint_name,
                model=azureml_model,
                environment=azureml_environment,
                instance_type=deployment.instance_type,
                instance_count=deployment.instance_count if deployment.instance_count else 1,
                environment_variables={
                    "PROMPTFLOW_RUN_MODE": "serving",
                    "PRT_CONFIG_OVERRIDE": (
                        f"deployment.subscription_id={self._ml_client.subscription_id},"
                        f"deployment.resource_group={self._ml_client.resource_group_name},"
                        f"deployment.workspace_name={self._ml_client.workspace_name},"
                        f"deployment.endpoint_name={endpoint_name},"
                        f"deployment.deployment_name={deployment.name}"
                    ),
                    **deployment_environment_variables,
                },
                app_insights_enabled=deployment.app_insights_enabled,
                data_collector=data_collector,
            )

        # Case 2: a local Model, deployed either as an MLflow model or with a generated chat scoring script.
        if isinstance(model, Model):
            if not deployment.instance_type:
                deployment.instance_type = "Standard_DS3_v2"
            if model.loader_module and model.chat_module:
                raise Exception(
                    "Only one of loader_module or chat_module can be specified for a model, not both."
                )
            azureml_environment = None
            scoring_script = None
            azureml_model = None
            if model.conda_file and model.loader_module:
                create_mlmodel_file(model)
                azureml_model = AzureMLModel(
                    name=f"{deployment.name}-deployment-model", path=model.path, type="mlflow_model"
                )
            if model.conda_file and model.chat_module:
                azureml_model = AzureMLModel(
                    name=f"{deployment.name}-deployment-model", path=model.path, type="custom_model"
                )
                model_dir_name = Path(model.path).resolve().name
                create_chat_scoring_script(temp_dir.name, model.chat_module, model_dir_name)
                azureml_environment = Environment(
                    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
                    conda_file=str(Path(model.path) / model.conda_file),
                    is_anonymous=True,
                )
                scoring_script = "score.py"
            else:
                # validate that the path has an MLmodel file and continue
                if "mlmodel" not in [path.lower() for path in os.listdir(model.path)]:
                    raise Exception(
                        "An MLModel file must be present in the model directory when conda_file and"
                        " one of loader_module or chat_module are not specified for deployment."
                    )
            v2_deployment: ManagedOnlineDeployment = ManagedOnlineDeployment(  # type: ignore[no-redef]
                name=deployment.name,
                endpoint_name=endpoint_name,
                model=azureml_model,
                environment=azureml_environment,
                code_path=temp_dir.name if scoring_script else None,
                scoring_script=scoring_script,
                instance_type=deployment.instance_type,
                instance_count=1,
                environment_variables=deployment.environment_variables,
                app_insights_enabled=deployment.app_insights_enabled,
                data_collector=data_collector,
            )

        # Case 3: a registry model ID; pick an instance type with available quota if none was provided.
        if isinstance(model, str) and "registries" in model:
            model_details = get_registry_model(
                self._ml_client._credential,
                id=model,
            )
            model_id = model
            if not deployment.instance_type:
                if "registries/HuggingFace" in model_details.id:
                    default_instance_type, allowed_instance_types = get_default_allowed_instance_type_for_hugging_face(
                        model_details, self._ml_client._credential
                    )
                    default_instance_type = str(default_instance_type)  # type: ignore[no-redef]
                    self._check_default_instance_type_and_populate(
                        default_instance_type, deployment, allowed_instance_types=allowed_instance_types
                    )
                if "registries/azureml/" in model_details.id:
                    default_instance_type = model_details.properties["inference-recommended-sku"]
                    allowed_instance_types = []
                    if "," in default_instance_type:
                        allowed_instance_types = model_details.properties["inference-recommended-sku"].split(",")
                        default_instance_type = allowed_instance_types[0]
                    min_sku_spec = model_details.properties["inference-min-sku-spec"]
                    self._check_default_instance_type_and_populate(
                        default_instance_type, deployment, min_sku_spec=min_sku_spec
                    )
                if "registries/azureml-meta" in model_details.id:
                    allowed_skus = ast.literal_eval(model_details.tags["inference_compute_allow_list"])
                    # check the available quota for each SKU in the allow list and
                    # pick the SKU that has available quota and is the cheapest
                    vm_sizes = self._ml_client.compute._vmsize_operations.list(
                        location=self._ml_client.compute._get_workspace_location()
                    )
                    # create a mapping of allowed SKU to (SKU family, number of vCPUs, cost per hour on Linux)
                    filtered_vm_sizes = [vm_size for vm_size in vm_sizes.value if vm_size.name in allowed_skus]
                    sku_to_family_vcpu_cost_map: Dict[Any, Tuple[Any, Any, Optional[Any]]] = {}
                    sku_families = []
                    for vm_size in filtered_vm_sizes:
                        cost = None
                        for vm_price in vm_size.estimated_vm_prices.values:
                            if vm_price.os_type == "Linux" and vm_price.vm_tier == "Standard":
                                cost = vm_price.retail_price
                        sku_to_family_vcpu_cost_map[vm_size.name] = (vm_size.family, vm_size.v_cp_us, cost)
                        sku_families.append(vm_size.family)
                    # sort the allowed SKUs by price and find the first VM that has enough quota
                    sku_to_family_vcpu_cost_map = dict(
                        sorted(sku_to_family_vcpu_cost_map.items(), key=lambda item: item[1][2])  # type: ignore
                    )
                    # get usage info and filter it down to dedicated usage for each SKU family
                    usage_info = self._ml_client.compute.list_usage()
                    filtered_usage_info = {
                        filtered_usage.name["value"]: filtered_usage
                        for filtered_usage in [
                            usage
                            for usage in usage_info
                            if usage.name["value"] in sku_families
                            and "Dedicated" in usage.name["localized_value"]
                        ]
                    }
                    # loop over each SKU and check whether its family has enough cores available
                    # without exceeding the family limit
                    for sku_name, sku_details in sku_to_family_vcpu_cost_map.items():
                        family, vcpus, cost = sku_details
                        family_usage = filtered_usage_info[family]
                        if deployment.instance_count * vcpus + family_usage.current_value <= family_usage.limit:
                            deployment.instance_type = sku_name
                            break
                    if not deployment.instance_type:
                        # if there is not enough quota, raise an exception listing the SKUs
                        # the user needs to request quota for
                        raise Exception(
                            "There is no quota in the project's region for this model's allowed inference "
                            f"instance types: {allowed_skus}. Please request a quota increase for one of "
                            "these instance types or try deploying to a project in a region with more quota."
                        )
            v2_deployment: ManagedOnlineDeployment = ManagedOnlineDeployment(  # type: ignore[no-redef]
                name=deployment.name,
                endpoint_name=endpoint_name,
                model=model_id,
                instance_type=deployment.instance_type,
                instance_count=1,
                app_insights_enabled=deployment.app_insights_enabled,
                data_collector=data_collector,
            )

        if deployment.data_collector_enabled:
            self._ml_client.online_deployments._register_collection_data_assets(v2_deployment)

        orchestrators = OperationOrchestrator(
            operation_container=self._ml_client.online_deployments._all_operations,
            operation_scope=self._ml_client.online_deployments._operation_scope,
            operation_config=self._ml_client.online_deployments._operation_config,
        )
        upload_dependencies(v2_deployment, orchestrators)
        location = self._ml_client.online_deployments._get_workspace_location()
        v2_deployment.tags = deployment.tags
        v2_deployment.properties = deployment.properties
        shutil.rmtree(temp_dir.name)

        # Deploy via an ARM template so that the endpoint and deployment are created in a single operation.
        template = get_empty_deployment_arm_template()
        template["parameters"]["workspaceName"] = {"defaultValue": self._ml_client.workspace_name, "type": "String"}
        template["parameters"]["onlineEndpointName"] = {"defaultValue": endpoint_name, "type": "String"}
        template["parameters"]["onlineDeploymentName"] = {"defaultValue": deployment.name, "type": "String"}
        template["parameters"]["onlineDeploymentProperties"] = {
            "defaultValue": v2_deployment._to_rest_object(location=location).properties.serialize(),
            "type": "Object",
        }
        template["parameters"]["location"] = {"defaultValue": location, "type": "String"}
        template["parameters"]["deploymentInstanceCount"] = {"defaultValue": deployment.instance_count, "type": "int"}

        def lro_callback(raw_response, deserialized, headers):
            outputs = deserialized.properties.outputs
            return SingleDeployment._from_v2_endpoint_deployment(
                self._ml_client.online_endpoints.get(outputs["online_endpoint_name"]["value"]),
                self._ml_client.online_deployments.get(
                    outputs["online_deployment_name"]["value"], outputs["online_endpoint_name"]["value"]
                ),
            )

        return self._resource_management_client.deployments.begin_create_or_update(
            self._ml_client.resource_group_name,
            str(uuid.uuid4()),
            {
                "properties": {
                    "template": template,
                    "mode": DeploymentMode.incremental,
                }
            },
            cls=lro_callback,
        )
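
    # Illustrative usage sketch (assumptions: ``client.deployments`` as above, plus the
    # SingleDeployment/Model constructor arguments shown here): deploying a local MLflow-style
    # model and waiting for the long-running ARM deployment to finish.
    #
    #     from azure.ai.resources.entities.models import Model
    #     from azure.ai.resources.entities.single_deployment import SingleDeployment
    #
    #     deployment = SingleDeployment(
    #         name="my-deployment",
    #         model=Model(path="./my_model", conda_file="conda.yaml", loader_module="my_loader"),
    #     )
    #     poller = client.deployments.begin_create_or_update(deployment)
    #     created = poller.result()  # blocks until the ARM template deployment completes
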
    @distributed_trace
    @monitor_with_activity(logger, "SingleDeployment.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, endpoint_name: Optional[str] = None) -> SingleDeployment:
        """Get a deployment by name.

        :param name: The deployment name
        :type name: str
        :param endpoint_name: The endpoint name. Defaults to the deployment name if not provided.
        :type endpoint_name: str
        :return: The deployment with the provided name.
        :rtype: ~azure.ai.resources.entities.SingleDeployment
        """
        endpoint_name = endpoint_name if endpoint_name else name
        deployment = self._ml_client.online_deployments.get(
            name=name,
            endpoint_name=endpoint_name,
        )
        endpoint = self._ml_client.online_endpoints.get(endpoint_name)
        return SingleDeployment._from_v2_endpoint_deployment(endpoint, deployment)
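
    # Illustrative usage (``client.deployments`` as assumed above): fetch a deployment whose
    # endpoint shares its name, or one on an explicitly named endpoint.
    #
    #     d1 = client.deployments.get("my-deployment")
    #     d2 = client.deployments.get("my-deployment", endpoint_name="my-endpoint")
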
    @distributed_trace
    @monitor_with_activity(logger, "SingleDeployment.List", ActivityType.PUBLICAPI)
    def list(self) -> Iterable[SingleDeployment]:
        """List all deployments.

        :return: An iterator of deployment objects
        :rtype: Iterable[~azure.ai.resources.entities.SingleDeployment]
        """
        deployments = []
        endpoints = self._ml_client.online_endpoints.list()
        for endpoint in endpoints:
            v2_deployments = self._ml_client.online_deployments.list(endpoint.name)
            deployments.extend(
                [
                    SingleDeployment._from_v2_endpoint_deployment(endpoint, deployment)
                    for deployment in v2_deployments
                ]
            )
        return deployments
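
    # Illustrative usage (``client.deployments`` as assumed above): iterate every deployment
    # across all online endpoints in the project.
    #
    #     for d in client.deployments.list():
    #         print(d.name, d.endpoint_name)
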
    @distributed_trace
    @monitor_with_activity(logger, "Deployment.GetKeys", ActivityType.PUBLICAPI)
    def get_keys(self, name: str, endpoint_name: Optional[str] = None) -> DeploymentKeys:
        """Get the deployment keys.

        :param name: The deployment name
        :type name: str
        :param endpoint_name: The endpoint name. Defaults to the deployment name if not provided.
        :type endpoint_name: str
        :return: The deployment keys
        :rtype: ~azure.ai.resources.entities.DeploymentKeys
        """
        endpoint_name = endpoint_name if endpoint_name else name
        return DeploymentKeys._from_v2_endpoint_keys(self._ml_client.online_endpoints.get_keys(endpoint_name))
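
    # Illustrative usage (``client.deployments`` as assumed above): retrieve the endpoint keys
    # for a deployment; the endpoint name defaults to the deployment name when omitted.
    #
    #     keys = client.deployments.get_keys("my-deployment")
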
    @distributed_trace
    @monitor_with_activity(logger, "SingleDeployment.Delete", ActivityType.PUBLICAPI)
    def delete(self, name: str, endpoint_name: Optional[str] = None) -> None:
        """Delete a deployment.

        :param name: The deployment name
        :type name: str
        :param endpoint_name: The endpoint name. Defaults to the deployment name if not provided.
        :type endpoint_name: str
        """
        self._ml_client.online_deployments.delete(
            name=name,
            endpoint_name=endpoint_name if endpoint_name else name,
        ).result()
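
    # Illustrative usage (``client.deployments`` as assumed above): delete a deployment; the call
    # blocks until the underlying long-running delete operation completes.
    #
    #     client.deployments.delete("my-deployment", endpoint_name="my-endpoint")
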
    @distributed_trace
    @monitor_with_activity(logger, "SingleDeployment.Invoke", ActivityType.PUBLICAPI)
    def invoke(self, name: str, request_file: Union[str, os.PathLike], endpoint_name: Optional[str] = None) -> Any:
        """Invoke a deployment.

        :param name: The deployment name
        :type name: str
        :param request_file: The request file
        :type request_file: Union[str, os.PathLike]
        :param endpoint_name: The endpoint name. Defaults to the deployment name if not provided.
        :type endpoint_name: str
        :return: The response from the deployment
        :rtype: Any
        """
        return self._ml_client.online_endpoints.invoke(
            endpoint_name=endpoint_name if endpoint_name else name,
            request_file=request_file,
            deployment_name=name,
        )
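
    # Illustrative usage (``client.deployments`` as assumed above): score a deployment with a
    # JSON request file; the expected payload shape depends on the deployed model's scoring script.
    #
    #     response = client.deployments.invoke("my-deployment", request_file="sample_request.json")
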
    def _check_default_instance_type_and_populate(
        self,
        instance_type: str,
        deployment: SingleDeployment,
        allowed_instance_types: Optional[List[str]] = None,
        min_sku_spec: Optional[str] = None,
    ) -> None:
        vm_sizes = self._ml_client.compute.list_sizes()
        inference_sku_vm_info = [vm for vm in vm_sizes if vm.name == instance_type][0]
        usage_info = self._ml_client.compute.list_usage()
        # from the list of all usage, get the usage specific to the recommended SKU's family
        sku_family_usage = next(
            (
                usage
                for usage in usage_info
                if (
                    usage.name["value"] == inference_sku_vm_info.family
                    and "Dedicated" in usage.name["localized_value"]
                )
            )
        )
        # if the family has enough cores available without exceeding the family limit,
        # choose the recommended instance type as the deployment SKU
        if (
            sku_family_usage.current_value + inference_sku_vm_info.v_cp_us * deployment.instance_count
            <= sku_family_usage.limit
        ):
            deployment.instance_type = instance_type
        else:
            exception_message = (
                f"The recommended inference instance type for this model is {instance_type}, "
                "for which there is not enough quota.\n"
            )
            if allowed_instance_types:
                exception_message += (
                    f"The following instance types are allowed for this model: {allowed_instance_types}. "
                    "Please provide an instance type from this list for which there is enough quota."
                )
            elif min_sku_spec:
                cpu, gpu, ram, storage = min_sku_spec.split("|")
                exception_message += (
                    f"Please provide an instance_type that meets the following minimum parameters: "
                    f"{cpu} vCPU cores, {gpu} GPU cores, {ram} GB of vRAM, {storage} GB of storage."
                )
            raise Exception(exception_message)
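
    # Worked example of the quota check in _check_default_instance_type_and_populate (numbers are
    # hypothetical): for a recommended SKU with 24 vCPUs and instance_count=1, a family currently
    # using 48 of its 96 dedicated cores passes because 48 + 24 * 1 = 72 <= 96, so the recommended
    # instance type is accepted; otherwise an exception listing the allowed alternatives or the
    # minimum SKU spec is raised.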