# Source code for azure.ai.ml.entities._builders.command_func

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

# pylint: disable=protected-access

import os
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from azure.ai.ml.constants._common import AssetTypes, LegacyAssetTypes
from azure.ai.ml.constants._component import ComponentSource
from azure.ai.ml.entities._assets.environment import Environment
from azure.ai.ml.entities._component.command_component import CommandComponent
from azure.ai.ml.entities._credentials import (
    AmlTokenConfiguration,
    ManagedIdentityConfiguration,
    UserIdentityConfiguration,
)
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._job.distribution import (
    DistributionConfiguration,
    MpiDistribution,
    PyTorchDistribution,
    RayDistribution,
    TensorFlowDistribution,
)
from azure.ai.ml.entities._job.job_service import (
    JobService,
    JupyterLabJobService,
    SshJobService,
    TensorBoardJobService,
    VsCodeJobService,
)
from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
from azure.ai.ml.entities._job.sweep.search_space import SweepDistribution
from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException

from .command import Command

# Asset types for which _parse_input also builds a job-level Input.
# An Input (or dict) whose "type" is not listed here only yields a
# component-level Input; its job-level value stays None.
SUPPORTED_INPUTS = [
    LegacyAssetTypes.PATH,
    AssetTypes.URI_FILE,
    AssetTypes.URI_FOLDER,
    AssetTypes.CUSTOM_MODEL,
    AssetTypes.MLFLOW_MODEL,
    AssetTypes.MLTABLE,
    AssetTypes.TRITON_MODEL,
]


def _parse_input(input_value: Union[Input, Dict, SweepDistribution, str, bool, int, float]) -> Tuple:
    """Split one user-supplied input into a component-level and a job-level value.

    :param input_value: The value given for a single entry of ``inputs``.
    :type input_value: Union[Input, Dict, SweepDistribution, str, bool, int, float]
    :return: A ``(component_input, job_input)`` pair. ``job_input`` is None for an
        Input or dict whose type is not in ``SUPPORTED_INPUTS``.
    :rtype: Tuple
    :raises ValidationException: If ``input_value`` is of none of the accepted types.
    """
    if isinstance(input_value, Input):
        # Two independent copies are built so component and job never share an object.
        component_side = Input(**input_value._to_dict())
        job_side: Optional[Union[Input, Dict, SweepDistribution, str, bool, int, float]] = None
        if input_value.type in SUPPORTED_INPUTS:
            job_side = Input(**input_value._to_dict())
        return component_side, job_side

    if isinstance(input_value, dict):
        # A user-provided dict is parsed into an Input; the job-level Input is
        # only created for the supported (path-like) types.
        job_side = None
        if input_value.get("type", None) in SUPPORTED_INPUTS:
            job_side = Input(**input_value)
        return Input(**input_value), job_side

    if isinstance(input_value, (SweepDistribution, str, bool, int, float)):
        # Input bindings are not supported
        return ComponentTranslatableMixin._to_input_builder_function(input_value), input_value

    msg = (
        f"Unsupported input type: {type(input_value)}"
        ", only Input, dict, str, bool, int and float are supported."
    )
    raise ValidationException(
        message=msg,
        no_personal_data_message=msg,
        target=ErrorTarget.JOB,
        error_type=ValidationErrorType.INVALID_VALUE,
    )


def _parse_output(output_value: Optional[Union[Output, Dict, str]]) -> Tuple:
    """Split one user-supplied output into a component-level and a job-level value.

    :param output_value: The value given for a single entry of ``outputs``.
    :type output_value: Optional[Union[Output, Dict, str]]
    :return: A ``(component_output, job_output)`` pair. For a non-empty string the
        component output is None and the string is passed through as the job output.
    :rtype: Tuple
    :raises ValidationException: If ``output_value`` is truthy and is neither an
        Output, a dict nor a str.
    """
    if isinstance(output_value, Output):
        # Two independent copies are built so component and job never share an object.
        component_side = Output(**output_value._to_dict())
        return component_side, Output(**output_value._to_dict())

    if not output_value:
        # Falsy values (None, empty dictionary) are returned untouched as the job output.
        # Per the original note: a None output value will be packed into a JobOutput
        # object with mode = ReadWriteMount & type = UriFolder.
        return ComponentTranslatableMixin._to_output(output_value), output_value

    if isinstance(output_value, dict):
        # Non-empty dictionary: the job-level Output is built first, then the component one.
        job_side = Output(**output_value)
        return Output(**output_value), job_side

    if isinstance(output_value, str):
        # When output is passed in from pipeline job yaml
        return None, output_value

    msg = f"Unsupported output type: {type(output_value)}, only Output and dict are supported."
    raise ValidationException(
        message=msg,
        no_personal_data_message=msg,
        target=ErrorTarget.JOB,
        error_type=ValidationErrorType.INVALID_VALUE,
    )


