# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access,broad-except
import random
import re
import subprocess
from typing import Any, Dict, Optional
from marshmallow.exceptions import ValidationError as SchemaValidationError
from azure.ai.ml._exception_helper import log_and_raise_error
from azure.ai.ml._local_endpoints import LocalEndpointMode
from azure.ai.ml._restclient.v2022_02_01_preview.models import DeploymentLogsRequest
from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042023Preview
from azure.ai.ml._scope_dependent_operations import (
OperationConfig,
OperationsContainer,
OperationScope,
_ScopeDependentOperations,
)
from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
from azure.ai.ml._utils._arm_id_utils import AMLVersionedArmId
from azure.ai.ml._utils._azureml_polling import AzureMLPolling
from azure.ai.ml._utils._endpoint_utils import upload_dependencies, validate_scoring_script
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml._utils._package_utils import package_deployment
from azure.ai.ml.constants._common import ARM_ID_PREFIX, AzureMLResourceType, LROConfigurations
from azure.ai.ml.constants._deployment import DEFAULT_MDC_PATH, EndpointDeploymentLogContainerType, SmallSKUs
from azure.ai.ml.entities import Data, OnlineDeployment
from azure.ai.ml.exceptions import (
ErrorCategory,
ErrorTarget,
InvalidVSCodeRequestError,
LocalDeploymentGPUNotAvailable,
ValidationErrorType,
ValidationException,
)
from azure.core.credentials import TokenCredential
from azure.core.paging import ItemPaged
from azure.core.polling import LROPoller
from azure.core.tracing.decorator import distributed_trace
from ._local_deployment_helper import _LocalDeploymentHelper
from ._operation_orchestrator import OperationOrchestrator
ops_logger = OpsLogger(__name__)
module_logger = ops_logger.module_logger
class OnlineDeploymentOperations(_ScopeDependentOperations):
"""OnlineDeploymentOperations.
You should not instantiate this class directly. Instead, you should create an MLClient instance that instantiates it
for you and attaches it as an attribute.
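
    A minimal usage sketch (hypothetical subscription, resource group, and workspace
    names; assumes the ``azure-identity`` package is installed):

    .. code-block:: python

        from azure.ai.ml import MLClient
        from azure.identity import DefaultAzureCredential

        # Placeholder workspace details; substitute your own.
        ml_client = MLClient(
            DefaultAzureCredential(),
            subscription_id="<subscription-id>",
            resource_group_name="<resource-group>",
            workspace_name="<workspace>",
        )
        deployment_ops = ml_client.online_deployments  # instance of this class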
"""
def __init__(
self,
operation_scope: OperationScope,
operation_config: OperationConfig,
service_client_04_2023_preview: ServiceClient042023Preview,
all_operations: OperationsContainer,
local_deployment_helper: _LocalDeploymentHelper,
credentials: Optional[TokenCredential] = None,
**kwargs: Dict,
):
super(OnlineDeploymentOperations, self).__init__(operation_scope, operation_config)
ops_logger.update_filter()
self._local_deployment_helper = local_deployment_helper
self._online_deployment = service_client_04_2023_preview.online_deployments
self._online_endpoint_operations = service_client_04_2023_preview.online_endpoints
self._all_operations = all_operations
self._credentials = credentials
self._init_kwargs = kwargs
@distributed_trace
@monitor_with_activity(ops_logger, "OnlineDeployment.BeginCreateOrUpdate", ActivityType.PUBLICAPI)
def begin_create_or_update(
self,
deployment: OnlineDeployment,
*,
local: bool = False,
vscode_debug: bool = False,
skip_script_validation: bool = False,
local_enable_gpu: bool = False,
**kwargs: Any,
) -> LROPoller[OnlineDeployment]:
"""Create or update a deployment.
:param deployment: the deployment entity
:type deployment: ~azure.ai.ml.entities.OnlineDeployment
:keyword local: Whether deployment should be created locally, defaults to False
:paramtype local: bool
        :keyword vscode_debug: Whether to open a VSCode instance to debug the local deployment, defaults to False
:paramtype vscode_debug: bool
:keyword skip_script_validation: Whether or not to skip validation of the deployment script. Defaults to False.
:paramtype skip_script_validation: bool
        :keyword local_enable_gpu: Whether to enable the local container to access the GPU, defaults to False
:paramtype local_enable_gpu: bool
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if OnlineDeployment cannot
be successfully validated. Details will be provided in the error message.
:raises ~azure.ai.ml.exceptions.AssetException: Raised if OnlineDeployment assets
(e.g. Data, Code, Model, Environment) cannot be successfully validated.
Details will be provided in the error message.
:raises ~azure.ai.ml.exceptions.ModelException: Raised if OnlineDeployment model cannot be
successfully validated. Details will be provided in the error message.
:raises ~azure.ai.ml.exceptions.DeploymentException: Raised if OnlineDeployment type is unsupported.
Details will be provided in the error message.
:raises ~azure.ai.ml.exceptions.LocalEndpointNotFoundError: Raised if local endpoint resource does not exist.
:raises ~azure.ai.ml.exceptions.LocalEndpointInFailedStateError: Raised if local endpoint is in a failed state.
:raises ~azure.ai.ml.exceptions.InvalidLocalEndpointError: Raised if Docker image cannot be
found for local deployment.
:raises ~azure.ai.ml.exceptions.LocalEndpointImageBuildError: Raised if Docker image cannot be
successfully built for local deployment.
:raises ~azure.ai.ml.exceptions.RequiredLocalArtifactsNotFoundError: Raised if local artifacts cannot be
found for local deployment.
:raises ~azure.ai.ml.exceptions.InvalidVSCodeRequestError: Raised if VS Debug is invoked with a remote endpoint.
VSCode debug is only supported for local endpoints.
        :raises ~azure.ai.ml.exceptions.LocalDeploymentGPUNotAvailable: Raised if an Nvidia GPU is not available
            on the system and local_enable_gpu is set for a local deployment.
:raises ~azure.ai.ml.exceptions.VSCodeCommandNotFound: Raised if VSCode instance cannot be instantiated.
:return: A poller to track the operation status
:rtype: ~azure.core.polling.LROPoller[~azure.ai.ml.entities.OnlineDeployment]
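
        A minimal usage sketch (hypothetical endpoint, model, and deployment names;
        assumes an authenticated ``ml_client`` and an existing online endpoint):

        .. code-block:: python

            from azure.ai.ml.entities import ManagedOnlineDeployment

            deployment = ManagedOnlineDeployment(
                name="blue",
                endpoint_name="my-endpoint",  # the endpoint must already exist
                model="azureml:my-model:1",
                instance_type="Standard_DS3_v2",
                instance_count=1,
            )
            poller = ml_client.online_deployments.begin_create_or_update(deployment)
            created = poller.result()  # block until provisioning completes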
"""
try:
if vscode_debug and not local:
raise InvalidVSCodeRequestError(
msg="VSCode Debug is only support for local endpoints. Please set local to True."
)
if local:
if local_enable_gpu:
try:
subprocess.run("nvidia-smi", check=True)
except Exception as ex:
raise LocalDeploymentGPUNotAvailable(
msg=(
"Nvidia GPU is not available in your local system."
" Use nvidia-smi command to see the available GPU"
)
) from ex
return self._local_deployment_helper.create_or_update(
deployment=deployment,
local_endpoint_mode=self._get_local_endpoint_mode(vscode_debug),
local_enable_gpu=local_enable_gpu,
)
if deployment and deployment.instance_type and deployment.instance_type.lower() in SmallSKUs:
module_logger.warning(
"Instance type %s may be too small for compute resources. "
"Minimum recommended compute SKU is Standard_DS3_v2 for general purpose endpoints. Learn more about SKUs here: " # pylint: disable=line-too-long
"https://learn.microsoft.com/azure/machine-learning/referencemanaged-online-endpoints-vm-sku-list",
deployment.instance_type,
)
if (
not skip_script_validation
and deployment
and deployment.code_configuration
and not deployment.code_configuration.code.startswith(ARM_ID_PREFIX) # type: ignore[union-attr]
and not re.match(AMLVersionedArmId.REGEX_PATTERN, deployment.code_configuration.code) # type: ignore
):
validate_scoring_script(deployment)
path_format_arguments = {
"endpointName": deployment.name,
"resourceGroupName": self._resource_group_name,
"workspaceName": self._workspace_name,
}
            # This get() ensures the endpoint exists; fail fast before starting the deployment
module_logger.info("Check: endpoint %s exists", deployment.endpoint_name)
self._online_endpoint_operations.get(
resource_group_name=self._resource_group_name,
workspace_name=self._workspace_name,
endpoint_name=deployment.endpoint_name,
)
orchestrators = OperationOrchestrator(
operation_container=self._all_operations,
operation_scope=self._operation_scope,
operation_config=self._operation_config,
)
if deployment.data_collector:
self._register_collection_data_assets(deployment=deployment)
upload_dependencies(deployment, orchestrators)
try:
location = self._get_workspace_location()
is_package_model = deployment.package_model if hasattr(deployment, "package_model") else False
if kwargs.pop("package_model", False) or is_package_model:
deployment = package_deployment(deployment, self._all_operations.all_operations["models"])
module_logger.info("\nStarting deployment")
deployment_rest = deployment._to_rest_object(location=location) # type: ignore
poller = self._online_deployment.begin_create_or_update(
resource_group_name=self._resource_group_name,
workspace_name=self._workspace_name,
endpoint_name=deployment.endpoint_name,
deployment_name=deployment.name,
body=deployment_rest,
polling=AzureMLPolling(
LROConfigurations.POLL_INTERVAL,
path_format_arguments=path_format_arguments,
**self._init_kwargs,
),
polling_interval=LROConfigurations.POLL_INTERVAL,
**self._init_kwargs,
cls=lambda response, deserialized, headers: OnlineDeployment._from_rest_object(deserialized),
)
return poller
            except Exception:
                # Nothing to handle here; let the outer handler classify the error.
                raise
except Exception as ex: # pylint: disable=W0718
if isinstance(ex, (ValidationException, SchemaValidationError)):
log_and_raise_error(ex)
else:
                raise
@distributed_trace
@monitor_with_activity(ops_logger, "OnlineDeployment.Get", ActivityType.PUBLICAPI)
def get(self, name: str, endpoint_name: str, *, local: Optional[bool] = False) -> OnlineDeployment:
"""Get a deployment resource.
:param name: The name of the deployment
:type name: str
:param endpoint_name: The name of the endpoint
:type endpoint_name: str
        :keyword local: Whether the deployment should be retrieved from the local Docker environment,
            defaults to False
        :paramtype local: Optional[bool]
:raises ~azure.ai.ml.exceptions.LocalEndpointNotFoundError: Raised if local endpoint resource does not exist.
:return: a deployment entity
:rtype: ~azure.ai.ml.entities.OnlineDeployment
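
        A minimal usage sketch (hypothetical names; assumes an authenticated ``ml_client``):

        .. code-block:: python

            deployment = ml_client.online_deployments.get(
                name="blue", endpoint_name="my-endpoint"
            )
            print(deployment.provisioning_state)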
"""
if local:
deployment = self._local_deployment_helper.get(endpoint_name=endpoint_name, deployment_name=name)
else:
deployment = OnlineDeployment._from_rest_object(
self._online_deployment.get(
endpoint_name=endpoint_name,
deployment_name=name,
resource_group_name=self._resource_group_name,
workspace_name=self._workspace_name,
**self._init_kwargs,
)
)
deployment.endpoint_name = endpoint_name
return deployment
@distributed_trace
@monitor_with_activity(ops_logger, "OnlineDeployment.Delete", ActivityType.PUBLICAPI)
def begin_delete(self, name: str, endpoint_name: str, *, local: Optional[bool] = False) -> LROPoller[None]:
"""Delete a deployment.
:param name: The name of the deployment
:type name: str
:param endpoint_name: The name of the endpoint
:type endpoint_name: str
        :keyword local: Whether the deployment should be deleted from the local Docker environment,
            defaults to False
:paramtype local: Optional[bool]
:raises ~azure.ai.ml.exceptions.LocalEndpointNotFoundError: Raised if local endpoint resource does not exist.
:return: A poller to track the operation status
:rtype: ~azure.core.polling.LROPoller[None]
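
        A minimal usage sketch (hypothetical names; assumes an authenticated ``ml_client``):

        .. code-block:: python

            poller = ml_client.online_deployments.begin_delete(
                name="blue", endpoint_name="my-endpoint"
            )
            poller.wait()  # optionally block until deletion completes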
"""
if local:
return self._local_deployment_helper.delete(name=endpoint_name, deployment_name=name)
return self._online_deployment.begin_delete(
endpoint_name=endpoint_name,
deployment_name=name,
resource_group_name=self._resource_group_name,
workspace_name=self._workspace_name,
**self._init_kwargs,
)
@distributed_trace
@monitor_with_activity(ops_logger, "OnlineDeployment.GetLogs", ActivityType.PUBLICAPI)
def get_logs(
self,
name: str,
endpoint_name: str,
lines: int,
*,
container_type: Optional[str] = None,
local: bool = False,
) -> str:
"""Retrive the logs from online deployment.
:param name: The name of the deployment
:type name: str
:param endpoint_name: The name of the endpoint
:type endpoint_name: str
:param lines: The maximum number of lines to tail
:type lines: int
        :keyword container_type: The type of container to retrieve logs from. Possible values include:
            "StorageInitializer", "InferenceServer", defaults to None
        :paramtype container_type: Optional[str]
        :keyword local: Whether the deployment logs should be retrieved from the local Docker environment,
            defaults to False
:paramtype local: bool
:return: the logs
:rtype: str
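
        A minimal usage sketch (hypothetical names; assumes an authenticated ``ml_client``):

        .. code-block:: python

            # Tail the last 50 log lines from the default container.
            logs = ml_client.online_deployments.get_logs(
                name="blue", endpoint_name="my-endpoint", lines=50
            )
            print(logs)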
"""
if local:
return self._local_deployment_helper.get_deployment_logs(
endpoint_name=endpoint_name, deployment_name=name, lines=lines
)
if container_type:
container_type = self._validate_deployment_log_container_type(container_type) # type: ignore
log_request = DeploymentLogsRequest(container_type=container_type, tail=lines)
return str(
self._online_deployment.get_logs(
resource_group_name=self._resource_group_name,
workspace_name=self._workspace_name,
endpoint_name=endpoint_name,
deployment_name=name,
body=log_request,
**self._init_kwargs,
).content
)
@distributed_trace
@monitor_with_activity(ops_logger, "OnlineDeployment.List", ActivityType.PUBLICAPI)
def list(self, endpoint_name: str, *, local: bool = False) -> ItemPaged[OnlineDeployment]:
"""List a deployment resource.
:param endpoint_name: The name of the endpoint
:type endpoint_name: str
        :keyword local: Whether deployments should be retrieved from the local Docker environment,
            defaults to False
:paramtype local: bool
:return: an iterator of deployment entities
:rtype: Iterable[~azure.ai.ml.entities.OnlineDeployment]
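
        A minimal usage sketch (hypothetical endpoint name; assumes an authenticated ``ml_client``):

        .. code-block:: python

            for deployment in ml_client.online_deployments.list(endpoint_name="my-endpoint"):
                print(deployment.name)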
"""
if local:
return self._local_deployment_helper.list()
return self._online_deployment.list(
endpoint_name=endpoint_name,
resource_group_name=self._resource_group_name,
workspace_name=self._workspace_name,
cls=lambda objs: [OnlineDeployment._from_rest_object(obj) for obj in objs],
**self._init_kwargs,
)
def _validate_deployment_log_container_type(self, container_type: EndpointDeploymentLogContainerType) -> str:
if container_type == EndpointDeploymentLogContainerType.INFERENCE_SERVER:
return EndpointDeploymentLogContainerType.INFERENCE_SERVER_REST
if container_type == EndpointDeploymentLogContainerType.STORAGE_INITIALIZER:
return EndpointDeploymentLogContainerType.STORAGE_INITIALIZER_REST
msg = "Invalid container type '{}'. Supported container types are {} and {}"
msg = msg.format(
container_type,
EndpointDeploymentLogContainerType.INFERENCE_SERVER,
EndpointDeploymentLogContainerType.STORAGE_INITIALIZER,
)
raise ValidationException(
message=msg,
target=ErrorTarget.ONLINE_DEPLOYMENT,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
error_type=ValidationErrorType.INVALID_VALUE,
)
def _get_ARM_deployment_name(self, name: str) -> str:
random.seed(version=2)
return f"{self._workspace_name}-{name}-{random.randint(1, 10000000)}"
def _get_workspace_location(self) -> str:
"""Get the workspace location
TODO[TASK 1260265]: can we cache this information and only refresh when the operation_scope is changed?
:return: The workspace location
:rtype: str
"""
return str(
self._all_operations.all_operations[AzureMLResourceType.WORKSPACE].get(self._workspace_name).location
)
def _get_local_endpoint_mode(self, vscode_debug: Any) -> LocalEndpointMode:
return LocalEndpointMode.VSCodeDevContainer if vscode_debug else LocalEndpointMode.DetachedContainer
def _register_collection_data_assets(self, deployment: OnlineDeployment) -> None:
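        # For each configured collection, register a Data asset (falling back to
        # default name/version/path when the user did not supply one) and rewrite
        # the collection's `data` reference as the asset's full ARM ID.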
for name, value in deployment.data_collector.collections.items():
data_name = f"{deployment.endpoint_name}-{deployment.name}-{name}"
data_version = "1"
data_path = f"{DEFAULT_MDC_PATH}/{deployment.endpoint_name}/{deployment.name}/{name}"
if value.data:
if value.data.name:
data_name = value.data.name
if value.data.version:
data_version = value.data.version
if value.data.path:
data_path = value.data.path
data_object = Data(
name=data_name,
version=data_version,
path=data_path,
)
try:
result = self._all_operations._all_operations[AzureMLResourceType.DATA].create_or_update(data_object)
except Exception as e:
if "already exists" in str(e):
result = self._all_operations._all_operations[AzureMLResourceType.DATA].get(data_name, data_version)
else:
raise e
deployment.data_collector.collections[name].data = (
f"/subscriptions/{self._subscription_id}/resourceGroups/{self._resource_group_name}"
f"/providers/Microsoft.MachineLearningServices/workspaces/{self._workspace_name}"
f"/data/{result.name}/versions/{result.version}"
)