azure.ai.ml.parallel package

class azure.ai.ml.parallel.ParallelJob(*, inputs: Dict[str, Input | str | bool | int | float] | None = None, outputs: Dict[str, Output] | None = None, identity: Dict | ManagedIdentityConfiguration | AmlTokenConfiguration | UserIdentityConfiguration | None = None, **kwargs: Any)[source]

Parallel job.

Parameters:
  • name (str) – Name of the job.

  • version (str) – Version of the job.

  • id (str) – Global ID of the resource, an Azure Resource Manager (ARM) ID.

  • type (str) – The type of the job. The supported value is ‘parallel’.

  • description (str) – Description of the job.

  • tags (dict) – Internal use only.

  • properties (dict) – Internal use only.

  • display_name (str) – Display name of the job.

  • retry_settings (BatchRetrySettings) – Settings for retrying a failed parallel job run.

  • logging_level (str) – The logging level name.

  • max_concurrency_per_instance (int) – The maximum parallelism that each compute instance has.

  • error_threshold (int) – The number of item processing failures that should be ignored.

  • mini_batch_error_threshold (int) – The number of mini-batch processing failures that should be ignored.

  • task (ParallelTask) – The parallel task.

  • mini_batch_size (str) – The mini batch size.

  • partition_keys (list) – The partition keys.

  • input_data (str) – The input data.

  • inputs (dict) – Inputs of the job.

  • outputs (dict) – Outputs of the job.

Keyword Arguments:

identity (Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]) – The identity that the job will use while running on compute.

dump(dest: str | PathLike | IO, **kwargs: Any) None

Dumps the job content into a file in YAML format.

Parameters:

dest (Union[PathLike, str, IO[AnyStr]]) – The local path or file stream to write the YAML content to. If dest is a file path, a new file will be created. If dest is an open file, the file will be written to directly.

Raises:
  • FileExistsError – Raised if dest is a file path and the file already exists.

  • IOError – Raised if dest is an open file and the file is not writable.
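
For example (a minimal sketch; the file paths are placeholders and load_job is used only to obtain a job object to serialize):

from azure.ai.ml import load_job

job = load_job("./job_spec.yml")  # placeholder path to an existing job specification
job.dump("./parallel_job_copy.yml")  # raises FileExistsError if this file already exists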

property base_path: str

The base path of the resource.

Returns:

The base path of the resource.

Return type:

str

property creation_context: SystemData | None

The creation context of the resource.

Returns:

The creation metadata for the resource.

Return type:

Optional[SystemData]

property id: str | None

The resource ID.

Returns:

The global ID of the resource, an Azure Resource Manager (ARM) ID.

Return type:

Optional[str]

property inputs: Dict[str, Input | str | bool | int | float]
property log_files: Dict[str, str] | None

Job output files.

Returns:

The dictionary of log names and URLs.

Return type:

Optional[Dict[str, str]]

property outputs: Dict[str, Output]
property resources: dict | JobResourceConfiguration | None
property retry_settings: RetrySettings | None
property status: str | None

The status of the job.

Common values returned include “Running”, “Completed”, and “Failed”. All possible values are:

  • NotStarted - This is a temporary state that client-side Run objects are in before cloud submission.

  • Starting - The Run has started being processed in the cloud. The caller has a run ID at this point.

  • Provisioning - On-demand compute is being created for a given job submission.

  • Preparing - The run environment is being prepared and is in one of two stages:
    • Docker image build

    • conda environment setup

  • Queued - The job is queued on the compute target. For example, in BatchAI, the job is in a queued state while waiting for all the requested nodes to be ready.

  • Running - The job has started to run on the compute target.

  • Finalizing - User code execution has completed, and the run is in post-processing stages.

  • CancelRequested - Cancellation has been requested for the job.

  • Completed - The run has completed successfully. This includes both the user code execution and run post-processing stages.

  • Failed - The run failed. Usually the Error property on a run will provide details as to why.

  • Canceled - Follows a cancellation request and indicates that the run is now successfully cancelled.

  • NotResponding - For runs that have Heartbeats enabled, no heartbeat has been recently sent.

Returns:

Status of the job.

Return type:

Optional[str]
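
For example (a minimal sketch; the workspace details and job name are placeholders):

from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace-name>",
)

job = ml_client.jobs.get("<job-name>")
if job.status in ("Completed", "Failed", "Canceled"):
    print(f"Job finished with status: {job.status}")
else:
    print(f"Job still in progress: {job.status}")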

property studio_url: str | None

Azure ML studio endpoint.

Returns:

The URL to the job details page.

Return type:

Optional[str]

property task: ParallelTask | None
property type: str | None

The type of the job.

Returns:

The type of the job.

Return type:

Optional[str]

class azure.ai.ml.parallel.RunFunction(*, code: str | None = None, entry_script: str | None = None, program_arguments: str | None = None, model: str | None = None, append_row_to: str | None = None, environment: Environment | str | None = None, **kwargs: Any)[source]

Run Function.

Parameters:
  • code (str) – A local or remote path pointing at source code.

  • entry_script (str) – User script which will be run in parallel on multiple nodes, specified as a local file path. The entry script should contain two functions. init(): this function should be used for any costly or common preparation for subsequent inferences, e.g., deserializing and loading the model into a global object. run(mini_batch): the method to be parallelized; each invocation handles one mini-batch. Batch inference invokes run() and passes either a list or a pandas DataFrame as the argument: each entry in mini_batch is a file path if the input is a FileDataset, or a pandas DataFrame if the input is a TabularDataset. run() should return a pandas DataFrame or an array. For the append_row output_action, these returned elements are appended into the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the input mini-batch. Each parallel worker process calls init() once and then loops over run() until all mini-batches are processed. A minimal entry-script sketch is shown after this parameter list.

  • program_arguments (str) – The arguments of the parallel task.

  • model (str) – The model of the parallel task.

  • append_row_to (str) – All values output by run() method invocations will be aggregated into one unique file which is created in the output location. If it is not set, ‘summary_only’ is used, which means the user script is expected to store the output itself.

  • environment (Union[Environment, str]) – The environment that the training job will run in.
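
A minimal entry-script sketch following the init()/run() contract described above; it assumes file inputs (a list of paths), and load_model and the scoring call are hypothetical placeholders:

import argparse
import os

import pandas as pd

model = None


def init():
    # Called once per worker process: do costly, shared setup here,
    # e.g. deserialize and load the model into a global object.
    global model
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str)  # model folder passed via program_arguments
    args, _ = parser.parse_known_args()
    model = load_model(os.path.join(args.model, "model.pkl"))  # hypothetical loading helper


def run(mini_batch):
    # Called once per mini-batch; with file inputs, 'mini_batch' is a list of file paths.
    results = []
    for file_path in mini_batch:
        prediction = model.predict(file_path)  # placeholder scoring call
        results.append({"input": file_path, "prediction": prediction})
    # Return one element per successfully processed input item; with the
    # append_row output action these rows are appended to the common output file.
    return pd.DataFrame(results)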

get(key: Any, default: Any | None = None) Any
has_key(k: Any) bool
items() list
keys() list
update(*args: Any, **kwargs: Any) None
values() list
azure.ai.ml.parallel.parallel_run_function(*, name: str | None = None, description: str | None = None, tags: Dict | None = None, properties: Dict | None = None, display_name: str | None = None, experiment_name: str | None = None, compute: str | None = None, retry_settings: BatchRetrySettings | None = None, environment_variables: Dict | None = None, logging_level: str | None = None, max_concurrency_per_instance: int | None = None, error_threshold: int | None = None, mini_batch_error_threshold: int | None = None, task: RunFunction | None = None, mini_batch_size: str | None = None, partition_keys: List | None = None, input_data: str | None = None, inputs: Dict | None = None, outputs: Dict | None = None, instance_count: int | None = None, instance_type: str | None = None, docker_args: str | None = None, shm_size: str | None = None, identity: ManagedIdentityConfiguration | AmlTokenConfiguration | UserIdentityConfiguration | None = None, is_deterministic: bool = True, **kwargs: Any) Parallel[source]

Create a Parallel object which can be used inside dsl.pipeline as a function and can also be created as a standalone parallel job.

For an example of using ParallelRunStep, see the notebook https://aka.ms/parallel-example-notebook

Note

To use parallel_run_function:

  • Create an azure.ai.ml.entities._builders.Parallel object to specify how the parallel run is performed, with parameters to control the batch size, the number of nodes per compute target, and a reference to your custom Python script.

  • Build the pipeline with the parallel object as a function, defining the inputs and outputs for the step.

  • Submit the pipeline to run.

from azure.ai.ml import Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.entities import Environment
from azure.ai.ml.parallel import RunFunction, parallel_run_function

parallel_run = parallel_run_function(
    name="batch_score_with_tabular_input",
    display_name="Batch Score with Tabular Dataset",
    description="parallel component for batch score",
    inputs=dict(
        job_data_path=Input(
            type=AssetTypes.MLTABLE,
            description="The data to be split and scored in parallel",
        ),
        score_model=Input(
            type=AssetTypes.URI_FOLDER, description="The model for batch score."
        ),
    ),
    outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
    input_data="${{inputs.job_data_path}}",
    max_concurrency_per_instance=2,  # Optional, default is 1
    mini_batch_size="100",  # optional
    mini_batch_error_threshold=5,  # Optional, allowed failed count on mini batch items, default is -1
    logging_level="DEBUG",  # Optional, default is INFO
    error_threshold=5,  # Optional, allowed failed count totally, default is -1
    retry_settings=dict(max_retries=2, timeout=60),  # Optional
    task=RunFunction(
        code="./src",
        entry_script="tabular_batch_inference.py",
        environment=Environment(
            image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
            conda_file="./src/environment_parallel.yml",
        ),
        program_arguments="--model ${{inputs.score_model}}",
        append_row_to="${{outputs.job_output_path}}",  # Optional, if not set, summary_only
    ),
)
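
The returned Parallel object can then be consumed as a function inside a dsl.pipeline. The sketch below is illustrative; the pipeline name, input paths, and output name are assumptions:

from azure.ai.ml import Input, dsl
from azure.ai.ml.constants import AssetTypes


@dsl.pipeline(description="Illustrative pipeline using the parallel node defined above")
def batch_score_pipeline(pipeline_job_data_path, pipeline_score_model):
    # Call the parallel object as a function; the keyword names match the
    # inputs declared in parallel_run_function above.
    parallel_node = parallel_run(
        job_data_path=pipeline_job_data_path,
        score_model=pipeline_score_model,
    )
    return {"scored_data": parallel_node.outputs.job_output_path}


pipeline_job = batch_score_pipeline(
    pipeline_job_data_path=Input(type=AssetTypes.MLTABLE, path="<path-to-mltable>"),
    pipeline_score_model=Input(type=AssetTypes.URI_FOLDER, path="<path-to-model-folder>"),
)
# The pipeline job could then be submitted, e.g. with MLClient.jobs.create_or_update(pipeline_job).
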
Keyword Arguments:
  • name (str) – Name of the parallel job or component created.

  • description (str) – A friendly description of the parallel.

  • tags (Dict) – Tags to be attached to this parallel.

  • properties (Dict) – The asset property dictionary.

  • display_name (str) – A friendly name.

  • experiment_name (str) – Name of the experiment under which the job will be created. If None is provided, the default is set to the current directory name. Ignored when used as a pipeline step.

  • compute (str) – The name of the compute where the parallel job is executed (will not be used if the parallel is used as a component/function).

  • retry_settings (BatchRetrySettings) – Settings for retrying a failed parallel component run.

  • environment_variables (Dict[str, str]) – A dictionary of environment variable names and values. These environment variables are set on the process where the user script is being executed.

  • logging_level (str) – A string of the logging level name, which is defined in ‘logging’. Possible values are ‘WARNING’, ‘INFO’, and ‘DEBUG’. (optional, default value is ‘INFO’.) This value could be set through PipelineParameter.

  • max_concurrency_per_instance (int) – The maximum parallelism that each compute instance has.

  • error_threshold (int) – The number of record failures for Tabular Dataset and file failures for File Dataset that should be ignored during processing. If the error count goes above this value, then the job will be aborted. The error threshold is for the entire input rather than for the individual mini-batch sent to the run() method. The range is [-1, int.max]; -1 indicates that all failures during processing will be ignored.

  • mini_batch_error_threshold (int) – The number of mini-batch processing failures that should be ignored.

  • task (RunFunction) – The parallel task.

  • mini_batch_size (str) – For FileDataset input, this field is the number of files a user script can process in one run() call. For TabularDataset input, this field is the approximate size of data the user script can process in one run() call. Example values are 1024, 1024KB, 10MB, and 1GB. (optional, default value is 10 files for FileDataset and 1MB for TabularDataset.) This value could be set through PipelineParameter.

  • partition_keys (List) – The keys used to partition the dataset into mini-batches. If specified, the data with the same key will be partitioned into the same mini-batch. If both partition_keys and mini_batch_size are specified, the partition keys will take effect. The input(s) must be partitioned dataset(s), and the partition_keys must be a subset of the keys of every input dataset for this to work.

  • input_data (str) – The input data.

  • inputs (Dict) – A dict of inputs used by this parallel.

  • outputs (Dict) – The outputs of this parallel.

  • instance_count (int) – Optional number of instances or nodes used by the compute target. Defaults to 1.

  • instance_type (str) – Optional type of VM used as supported by the compute target.

  • docker_args (str) – Extra arguments to pass to the Docker run command. This would override any parameters that have already been set by the system, or in this section. This parameter is only supported for Azure ML compute types.

  • shm_size (str) – Size of the Docker container’s shared memory block. This should be in the format of (number)(unit) where number has to be greater than 0 and the unit can be one of b (bytes), k (kilobytes), m (megabytes), or g (gigabytes).

  • identity (Optional[Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]]) – The identity that the PRS (parallel run step) job will use while running on compute.

  • is_deterministic (bool) – Specifies whether the parallel will return the same output given the same input. If a parallel (component) is deterministic, when it is used as a node/step in a pipeline, it will reuse results from a previously submitted job in the current workspace that has the same inputs and settings. In this case, the step will not use any compute resources. Defaults to True; specify is_deterministic=False if you would like to avoid such reuse behavior.

Returns:

The parallel node.

Return type:

Parallel