# Source code for azure.ai.ml.operations._deployment_template_operations

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=broad-exception-caught,protected-access,f-string-without-interpolation
# pylint: disable=too-many-locals,docstring-missing-param,docstring-missing-return,docstring-missing-rtype
# pylint: disable=no-else-return,too-many-statements

from typing import Any, Dict, Iterable, Optional, Union, cast

from azure.ai.ml._scope_dependent_operations import OperationScope, OperationConfig, _ScopeDependentOperations
from azure.ai.ml._telemetry import ActivityType, monitor_with_telemetry_mixin
from azure.ai.ml._utils._experimental import experimental
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml.entities import DeploymentTemplate
from azure.core.exceptions import ResourceNotFoundError
from azure.core.tracing.decorator import distributed_trace

ops_logger = OpsLogger(__name__)
module_logger = ops_logger.module_logger


@experimental
class DeploymentTemplateOperations(_ScopeDependentOperations):
    """DeploymentTemplateOperations.

    You should not instantiate this class directly. Instead, you should create an MLClient instance
    that instantiates it for you and attaches it as an attribute.
    """

    def __init__(
        self,
        operation_scope: OperationScope,
        operation_config: "OperationConfig",
        service_client_04_2024_dataplanepreview,
        **kwargs: Any,
    ):
        """Store the scope, configuration and service client used by every operation.

        :param operation_scope: Scope (subscription, resource group, registry) the operations run against.
        :type operation_scope: OperationScope
        :param operation_config: Common configuration shared by operations classes.
        :type operation_config: OperationConfig
        :param service_client_04_2024_dataplanepreview: Service client exposing ``deployment_templates``.
        :keyword kwargs: Extra keyword arguments; kept verbatim in ``_init_kwargs``.
        """
        super().__init__(operation_scope, operation_config)
        # ops_logger.update_info(kwargs)
        self._operation_scope = operation_scope
        self._operation_config = operation_config
        self._service_client = service_client_04_2024_dataplanepreview
        self._init_kwargs = kwargs

    def _get_registry_endpoint(self) -> str:
        """Dynamically determine the registry endpoint based on registry region.

        The registry is looked up on every call; any failure during the lookup is logged as a
        warning and the hard-coded default endpoint is returned instead.

        :return: The API endpoint URL for the registry
        :rtype: str
        """
        try:
            # Import here to avoid circular dependencies
            from azure.ai.ml.operations import RegistryOperations
            from azure.ai.ml._restclient.v2022_10_01_preview import (
                AzureMachineLearningWorkspaces as ServiceClient102022,
            )

            # Try to get credential from service client or operation config
            credential = None
            if hasattr(self._service_client, "_config") and hasattr(self._service_client._config, "credential"):
                credential = self._service_client._config.credential
            elif hasattr(self._operation_config, "credential"):
                credential = self._operation_config.credential

            if credential and self._operation_scope.registry_name:
                # Get registry information to determine the region
                registry_operations = RegistryOperations(
                    operation_scope=self._operation_scope,
                    service_client=ServiceClient102022(
                        credential=credential,
                        subscription_id=self._operation_scope.subscription_id,
                        resource_group_name=self._operation_scope.resource_group_name,
                    ),
                    all_operations=None,  # type: ignore[arg-type]
                    credentials=credential,
                )
                registry = registry_operations.get(self._operation_scope.registry_name)

                # Prefer the registry's own location, then its first replication location
                region = None
                if registry.location:
                    region = registry.location
                elif registry.replication_locations:
                    region = registry.replication_locations[0].location

                if region:
                    return f"https://{region}.api.azureml.ms"
        except Exception as e:
            module_logger.warning("Could not determine registry region dynamically: %s. Using default.", e)

        # Fallback to default endpoint if unable to determine dynamically
        # NOTE(review): this default looks like a test/INT environment host - confirm it is intended.
        return "https://int.experiments.azureml-test.net"

    @staticmethod
    def _first_present(
        data: Dict[str, Any], primary_name: str, alt_name: Optional[str] = None, default: Any = None
    ) -> Any:
        """Return ``data[primary_name]`` if that key exists, else ``data[alt_name]``, else ``default``.

        Presence of the key decides, not truthiness: a primary key holding ``None`` wins over the
        alternative name.
        """
        if primary_name in data:
            return data[primary_name]
        if alt_name and alt_name in data:
            return data[alt_name]
        return default

    @staticmethod
    def _coerce_int(value: Any) -> Any:
        """Convert a string to ``int`` (``None`` if it does not parse); return other values unchanged."""
        if isinstance(value, str):
            try:
                return int(value)
            except (ValueError, TypeError):
                return None
        return value

    def _convert_dict_to_deployment_template(self, dict_data: Dict[str, Any]) -> DeploymentTemplate:
        """Convert dictionary format to DeploymentTemplate object.

        This helper method converts a user-provided dictionary to a DeploymentTemplate object
        using the constructor directly. It handles field name variations (snake_case and
        camelCase) and string-to-int conversions.

        :param dict_data: Dictionary containing deployment template data
        :type dict_data: Dict[str, Any]
        :return: DeploymentTemplate object
        :rtype: ~azure.ai.ml.entities.DeploymentTemplate
        :raises ValueError: If ``name``, ``version`` or the environment is missing or empty.
        """
        # Work on a shallow copy so the caller's dictionary is never touched
        data = dict_data.copy()
        pick = self._first_present
        to_int = self._coerce_int

        name = data.get("name")
        version = data.get("version")
        description = data.get("description")
        tags = data.get("tags")

        # Validate required fields
        if not name:
            raise ValueError("name is required")
        if not version:
            raise ValueError("version is required")

        # Handle environment field - check multiple possible names, including camelCase REST format
        environment = data.get("environment") or data.get("environment_id") or data.get("environmentId")
        if not environment:
            raise ValueError("environment is required but was not found in the data")

        allowed_instance_type = pick(data, "allowed_instance_type", "allowedInstanceType")
        if isinstance(allowed_instance_type, str):
            # Convert space-separated string to list
            allowed_instance_type = allowed_instance_type.split()

        default_instance_type = pick(data, "default_instance_type", "defaultInstanceType")
        deployment_template_type = pick(data, "deployment_template_type", "deploymentTemplateType")
        model_mount_path = pick(data, "model_mount_path", "modelMountPath")
        scoring_path = pick(data, "scoring_path", "scoringPath")
        scoring_port = to_int(pick(data, "scoring_port", "scoringPort"))
        instance_count = to_int(pick(data, "instance_count", "instanceCount"))
        environment_variables = pick(data, "environment_variables", "environmentVariables")

        # Handle request settings
        request_settings = None
        request_settings_data = pick(data, "request_settings", "requestSettings")
        if request_settings_data and isinstance(request_settings_data, dict):
            from azure.ai.ml.entities._deployment.deployment_template_settings import OnlineRequestSettings

            timeout = pick(request_settings_data, "request_timeout_ms", "requestTimeout")
            # Also check for 'request_timeout' as an alternative name
            if timeout is None:
                timeout = request_settings_data.get("request_timeout")
            max_concurrent = pick(
                request_settings_data, "max_concurrent_requests_per_instance", "maxConcurrentRequestsPerInstance"
            )
            request_settings = OnlineRequestSettings(
                request_timeout_ms=to_int(timeout),
                max_concurrent_requests_per_instance=to_int(max_concurrent),
            )

        # Handle probe settings
        def create_probe_settings(probe_data):
            """Build ProbeSettings from a non-empty dict; return None for anything else."""
            if not probe_data or not isinstance(probe_data, dict):
                return None

            from azure.ai.ml.entities._deployment.deployment_template_settings import ProbeSettings

            return ProbeSettings(
                initial_delay=to_int(pick(probe_data, "initial_delay", "initialDelay")),
                period=to_int(probe_data.get("period")),
                timeout=to_int(probe_data.get("timeout")),
                failure_threshold=to_int(pick(probe_data, "failure_threshold", "failureThreshold")),
                success_threshold=to_int(pick(probe_data, "success_threshold", "successThreshold")),
                scheme=probe_data.get("scheme"),
                path=probe_data.get("path"),
                port=to_int(probe_data.get("port")),
                method=pick(probe_data, "method", "httpMethod"),
            )

        liveness_probe = create_probe_settings(pick(data, "liveness_probe", "livenessProbe"))
        readiness_probe = create_probe_settings(pick(data, "readiness_probe", "readinessProbe"))

        # Create DeploymentTemplate object using constructor.
        # instance_type is populated from the default instance type, as no separate key is read for it.
        return DeploymentTemplate(
            name=name,
            version=version,
            description=description,
            tags=tags,
            environment=environment,
            request_settings=request_settings,
            liveness_probe=liveness_probe,
            readiness_probe=readiness_probe,
            instance_count=instance_count,
            instance_type=default_instance_type,
            model=data.get("model"),
            code_configuration=data.get("code_configuration"),
            environment_variables=environment_variables,
            app_insights_enabled=data.get("app_insights_enabled"),
            allowed_instance_type=allowed_instance_type,
            default_instance_type=default_instance_type,
            scoring_port=scoring_port,
            scoring_path=scoring_path,
            model_mount_path=model_mount_path,
            type=data.get("type"),
            deployment_template_type=deployment_template_type,
            stage=data.get("stage"),
        )

    @distributed_trace
    @monitor_with_telemetry_mixin(ops_logger, "DeploymentTemplate.List", ActivityType.PUBLICAPI)
    def list(
        self,
        *,
        name: Optional[str] = None,
        tags: Optional[str] = None,
        count: Optional[int] = None,
        stage: Optional[str] = None,
        list_view_type: str = "ActiveOnly",
        **kwargs: Any,
    ) -> Iterable[DeploymentTemplate]:
        """List deployment templates.

        :keyword name: Filter by deployment template name.
        :paramtype name: Optional[str]
        :keyword tags: Comma-separated list of tag names (and optionally values). Example: tag1,tag2=value2.
        :paramtype tags: Optional[str]
        :keyword count: Maximum number of items to return.
        :paramtype count: Optional[int]
        :keyword stage: Filter by deployment template stage.
        :paramtype stage: Optional[str]
        :keyword list_view_type: View type for including/excluding (for example) archived entities.
        :paramtype list_view_type: str
        :return: Iterator of deployment template objects.
        :rtype: ~azure.core.paging.ItemPaged[~azure.ai.ml.entities.DeploymentTemplate]
        """
        endpoint = self._get_registry_endpoint()
        return cast(
            Iterable[DeploymentTemplate],
            self._service_client.deployment_templates.list(
                endpoint=endpoint,
                subscription_id=self._operation_scope.subscription_id,
                resource_group_name=self._operation_scope.resource_group_name,
                registry_name=self._operation_scope.registry_name,
                name=name,
                tags=tags,
                count=count,
                stage=stage,
                list_view_type=list_view_type,
                **kwargs,
            ),
        )

    @distributed_trace
    @monitor_with_telemetry_mixin(ops_logger, "DeploymentTemplate.Get", ActivityType.PUBLICAPI)
    def get(self, name: str, version: Optional[str] = None, **kwargs: Any) -> DeploymentTemplate:
        """Get a deployment template by name and version.

        Any failure of the underlying call (not only a missing resource) is logged and re-raised
        as ``ResourceNotFoundError`` with the original exception chained as its cause.

        :param name: Name of the deployment template.
        :type name: str
        :param version: Version of the deployment template. If not provided, gets the latest version.
        :type version: Optional[str]
        :return: DeploymentTemplate object.
        :rtype: ~azure.ai.ml.entities.DeploymentTemplate
        :raises: ~azure.core.exceptions.ResourceNotFoundError if deployment template not found.
        """
        version = version or "latest"
        try:
            endpoint = self._get_registry_endpoint()
            result = self._service_client.deployment_templates.get(
                endpoint=endpoint,
                subscription_id=self._operation_scope.subscription_id,
                resource_group_name=self._operation_scope.resource_group_name,
                registry_name=self._operation_scope.registry_name,
                name=name,
                version=version,
                **kwargs,
            )
            return DeploymentTemplate._from_rest_object(result)
        except Exception as e:
            module_logger.warning("DeploymentTemplate get operation failed: %s", e)
            raise ResourceNotFoundError(f"DeploymentTemplate {name}:{version} not found") from e

    @distributed_trace
    @monitor_with_telemetry_mixin(ops_logger, "DeploymentTemplate.CreateOrUpdate", ActivityType.PUBLICAPI)
    def create_or_update(
        self, deployment_template: Union[DeploymentTemplate, Dict[str, Any]], **kwargs: Any
    ) -> DeploymentTemplate:
        """Create or update a deployment template.

        :param deployment_template: DeploymentTemplate object to create or update, or a dictionary
            containing the deployment template definition (converted with the same field-name
            handling as ``_convert_dict_to_deployment_template``).
        :type deployment_template: Union[DeploymentTemplate, Dict[str, Any]]
        :return: The DeploymentTemplate that was submitted (the entity built from the input, not a
            service response).
        :rtype: ~azure.ai.ml.entities.DeploymentTemplate
        :raises ValueError: If the input is neither a DeploymentTemplate nor a dictionary, or if a
            dictionary is missing a required field.
        :raises RuntimeError: If the service client does not expose ``deployment_templates``.
        """
        try:
            # Dictionaries are accepted for convenience and normalised to an entity first
            if isinstance(deployment_template, dict):
                deployment_template = self._convert_dict_to_deployment_template(deployment_template)

            if not isinstance(deployment_template, DeploymentTemplate):
                raise ValueError("deployment_template must be a DeploymentTemplate object or a dictionary")

            if not hasattr(self._service_client, "deployment_templates"):
                raise RuntimeError("DeploymentTemplate service not available")

            endpoint = self._get_registry_endpoint()
            rest_object = deployment_template._to_rest_object()
            # NOTE(review): the value returned by begin_create is discarded, so this method does not
            # wait on it - confirm whether callers expect the operation to have completed.
            self._service_client.deployment_templates.begin_create(
                endpoint=endpoint,
                subscription_id=self._operation_scope.subscription_id,
                resource_group_name=self._operation_scope.resource_group_name,
                registry_name=self._operation_scope.registry_name,
                name=deployment_template.name,
                version=deployment_template.version,
                body=rest_object,
                **kwargs,
            )
            return deployment_template
        except Exception as e:
            module_logger.error("DeploymentTemplate create_or_update operation failed: %s", e)
            raise

    @distributed_trace
    @monitor_with_telemetry_mixin(ops_logger, "DeploymentTemplate.Delete", ActivityType.PUBLICAPI)
    def delete(self, name: str, version: Optional[str] = None, **kwargs: Any) -> None:
        """Delete a deployment template.

        :param name: Name of the deployment template to delete.
        :type name: str
        :param version: Version of the deployment template to delete. If not provided, deletes the latest version.
        :type version: Optional[str]
        :raises RuntimeError: If the service client does not expose ``deployment_templates``.
        """
        version = version or "latest"
        try:
            if not hasattr(self._service_client, "deployment_templates"):
                raise RuntimeError("DeploymentTemplate service not available")

            endpoint = self._get_registry_endpoint()
            self._service_client.deployment_templates.delete_deployment_template(
                endpoint=endpoint,
                subscription_id=self._operation_scope.subscription_id,
                resource_group_name=self._operation_scope.resource_group_name,
                registry_name=self._operation_scope.registry_name,
                name=name,
                version=version,
                **kwargs,
            )
        except ResourceNotFoundError:
            # A missing resource is propagated as-is, without an error-level log entry
            raise
        except Exception as e:
            module_logger.error("DeploymentTemplate delete operation failed: %s", e)
            raise

    @distributed_trace
    @monitor_with_telemetry_mixin(ops_logger, "DeploymentTemplate.Archive", ActivityType.PUBLICAPI)
    def archive(self, name: str, version: Optional[str] = None, **kwargs: Any) -> DeploymentTemplate:
        """Archive a deployment template by setting its stage to 'Archived'.

        :param name: Name of the deployment template to archive.
        :type name: str
        :param version: Version of the deployment template to archive. If not provided, archives the latest version.
        :type version: Optional[str]
        :return: DeploymentTemplate object representing the archived template.
        :rtype: ~azure.ai.ml.entities.DeploymentTemplate
        :raises: ~azure.core.exceptions.ResourceNotFoundError if deployment template not found.
        """
        try:
            # Fetch the existing template, flip its stage, then push it back
            template = self.get(name=name, version=version, **kwargs)
            template.stage = "Archived"
            return self.create_or_update(template, **kwargs)
        except Exception as e:
            module_logger.error("DeploymentTemplate archive operation failed: %s", e)
            raise

    @distributed_trace
    @monitor_with_telemetry_mixin(ops_logger, "DeploymentTemplate.Restore", ActivityType.PUBLICAPI)
    def restore(self, name: str, version: Optional[str] = None, **kwargs: Any) -> DeploymentTemplate:
        """Restore a deployment template by setting its stage to 'Development'.

        :param name: Name of the deployment template to restore.
        :type name: str
        :param version: Version of the deployment template to restore. If not provided, restores the latest version.
        :type version: Optional[str]
        :return: DeploymentTemplate object representing the restored template.
        :rtype: ~azure.ai.ml.entities.DeploymentTemplate
        :raises: ~azure.core.exceptions.ResourceNotFoundError if deployment template not found.
        """
        try:
            # Fetch the existing template, flip its stage, then push it back
            template = self.get(name=name, version=version, **kwargs)
            template.stage = "Development"
            return self.create_or_update(template, **kwargs)
        except Exception as e:
            module_logger.error("DeploymentTemplate restore operation failed: %s", e)
            raise