def _parse_inputs_outputs(io_dict: Dict, parse_func: Callable) -> Tuple[Dict, Dict]:
    component_io_dict, job_io_dict = {}, {}
    if io_dict:
        for key, val in io_dict.items():
            component_io, job_io = parse_func(val)
            component_io_dict[key] = component_io
            job_io_dict[key] = job_io
    return component_io_dict, job_io_dict


def command(
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Dict] = None,
    properties: Optional[Dict] = None,
    display_name: Optional[str] = None,
    command: Optional[str] = None,  # pylint: disable=redefined-outer-name
    experiment_name: Optional[str] = None,
    environment: Optional[Union[str, Environment]] = None,
    environment_variables: Optional[Dict] = None,
    distribution: Optional[
        Union[
            Dict,
            MpiDistribution,
            TensorFlowDistribution,
            PyTorchDistribution,
            RayDistribution,
            DistributionConfiguration,
        ]
    ] = None,
    compute: Optional[str] = None,
    inputs: Optional[Dict] = None,
    outputs: Optional[Dict] = None,
    instance_count: Optional[int] = None,
    instance_type: Optional[str] = None,
    locations: Optional[List[str]] = None,
    docker_args: Optional[Union[str, List[str]]] = None,
    shm_size: Optional[str] = None,
    timeout: Optional[int] = None,
    code: Optional[Union[str, os.PathLike]] = None,
    identity: Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]] = None,
    is_deterministic: bool = True,
    services: Optional[
        Dict[str, Union[JobService, JupyterLabJobService, SshJobService, TensorBoardJobService, VsCodeJobService]]
    ] = None,
    job_tier: Optional[str] = None,
    priority: Optional[str] = None,
    parent_job_name: Optional[str] = None,
    **kwargs: Any,
) -> Command:
    """Creates a Command object which can be used inside a dsl.pipeline function or used as a standalone Command job.

    :keyword name: The name of the Command job or component.
    :paramtype name: Optional[str]
    :keyword description: The description of the Command. Defaults to None.
    :paramtype description: Optional[str]
    :keyword tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None.
    :paramtype tags: Optional[dict[str, str]]
    :keyword properties: The job property dictionary. Defaults to None.
    :paramtype properties: Optional[dict[str, str]]
    :keyword display_name: The display name of the job. Defaults to a randomly generated name.
    :paramtype display_name: Optional[str]
    :keyword command: The command to be executed. Defaults to None.
    :paramtype command: Optional[str]
    :keyword experiment_name: The name of the experiment that the job will be created under. Defaults to current
        directory name.
    :paramtype experiment_name: Optional[str]
    :keyword environment: The environment that the job will run in.
    :paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
    :keyword environment_variables: A dictionary of environment variable names and values.
        These environment variables are set on the process where user script is being executed.
        Defaults to None.
    :paramtype environment_variables: Optional[dict[str, str]]
    :keyword distribution: The configuration for distributed jobs. Defaults to None.
    :paramtype distribution: Optional[Union[dict, ~azure.ai.ml.PyTorchDistribution, ~azure.ai.ml.MpiDistribution,
        ~azure.ai.ml.TensorFlowDistribution, ~azure.ai.ml.RayDistribution]]
    :keyword compute: The compute target the job will run on. Defaults to default compute.
    :paramtype compute: Optional[str]
    :keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
    :paramtype inputs: Optional[dict[str, Union[~azure.ai.ml.Input, str, bool, int, float, Enum]]]
    :keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
    :paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
    :keyword instance_count: The number of instances or nodes to be used by the compute target. Defaults to 1.
    :paramtype instance_count: Optional[int]
    :keyword instance_type: The type of VM to be used by the compute target.
    :paramtype instance_type: Optional[str]
    :keyword locations: The list of locations where the job will run.
    :paramtype locations: Optional[List[str]]
    :keyword docker_args: Extra arguments to pass to the Docker run command. This would override any
        parameters that have already been set by the system, or in this section. This parameter is only supported
        for Azure ML compute types. Defaults to None.
    :paramtype docker_args: Optional[Union[str,List[str]]]
    :keyword shm_size: The size of the Docker container's shared memory block. This should be in the
        format of (number)(unit) where the number has to be greater than 0 and the unit can be one of
        b(bytes), k(kilobytes), m(megabytes), or g(gigabytes).
    :paramtype shm_size: Optional[str]
    :keyword timeout: The number, in seconds, after which the job will be cancelled.
    :paramtype timeout: Optional[int]
    :keyword code: The source code to run the job. Can be a local path or "http:", "https:", or "azureml:" url
        pointing to a remote location.
    :paramtype code: Optional[Union[str, os.PathLike]]
    :keyword identity: The identity that the command job will use while running on compute.
    :paramtype identity: Optional[Union[
        ~azure.ai.ml.entities.ManagedIdentityConfiguration,
        ~azure.ai.ml.entities.AmlTokenConfiguration,
        ~azure.ai.ml.entities.UserIdentityConfiguration]]
    :keyword is_deterministic: Specifies whether the Command will return the same output given the same input.
        Defaults to True. When True, if a Command Component is deterministic and has been run before in the
        current workspace with the same input and settings, it will reuse results from a previously submitted
        job when used as a node or step in a pipeline. In that scenario, no compute resources will be used.
    :paramtype is_deterministic: bool
    :keyword services: The interactive services for the node. Defaults to None. This is an experimental parameter,
        and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
    :paramtype services: Optional[dict[str, Union[~azure.ai.ml.entities.JobService,
        ~azure.ai.ml.entities.JupyterLabJobService,
        ~azure.ai.ml.entities.SshJobService, ~azure.ai.ml.entities.TensorBoardJobService,
        ~azure.ai.ml.entities.VsCodeJobService]]]
    :keyword job_tier: The job tier. Accepted values are "Spot", "Basic", "Standard", or "Premium".
    :paramtype job_tier: Optional[str]
    :keyword priority: The priority of the job on the compute. Accepted values are "low", "medium", and "high".
        Defaults to "medium".
    :paramtype priority: Optional[str]
    :keyword parent_job_name: parent job id for command job
    :paramtype parent_job_name: Optional[str]
    :return: A Command object.
    :rtype: ~azure.ai.ml.entities.Command

    .. admonition:: Example:

        .. literalinclude:: ../samples/ml_samples_command_configurations.py
            :start-after: [START command_function]
            :end-before: [END command_function]
            :language: python
            :dedent: 8
            :caption: Creating a Command Job using the command() builder method.
    """
    # pylint: disable=too-many-locals
    inputs = inputs or {}
    outputs = outputs or {}
    component_inputs, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
    # job inputs can not be None
    job_inputs = {k: v for k, v in job_inputs.items() if v is not None}
    component_outputs, job_outputs = _parse_inputs_outputs(outputs, parse_func=_parse_output)

    # A caller may hand in a ready-made component through kwargs; it must be popped
    # before kwargs is forwarded so it is not passed twice to Command below.
    component = kwargs.pop("component", None)
    if component is None:
        component = CommandComponent(
            name=name,
            tags=tags,
            code=code,
            command=command,
            environment=environment,
            display_name=display_name,
            description=description,
            inputs=component_inputs,
            outputs=component_outputs,
            distribution=distribution,
            environment_variables=environment_variables,
            _source=ComponentSource.BUILDER,
            is_deterministic=is_deterministic,
            **kwargs,
        )
    command_obj = Command(
        component=component,
        name=name,
        description=description,
        tags=tags,
        properties=properties,
        display_name=display_name,
        experiment_name=experiment_name,
        compute=compute,
        inputs=job_inputs,
        outputs=job_outputs,
        identity=identity,
        distribution=distribution,
        environment=environment,
        environment_variables=environment_variables,
        services=services,
        parent_job_name=parent_job_name,
        **kwargs,
    )

    # Resource, limit and queue settings are only applied when the caller supplied
    # at least one relevant value, so an untouched Command keeps its own defaults.
    resource_settings = (locations, instance_count, instance_type, docker_args, shm_size)
    if any(setting is not None for setting in resource_settings):
        command_obj.set_resources(
            locations=locations,
            instance_count=instance_count,
            instance_type=instance_type,
            docker_args=docker_args,
            shm_size=shm_size,
        )

    if timeout is not None:
        command_obj.set_limits(timeout=timeout)

    if job_tier is not None or priority is not None:
        command_obj.set_queue_settings(job_tier=job_tier, priority=priority)

    return command_obj