# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access
import logging
import os
from typing import Dict, Optional, Union
from marshmallow import INCLUDE
from azure.ai.ml._restclient.v2023_04_01_preview.models import SweepJob
from azure.ai.ml._schema.core.fields import ExperimentalField
from azure.ai.ml.entities._assets import Environment
from ..._schema import NestedField, UnionField
from ..._schema.job.distribution import (
MPIDistributionSchema,
PyTorchDistributionSchema,
RayDistributionSchema,
TensorFlowDistributionSchema,
)
from .distribution import (
DistributionConfiguration,
MpiDistribution,
PyTorchDistribution,
RayDistribution,
TensorFlowDistribution,
)
from .job_resource_configuration import JobResourceConfiguration
from .queue_settings import QueueSettings
module_logger = logging.getLogger(__name__)
# no reference found. leave it for future use.
INPUT_BINDING_PREFIX = "AZURE_ML_INPUT_"
OLD_INPUT_BINDING_PREFIX = "AZURE_ML_INPUT"
[docs]
class ParameterizedCommand:
"""Command component version that contains the command and supporting parameters for a Command component
or job.
This class should not be instantiated directly. Instead, use the child class
~azure.ai.ml.entities.CommandComponent.
:param command: The command to be executed. Defaults to "".
:type command: str
:param resources: The compute resource configuration for the command.
:type resources: Optional[Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration]]
:param code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url pointing
to a remote location.
:type code: Optional[str]
:param environment_variables: A dictionary of environment variable names and values.
These environment variables are set on the process where user script is being executed.
:type environment_variables: Optional[dict[str, str]]
:param distribution: The distribution configuration for distributed jobs.
:type distribution: Optional[Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]]
:param environment: The environment that the job will run in.
:type environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
:param queue_settings: The queue settings for the job.
:type queue_settings: Optional[~azure.ai.ml.entities.QueueSettings]
:keyword kwargs: A dictionary of additional configuration parameters.
:paramtype kwargs: dict
"""
def __init__(
self,
command: Optional[str] = "",
resources: Optional[Union[dict, JobResourceConfiguration]] = None,
code: Optional[Union[str, os.PathLike]] = None,
environment_variables: Optional[Dict] = None,
distribution: Optional[
Union[
Dict,
MpiDistribution,
TensorFlowDistribution,
PyTorchDistribution,
RayDistribution,
DistributionConfiguration,
]
] = None,
environment: Optional[Union[Environment, str]] = None,
queue_settings: Optional[QueueSettings] = None,
**kwargs: Dict,
) -> None:
super().__init__(**kwargs)
self.command = command
self.code = code
self.environment_variables = dict(environment_variables) if environment_variables else {}
self.environment = environment
self.distribution = distribution
self.resources = resources # type: ignore[assignment]
self.queue_settings = queue_settings
@property
def distribution(
self,
) -> Optional[
Union[
dict,
MpiDistribution,
TensorFlowDistribution,
PyTorchDistribution,
RayDistribution,
DistributionConfiguration,
]
]:
"""The configuration for the distributed command component or job.
:return: The distribution configuration.
:rtype: Union[~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]
"""
return self._distribution
@distribution.setter
def distribution(self, value: Union[dict, PyTorchDistribution, MpiDistribution]) -> None:
"""Sets the configuration for the distributed command component or job.
:param value: The distribution configuration for distributed jobs.
:type value: Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]
"""
if isinstance(value, dict):
dist_schema = UnionField(
[
NestedField(PyTorchDistributionSchema, unknown=INCLUDE),
NestedField(TensorFlowDistributionSchema, unknown=INCLUDE),
NestedField(MPIDistributionSchema, unknown=INCLUDE),
ExperimentalField(NestedField(RayDistributionSchema, unknown=INCLUDE)),
]
)
value = dist_schema._deserialize(value=value, attr=None, data=None)
self._distribution = value
@property
def resources(self) -> JobResourceConfiguration:
"""The compute resource configuration for the command component or job.
:return: The compute resource configuration for the command component or job.
:rtype: ~azure.ai.ml.entities.JobResourceConfiguration
"""
return self._resources
@resources.setter
def resources(self, value: Union[dict, JobResourceConfiguration]) -> None:
"""Sets the compute resource configuration for the command component or job.
:param value: The compute resource configuration for the command component or job.
:type value: Union[dict, ~azure.ai.ml.entities.JobResourceConfiguration]
"""
if isinstance(value, dict):
value = JobResourceConfiguration(**value)
self._resources = value
@classmethod
def _load_from_sweep_job(cls, sweep_job: SweepJob) -> "ParameterizedCommand":
parameterized_command = cls(
command=sweep_job.trial.command,
code=sweep_job.trial.code_id,
environment_variables=sweep_job.trial.environment_variables,
environment=sweep_job.trial.environment_id,
distribution=DistributionConfiguration._from_rest_object(sweep_job.trial.distribution),
resources=JobResourceConfiguration._from_rest_object(sweep_job.trial.resources),
queue_settings=QueueSettings._from_rest_object(sweep_job.queue_settings),
)
return parameterized_command