# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access
from contextlib import contextmanager
from typing import Any, Generator, Iterable, Optional, Union, cast
from marshmallow.exceptions import ValidationError as SchemaValidationError
from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_env_build_context
from azure.ai.ml._exception_helper import log_and_raise_error
from azure.ai.ml._restclient.v2021_10_01_dataplanepreview import (
AzureMachineLearningWorkspaces as ServiceClient102021Dataplane,
)
from azure.ai.ml._restclient.v2023_04_01_preview import AzureMachineLearningWorkspaces as ServiceClient042023Preview
from azure.ai.ml._restclient.v2023_04_01_preview.models import EnvironmentVersion, ListViewType
from azure.ai.ml._scope_dependent_operations import (
OperationConfig,
OperationsContainer,
OperationScope,
_ScopeDependentOperations,
)
from azure.ai.ml._telemetry import ActivityType, monitor_with_activity
from azure.ai.ml._utils._asset_utils import (
_archive_or_restore,
_get_latest,
_get_next_latest_versions_from_container,
_resolve_label_to_asset,
)
from azure.ai.ml._utils._experimental import experimental
from azure.ai.ml._utils._logger_utils import OpsLogger
from azure.ai.ml._utils._registry_utils import (
get_asset_body_for_registry_storage,
get_registry_client,
get_sas_uri_for_registry_asset,
)
from azure.ai.ml.constants._common import ARM_ID_PREFIX, ASSET_ID_FORMAT, AzureMLResourceType
from azure.ai.ml.entities._assets import Environment, WorkspaceAssetReference
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
ops_logger = OpsLogger(__name__)
module_logger = ops_logger.module_logger
[docs]
class EnvironmentOperations(_ScopeDependentOperations):
"""EnvironmentOperations.
You should not instantiate this class directly. Instead, you should
create an MLClient instance that instantiates it for you and
attaches it as an attribute.
:param operation_scope: Scope variables for the operations classes of an MLClient object.
:type operation_scope: ~azure.ai.ml._scope_dependent_operations.OperationScope
:param operation_config: Common configuration for operations classes of an MLClient object.
:type operation_config: ~azure.ai.ml._scope_dependent_operations.OperationConfig
:param service_client: Service client to allow end users to operate on Azure Machine Learning Workspace
resources (ServiceClient042023Preview or ServiceClient102021Dataplane).
:type service_client: typing.Union[
~azure.ai.ml._restclient.v2023_04_01_preview._azure_machine_learning_workspaces.AzureMachineLearningWorkspaces,
~azure.ai.ml._restclient.v2021_10_01_dataplanepreview._azure_machine_learning_workspaces.
AzureMachineLearningWorkspaces]
:param all_operations: All operations classes of an MLClient object.
:type all_operations: ~azure.ai.ml._scope_dependent_operations.OperationsContainer
"""
def __init__(
self,
operation_scope: OperationScope,
operation_config: OperationConfig,
service_client: Union[ServiceClient042023Preview, ServiceClient102021Dataplane],
all_operations: OperationsContainer,
**kwargs: Any,
):
super(EnvironmentOperations, self).__init__(operation_scope, operation_config)
ops_logger.update_filter()
self._kwargs = kwargs
self._containers_operations = service_client.environment_containers
self._version_operations = service_client.environment_versions
self._service_client = service_client
self._all_operations = all_operations
self._datastore_operation = all_operations.all_operations[AzureMLResourceType.DATASTORE]
# Maps a label to a function which given an asset name,
# returns the asset associated with the label
self._managed_label_resolver = {"latest": self._get_latest_version}
[docs]
@monitor_with_activity(ops_logger, "Environment.CreateOrUpdate", ActivityType.PUBLICAPI)
def create_or_update(self, environment: Environment) -> Environment: # type: ignore
"""Returns created or updated environment asset.
:param environment: Environment object
:type environment: ~azure.ai.ml.entities._assets.Environment
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if Environment cannot be successfully validated.
Details will be provided in the error message.
:raises ~azure.ai.ml.exceptions.EmptyDirectoryError: Raised if local path provided points to an empty directory.
:return: Created or updated Environment object
:rtype: ~azure.ai.ml.entities.Environment
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_misc.py
:start-after: [START env_operations_create_or_update]
:end-before: [END env_operations_create_or_update]
:language: python
:dedent: 8
:caption: Create environment.
"""
try:
if not environment.version and environment._auto_increment_version:
next_version, latest_version = _get_next_latest_versions_from_container(
name=environment.name,
container_operation=self._containers_operations,
resource_group_name=self._operation_scope.resource_group_name,
workspace_name=self._workspace_name,
registry_name=self._registry_name,
**self._kwargs,
)
# If user not passing the version, SDK will try to update the latest version
return self._try_update_latest_version(next_version, latest_version, environment)
sas_uri = None
if self._registry_name:
if isinstance(environment, WorkspaceAssetReference):
# verify that environment is not already in registry
try:
self._version_operations.get(
name=environment.name,
version=environment.version,
resource_group_name=self._resource_group_name,
registry_name=self._registry_name,
)
except Exception as err: # pylint: disable=W0718
if isinstance(err, ResourceNotFoundError):
pass
else:
raise err
else:
msg = "A environment with this name and version already exists in registry"
raise ValidationException(
message=msg,
no_personal_data_message=msg,
target=ErrorTarget.ENVIRONMENT,
error_category=ErrorCategory.USER_ERROR,
)
environment_rest = environment._to_rest_object()
result = self._service_client.resource_management_asset_reference.begin_import_method(
resource_group_name=self._resource_group_name,
registry_name=self._registry_name,
body=environment_rest,
**self._kwargs,
).result()
if not result:
env_rest_obj = self._get(name=environment.name, version=environment.version)
return Environment._from_rest_object(env_rest_obj)
sas_uri = get_sas_uri_for_registry_asset(
service_client=self._service_client,
name=environment.name,
version=environment.version,
resource_group=self._resource_group_name,
registry=self._registry_name,
body=get_asset_body_for_registry_storage(
self._registry_name,
"environments",
environment.name,
environment.version,
),
)
# upload only in case of when its not registry
# or successfully acquire sas_uri
if not self._registry_name or sas_uri:
environment = _check_and_upload_env_build_context(
environment=environment,
operations=self,
sas_uri=sas_uri,
show_progress=self._show_progress,
)
env_version_resource = environment._to_rest_object()
env_rest_obj = (
self._version_operations.begin_create_or_update(
name=environment.name,
version=environment.version,
registry_name=self._registry_name,
body=env_version_resource,
**self._scope_kwargs,
**self._kwargs,
).result()
if self._registry_name
else self._version_operations.create_or_update(
name=environment.name,
version=environment.version,
workspace_name=self._workspace_name,
body=env_version_resource,
**self._scope_kwargs,
**self._kwargs,
)
)
if not env_rest_obj and self._registry_name:
env_rest_obj = self._get(name=str(environment.name), version=environment.version)
return Environment._from_rest_object(env_rest_obj)
except Exception as ex: # pylint: disable=W0718
if isinstance(ex, SchemaValidationError):
log_and_raise_error(ex)
else:
raise ex
def _try_update_latest_version(
self, next_version: str, latest_version: str, environment: Environment
) -> Environment:
env = None
if self._registry_name:
environment.version = next_version
env = self.create_or_update(environment)
else:
environment.version = latest_version
try: # Try to update the latest version
env = self.create_or_update(environment)
except Exception as ex: # pylint: disable=W0718
if isinstance(ex, ResourceExistsError):
environment.version = next_version
env = self.create_or_update(environment)
else:
raise ex
return env
def _get(self, name: str, version: Optional[str] = None) -> EnvironmentVersion:
if version:
return (
self._version_operations.get(
name=name,
version=version,
registry_name=self._registry_name,
**self._scope_kwargs,
**self._kwargs,
)
if self._registry_name
else self._version_operations.get(
name=name,
version=version,
workspace_name=self._workspace_name,
**self._scope_kwargs,
**self._kwargs,
)
)
return (
self._containers_operations.get(
name=name,
registry_name=self._registry_name,
**self._scope_kwargs,
**self._kwargs,
)
if self._registry_name
else self._containers_operations.get(
name=name,
workspace_name=self._workspace_name,
**self._scope_kwargs,
**self._kwargs,
)
)
[docs]
@monitor_with_activity(ops_logger, "Environment.Get", ActivityType.PUBLICAPI)
def get(self, name: str, version: Optional[str] = None, label: Optional[str] = None) -> Environment:
"""Returns the specified environment asset.
:param name: Name of the environment.
:type name: str
:param version: Version of the environment.
:type version: str
:param label: Label of the environment. (mutually exclusive with version)
:type label: str
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if Environment cannot be successfully validated.
Details will be provided in the error message.
:return: Environment object
:rtype: ~azure.ai.ml.entities.Environment
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_misc.py
:start-after: [START env_operations_get]
:end-before: [END env_operations_get]
:language: python
:dedent: 8
:caption: Get example.
"""
if version and label:
msg = "Cannot specify both version and label."
raise ValidationException(
message=msg,
target=ErrorTarget.ENVIRONMENT,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
error_type=ValidationErrorType.INVALID_VALUE,
)
if label:
return _resolve_label_to_asset(self, name, label)
if not version:
msg = "Must provide either version or label."
raise ValidationException(
message=msg,
target=ErrorTarget.ENVIRONMENT,
no_personal_data_message=msg,
error_category=ErrorCategory.USER_ERROR,
error_type=ValidationErrorType.MISSING_FIELD,
)
name = _preprocess_environment_name(name)
env_version_resource = self._get(name, version)
return Environment._from_rest_object(env_version_resource)
[docs]
@monitor_with_activity(ops_logger, "Environment.List", ActivityType.PUBLICAPI)
def list(
self,
name: Optional[str] = None,
*,
list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
) -> Iterable[Environment]:
"""List all environment assets in workspace.
:param name: Name of the environment.
:type name: Optional[str]
:keyword list_view_type: View type for including/excluding (for example) archived environments.
Default: ACTIVE_ONLY.
:type list_view_type: Optional[ListViewType]
:return: An iterator like instance of Environment objects.
:rtype: ~azure.core.paging.ItemPaged[Environment]
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_misc.py
:start-after: [START env_operations_list]
:end-before: [END env_operations_list]
:language: python
:dedent: 8
:caption: List example.
"""
if name:
return cast(
Iterable[Environment],
(
self._version_operations.list(
name=name,
registry_name=self._registry_name,
cls=lambda objs: [Environment._from_rest_object(obj) for obj in objs],
**self._scope_kwargs,
**self._kwargs,
)
if self._registry_name
else self._version_operations.list(
name=name,
workspace_name=self._workspace_name,
cls=lambda objs: [Environment._from_rest_object(obj) for obj in objs],
list_view_type=list_view_type,
**self._scope_kwargs,
**self._kwargs,
)
),
)
return cast(
Iterable[Environment],
(
self._containers_operations.list(
registry_name=self._registry_name,
cls=lambda objs: [Environment._from_container_rest_object(obj) for obj in objs],
**self._scope_kwargs,
**self._kwargs,
)
if self._registry_name
else self._containers_operations.list(
workspace_name=self._workspace_name,
cls=lambda objs: [Environment._from_container_rest_object(obj) for obj in objs],
list_view_type=list_view_type,
**self._scope_kwargs,
**self._kwargs,
)
),
)
[docs]
@monitor_with_activity(ops_logger, "Environment.Delete", ActivityType.PUBLICAPI)
def archive(
self,
name: str,
version: Optional[str] = None,
label: Optional[str] = None,
# pylint:disable=unused-argument
**kwargs: Any,
) -> None:
"""Archive an environment or an environment version.
:param name: Name of the environment.
:type name: str
:param version: Version of the environment.
:type version: str
:param label: Label of the environment. (mutually exclusive with version)
:type label: str
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_misc.py
:start-after: [START env_operations_archive]
:end-before: [END env_operations_archive]
:language: python
:dedent: 8
:caption: Archive example.
"""
name = _preprocess_environment_name(name)
_archive_or_restore(
asset_operations=self,
version_operation=self._version_operations,
container_operation=self._containers_operations,
is_archived=True,
name=name,
version=version,
label=label,
)
[docs]
@monitor_with_activity(ops_logger, "Environment.Restore", ActivityType.PUBLICAPI)
def restore(
self,
name: str,
version: Optional[str] = None,
label: Optional[str] = None,
# pylint:disable=unused-argument
**kwargs: Any,
) -> None:
"""Restore an archived environment version.
:param name: Name of the environment.
:type name: str
:param version: Version of the environment.
:type version: str
:param label: Label of the environment. (mutually exclusive with version)
:type label: str
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_misc.py
:start-after: [START env_operations_restore]
:end-before: [END env_operations_restore]
:language: python
:dedent: 8
:caption: Restore example.
"""
name = _preprocess_environment_name(name)
_archive_or_restore(
asset_operations=self,
version_operation=self._version_operations,
container_operation=self._containers_operations,
is_archived=False,
name=name,
version=version,
label=label,
)
def _get_latest_version(self, name: str) -> Environment:
"""Returns the latest version of the asset with the given name.
Latest is defined as the most recently created, not the most
recently updated.
:param name: The asset name
:type name: str
:return: The latest version of the named environment
:rtype: Environment
"""
result = _get_latest(
name,
self._version_operations,
self._resource_group_name,
self._workspace_name,
self._registry_name,
)
return Environment._from_rest_object(result)
[docs]
@monitor_with_activity(ops_logger, "Environment.Share", ActivityType.PUBLICAPI)
@experimental
def share(
self,
name: str,
version: str,
*,
share_with_name: str,
share_with_version: str,
registry_name: str,
) -> Environment:
"""Share a environment asset from workspace to registry.
:param name: Name of environment asset.
:type name: str
:param version: Version of environment asset.
:type version: str
:keyword share_with_name: Name of environment asset to share with.
:paramtype share_with_name: str
:keyword share_with_version: Version of environment asset to share with.
:paramtype share_with_version: str
:keyword registry_name: Name of the destination registry.
:paramtype registry_name: str
:return: Environment asset object.
:rtype: ~azure.ai.ml.entities.Environment
"""
# Get workspace info to get workspace GUID
workspace = self._service_client.workspaces.get(
resource_group_name=self._resource_group_name,
workspace_name=self._workspace_name,
)
workspace_guid = workspace.workspace_id
workspace_location = workspace.location
# Get environment asset ID
asset_id = ASSET_ID_FORMAT.format(
workspace_location,
workspace_guid,
AzureMLResourceType.ENVIRONMENT,
name,
version,
)
environment_ref = WorkspaceAssetReference(
name=share_with_name if share_with_name else name,
version=share_with_version if share_with_version else version,
asset_id=asset_id,
)
with self._set_registry_client(registry_name):
return self.create_or_update(environment_ref)
@contextmanager
# pylint: disable-next=docstring-missing-return,docstring-missing-rtype
def _set_registry_client(self, registry_name: str) -> Generator:
"""Sets the registry client for the environment operations.
:param registry_name: Name of the registry.
:type registry_name: str
"""
rg_ = self._operation_scope._resource_group_name
sub_ = self._operation_scope._subscription_id
registry_ = self._operation_scope.registry_name
client_ = self._service_client
environment_versions_operation_ = self._version_operations
try:
_client, _rg, _sub, _model_client = get_registry_client(
self._service_client._config.credential, registry_name
)
self._operation_scope.registry_name = registry_name
self._operation_scope._resource_group_name = _rg
self._operation_scope._subscription_id = _sub
self._service_client = _client
self._version_operations = _client.environment_versions
yield
finally:
self._operation_scope.registry_name = registry_
self._operation_scope._resource_group_name = rg_
self._operation_scope._subscription_id = sub_
self._service_client = client_
self._version_operations = environment_versions_operation_
def _preprocess_environment_name(environment_name: str) -> str:
if environment_name.startswith(ARM_ID_PREFIX):
return environment_name[len(ARM_ID_PREFIX) :]
return environment_name