# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access
import itertools
import logging
import typing
from functools import partial
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Union, cast
from typing_extensions import Literal
from azure.ai.ml._restclient.v2024_01_01_preview.models import JobBase
from azure.ai.ml._restclient.v2024_01_01_preview.models import PipelineJob as RestPipelineJob
from azure.ai.ml._schema import PathAwareSchema
from azure.ai.ml._schema.pipeline.pipeline_job import PipelineJobSchema
from azure.ai.ml._utils._arm_id_utils import get_resource_name_from_arm_id_safe
from azure.ai.ml._utils.utils import (
camel_to_snake,
is_data_binding_expression,
is_private_preview_enabled,
transform_dict_keys,
)
from azure.ai.ml.constants import JobType
from azure.ai.ml.constants._common import AZUREML_PRIVATE_FEATURES_ENV_VAR, BASE_PATH_CONTEXT_KEY
from azure.ai.ml.constants._component import ComponentSource
from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
from azure.ai.ml.entities._builders import BaseNode
from azure.ai.ml.entities._builders.condition_node import ConditionNode
from azure.ai.ml.entities._builders.control_flow_node import LoopNode
from azure.ai.ml.entities._builders.import_node import Import
from azure.ai.ml.entities._builders.parallel import Parallel
from azure.ai.ml.entities._builders.pipeline import Pipeline
from azure.ai.ml.entities._component.component import Component
from azure.ai.ml.entities._component.pipeline_component import PipelineComponent
# from azure.ai.ml.entities._job.identity import AmlToken, Identity, ManagedIdentity, UserIdentity
from azure.ai.ml.entities._credentials import (
AmlTokenConfiguration,
ManagedIdentityConfiguration,
UserIdentityConfiguration,
_BaseJobIdentityConfiguration,
)
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._inputs_outputs.group_input import GroupInput
from azure.ai.ml.entities._job._input_output_helpers import (
from_rest_data_outputs,
from_rest_inputs_to_dataset_literal,
to_rest_data_outputs,
to_rest_dataset_literal_inputs,
)
from azure.ai.ml.entities._job.import_job import ImportJob
from azure.ai.ml.entities._job.job import Job
from azure.ai.ml.entities._job.job_service import JobServiceBase
from azure.ai.ml.entities._job.pipeline._io import PipelineInput, PipelineJobIOMixin
from azure.ai.ml.entities._job.pipeline.pipeline_job_settings import PipelineJobSettings
from azure.ai.ml.entities._mixins import YamlTranslatableMixin
from azure.ai.ml.entities._system_data import SystemData
from azure.ai.ml.entities._validation import MutableValidationResult, PathAwareSchemaValidatableMixin
from azure.ai.ml.exceptions import ErrorTarget, UserErrorException, ValidationException
module_logger = logging.getLogger(__name__)
class PipelineJob(Job, YamlTranslatableMixin, PipelineJobIOMixin, PathAwareSchemaValidatableMixin):
"""Pipeline job.
You should not instantiate this class directly. Instead, you should
use the `@pipeline` decorator to create a `PipelineJob`.
:param component: Pipeline component version. The field is mutually exclusive with 'jobs'.
:type component: Union[str, ~azure.ai.ml.entities._component.pipeline_component.PipelineComponent]
:param inputs: Inputs to the pipeline job.
:type inputs: dict[str, Union[~azure.ai.ml.entities.Input, str, bool, int, float]]
:param outputs: Outputs of the pipeline job.
:type outputs: dict[str, ~azure.ai.ml.entities.Output]
:param name: Name of the PipelineJob. Defaults to None.
:type name: str
:param description: Description of the pipeline job. Defaults to None
:type description: str
:param display_name: Display name of the pipeline job. Defaults to None
:type display_name: str
:param experiment_name: Name of the experiment the job will be created under.
If None is provided, the experiment will be set to the current directory. Defaults to None
:type experiment_name: str
:param jobs: Pipeline component node name to component object. Defaults to None
:type jobs: dict[str, ~azure.ai.ml.entities._builders.BaseNode]
:param settings: Settings of the pipeline job. Defaults to None
:type settings: ~azure.ai.ml.entities.PipelineJobSettings
:param identity: Identity that the training job will use while running on compute. Defaults to None
:type identity: Union[
~azure.ai.ml.entities._credentials.ManagedIdentityConfiguration,
~azure.ai.ml.entities._credentials.AmlTokenConfiguration,
~azure.ai.ml.entities._credentials.UserIdentityConfiguration
]
:param compute: Compute target name of the built pipeline. Defaults to None
:type compute: str
:param tags: Tag dictionary. Tags can be added, removed, and updated. Defaults to None
:type tags: dict[str, str]
:param kwargs: A dictionary of additional configuration parameters. Defaults to None
:type kwargs: dict
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_pipeline_job_configurations.py
:start-after: [START configure_pipeline_job_and_settings]
:end-before: [END configure_pipeline_job_and_settings]
:language: python
:dedent: 8
:caption: Shows how to create a pipeline using this class.
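A minimal sketch of the decorator-based flow mentioned above (the component
YAML path, compute target, and input/output names below are placeholders):

.. code-block:: python

    from azure.ai.ml import Input, load_component
    from azure.ai.ml.dsl import pipeline

    train_component = load_component(source="./train.yml")  # placeholder component YAML

    @pipeline()
    def train_pipeline(training_data):
        train_step = train_component(input_data=training_data)
        return {"trained_model": train_step.outputs.model_output}

    # Calling the decorated function builds a PipelineJob.
    pipeline_job = train_pipeline(training_data=Input(type="uri_folder", path="./data"))
    pipeline_job.settings.default_compute = "cpu-cluster"  # placeholder compute target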
"""
def __init__(
self,
*,
component: Optional[Union[str, PipelineComponent, Component]] = None,
inputs: Optional[Dict[str, Union[Input, str, bool, int, float]]] = None,
outputs: Optional[Dict[str, Output]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
display_name: Optional[str] = None,
experiment_name: Optional[str] = None,
jobs: Optional[Dict[str, BaseNode]] = None,
settings: Optional[PipelineJobSettings] = None,
identity: Optional[
Union[ManagedIdentityConfiguration, AmlTokenConfiguration, UserIdentityConfiguration]
] = None,
compute: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> None:
# initialize io
inputs, outputs = inputs or {}, outputs or {}
if isinstance(component, PipelineComponent) and component._source in [
ComponentSource.DSL,
ComponentSource.YAML_COMPONENT,
]:
self._inputs = self._build_inputs_dict(inputs, input_definition_dict=component.inputs)
# For pipeline jobs created from a pipeline component, the outputs should
# match the component outputs and then be overridden with the given outputs
# (filtering out None values).
pipeline_outputs = {k: v for k, v in (outputs or {}).items() if v}
self._outputs = self._build_pipeline_outputs_dict({**component.outputs, **pipeline_outputs})
else:
# Build inputs/outputs dict without meta when definition not available
self._inputs = self._build_inputs_dict(inputs)
# For pipeline jobs created from nodes, the outputs should match the given outputs.
self._outputs = self._build_pipeline_outputs_dict(outputs=outputs)
source = kwargs.pop("_source", ComponentSource.CLASS)
if component is None:
component = PipelineComponent(
jobs=jobs,
description=description,
display_name=display_name,
base_path=kwargs.get(BASE_PATH_CONTEXT_KEY),
_source=source,
)
# If component is Pipeline component, jobs will be component.jobs
self._jobs = (jobs or {}) if isinstance(component, str) else {}
self.component: Union[PipelineComponent, str] = cast(Union[PipelineComponent, str], component)
if "type" not in kwargs:
kwargs["type"] = JobType.PIPELINE
if isinstance(component, PipelineComponent):
description = component.description if description is None else description
display_name = component.display_name if display_name is None else display_name
super(PipelineJob, self).__init__(
name=name,
description=description,
tags=tags,
display_name=display_name,
experiment_name=experiment_name,
compute=compute,
**kwargs,
)
self._remove_pipeline_input()
self.compute = compute
self._settings: Any = None
self.settings = settings
self.identity = identity
# TODO: remove default code & environment?
self._default_code = None
self._default_environment = None
@property
def inputs(self) -> Dict:
"""Inputs of the pipeline job.
:return: Inputs of the pipeline job.
:rtype: dict[str, Union[~azure.ai.ml.entities.Input, str, bool, int, float]]
"""
return self._inputs
@property
def outputs(self) -> Dict[str, Union[str, Output]]:
"""Outputs of the pipeline job.
:return: Outputs of the pipeline job.
:rtype: dict[str, Union[str, ~azure.ai.ml.entities.Output]]
"""
return self._outputs
@property
def jobs(self) -> Dict:
"""Return jobs of pipeline job.
:return: Jobs of pipeline job.
:rtype: dict
"""
res: dict = self.component.jobs if isinstance(self.component, PipelineComponent) else self._jobs
return res
@property
def settings(self) -> Optional[PipelineJobSettings]:
"""Settings of the pipeline job.
:return: Settings of the pipeline job.
:rtype: ~azure.ai.ml.entities.PipelineJobSettings
"""
if self._settings is None:
self._settings = PipelineJobSettings()
res: Optional[PipelineJobSettings] = self._settings
return res
@settings.setter
def settings(self, value: Union[Dict, PipelineJobSettings]) -> None:
"""Set the pipeline job settings.
:param value: The pipeline job settings.
:type value: Union[dict, ~azure.ai.ml.entities.PipelineJobSettings]
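Both a ``PipelineJobSettings`` instance and a plain dict are accepted; a minimal
sketch (the compute target name is a placeholder):

.. code-block:: python

    pipeline_job.settings = PipelineJobSettings(default_compute="cpu-cluster")
    # equivalent to
    pipeline_job.settings = {"default_compute": "cpu-cluster"}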
"""
if value is not None:
if isinstance(value, PipelineJobSettings):
# Since PipelineJobSettings inherits from _AttrDict, this branch is needed to distinguish it from a plain dict.
pass
elif isinstance(value, dict):
value = PipelineJobSettings(**value)
else:
raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value)))
self._settings = value
@classmethod
def _create_validation_error(cls, message: str, no_personal_data_message: str) -> ValidationException:
return ValidationException(
message=message,
no_personal_data_message=no_personal_data_message,
target=ErrorTarget.PIPELINE,
)
@classmethod
def _create_schema_for_validation(cls, context: Any) -> PathAwareSchema:
# import this to ensure that nodes are registered before schema is created.
return PipelineJobSchema(context=context)
@classmethod
def _get_skip_fields_in_schema_validation(cls) -> typing.List[str]:
# jobs validations are done in _customized_validate()
return ["component", "jobs"]
@property
def _skip_required_compute_missing_validation(self) -> Literal[True]:
return True
def _validate_compute_is_set(self) -> MutableValidationResult:
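# Compute is considered set if it is specified on the job itself, via settings.default_compute,
# or on the nodes of the underlying pipeline component (checked by delegating to the component).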
validation_result = self._create_empty_validation_result()
if self.compute is not None:
return validation_result
if self.settings is not None and self.settings.default_compute is not None:
return validation_result
if not isinstance(self.component, str):
validation_result.merge_with(self.component._validate_compute_is_set())
return validation_result
def _customized_validate(self) -> MutableValidationResult:
"""Validate that all provided inputs and parameters are valid for current pipeline and components in it.
:return: The validation result
:rtype: MutableValidationResult
"""
validation_result = super(PipelineJob, self)._customized_validate()
if isinstance(self.component, PipelineComponent):
# Merge with pipeline component validate result for structure validation.
# Skip top level parameter missing type error
validation_result.merge_with(
self.component._customized_validate(),
condition_skip=lambda x: x.error_code == ValidationErrorCode.PARAMETER_TYPE_UNKNOWN
and x.yaml_path.startswith("inputs"),
)
# Validate compute
validation_result.merge_with(self._validate_compute_is_set())
# Validate Input
validation_result.merge_with(self._validate_input())
# Validate initialization & finalization jobs
validation_result.merge_with(self._validate_init_finalize_job())
return validation_result
def _validate_input(self) -> MutableValidationResult:
validation_result = self._create_empty_validation_result()
if not isinstance(self.component, str):
# TODO(1979547): refine this logic: not all nodes have `_get_input_binding_dict` method
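# Collect the names of pipeline-level inputs that are actually consumed by nodes,
# using a loop node's body and skipping condition nodes (which have no inputs).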
used_pipeline_inputs = set(
itertools.chain(
*[
self.component._get_input_binding_dict(node if not isinstance(node, LoopNode) else node.body)[0]
for node in self.jobs.values()
if not isinstance(node, ConditionNode)
# condition node has no inputs
]
)
)
# validate inputs
if not isinstance(self.component, Component):
return validation_result
for key, meta in self.component.inputs.items():
if key not in used_pipeline_inputs: # pylint: disable=possibly-used-before-assignment
# Only validate inputs certainly used.
continue
# raise error when required input with no default value not set
if (
self.inputs.get(key, None) is None # input not provided
and meta.optional is not True # and it's required
and meta.default is None # and it does not have default
):
name = self.name or self.display_name
name = f"{name!r} " if name else ""
validation_result.append_error(
yaml_path=f"inputs.{key}",
message=f"Required input {key!r} for pipeline {name}not provided.",
)
return validation_result
def _validate_init_finalize_job(self) -> MutableValidationResult: # pylint: disable=too-many-statements
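"""Validate the on_init/on_finalize settings of the pipeline job.
on_init/on_finalize must reference existing, distinct jobs that are not control flow
nodes and have no data-binding connections to other execution nodes; pipeline component
(subgraph) nodes must not set on_init/on_finalize, and at least one other job must remain.
:return: The validation result
:rtype: MutableValidationResult
"""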
from azure.ai.ml.entities._job.pipeline._io import InputOutputBase, _GroupAttrDict
validation_result = self._create_empty_validation_result()
# subgraph (PipelineComponent) should not have on_init/on_finalize set
for job_name, job in self.jobs.items():
if job.type != "pipeline":
continue
if job.settings.on_init:
validation_result.append_error(
yaml_path=f"jobs.{job_name}.settings.on_init",
message="On_init is not supported for pipeline component.",
)
if job.settings.on_finalize:
validation_result.append_error(
yaml_path=f"jobs.{job_name}.settings.on_finalize",
message="On_finalize is not supported for pipeline component.",
)
on_init = None
on_finalize = None
if self.settings is not None:
# quick return if neither on_init nor on_finalize is set
if self.settings.on_init is None and self.settings.on_finalize is None:
return validation_result
on_init, on_finalize = self.settings.on_init, self.settings.on_finalize
append_on_init_error = partial(validation_result.append_error, "settings.on_init")
append_on_finalize_error = partial(validation_result.append_error, "settings.on_finalize")
# on_init and on_finalize cannot be same
if on_init == on_finalize:
append_on_init_error(f"Invalid on_init job {on_init}, it should be different from on_finalize.")
append_on_finalize_error(f"Invalid on_finalize job {on_finalize}, it should be different from on_init.")
# pipeline should have at least one normal node
if len(set(self.jobs.keys()) - {on_init, on_finalize}) == 0:
validation_result.append_error(yaml_path="jobs", message="No other job except for on_init/on_finalize job.")
def _is_control_flow_node(_validate_job_name: str) -> bool:
from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
_validate_job = self.jobs[_validate_job_name]
return issubclass(type(_validate_job), ControlFlowNode)
def _is_isolated_job(_validate_job_name: str) -> bool:
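# A job is "isolated" when none of its inputs bind to other jobs' outputs
# and no other job's inputs bind to its outputs.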
def _try_get_data_bindings(
_name: str, _input_output_data: Union["_GroupAttrDict", "InputOutputBase"]
) -> Optional[List]:
"""Try to get data bindings from input/output data, return None if not found.
:param _name: The name to use when flattening GroupAttrDict
:type _name: str
:param _input_output_data: The input/output data
:type _input_output_data: Union[_GroupAttrDict, str, InputOutputBase]
:return: A list of data bindings, or None if not found
:rtype: Optional[List[str]]
"""
# handle group input
if GroupInput._is_group_attr_dict(_input_output_data):
_new_input_output_data: _GroupAttrDict = cast(_GroupAttrDict, _input_output_data)
# flatten to avoid nested cases
flattened_values: List[Input] = list(_new_input_output_data.flatten(_name).values())
# handle invalid empty group
if len(flattened_values) == 0:
return None
return [_value.path for _value in flattened_values]
_input_output_data = _input_output_data._data
if isinstance(_input_output_data, str):
return [_input_output_data]
if not hasattr(_input_output_data, "_data_binding"):
return None
return [_input_output_data._data_binding()]
_validate_job = self.jobs[_validate_job_name]
# 1) The job's inputs must not bind to outputs of other execution nodes.
for _input_name in _validate_job.inputs:
_data_bindings = _try_get_data_bindings(_input_name, _validate_job.inputs[_input_name])
if _data_bindings is None:
continue
for _data_binding in _data_bindings:
if is_data_binding_expression(_data_binding, ["parent", "jobs"]):
return False
# 2) No other job's inputs may bind to this job's outputs - check the other jobs' inputs.
for _job_name, _job in self.jobs.items():
# exclude control flow node as it does not have inputs
if _is_control_flow_node(_job_name):
continue
for _input_name in _job.inputs:
_data_bindings = _try_get_data_bindings(_input_name, _job.inputs[_input_name])
if _data_bindings is None:
continue
for _data_binding in _data_bindings:
if is_data_binding_expression(_data_binding, ["parent", "jobs", _validate_job_name]):
return False
return True
# validate on_init
if on_init is not None:
if on_init not in self.jobs:
append_on_init_error(f"On_init job name {on_init} not exists in jobs.")
else:
if _is_control_flow_node(on_init):
append_on_init_error("On_init job should not be a control flow node.")
elif not _is_isolated_job(on_init):
append_on_init_error("On_init job should not have connection to other execution node.")
# validate on_finalize
if on_finalize is not None:
if on_finalize not in self.jobs:
append_on_finalize_error(f"On_finalize job name {on_finalize} not exists in jobs.")
else:
if _is_control_flow_node(on_finalize):
append_on_finalize_error("On_finalize job should not be a control flow node.")
elif not _is_isolated_job(on_finalize):
append_on_finalize_error("On_finalize job should not have connection to other execution node.")
return validation_result
def _remove_pipeline_input(self) -> None:
"""Remove None pipeline input.If not remove, it will pass "None" to backend."""
redundant_pipeline_inputs = []
for pipeline_input_name, pipeline_input in self._inputs.items():
if isinstance(pipeline_input, PipelineInput) and pipeline_input._data is None:
redundant_pipeline_inputs.append(pipeline_input_name)
for redundant_pipeline_input in redundant_pipeline_inputs:
self._inputs.pop(redundant_pipeline_input)
def _check_private_preview_features(self) -> None:
"""Checks is private preview features included in pipeline.
If private preview environment not set, raise exception.
"""
if not is_private_preview_enabled():
error_msg = (
"{} is a private preview feature, "
f"please set environment variable {AZUREML_PRIVATE_FEATURES_ENV_VAR} to true to use it."
)
# check has not supported nodes
for _, node in self.jobs.items():
# TODO: Remove in PuP
if isinstance(node, (ImportJob, Import)):
msg = error_msg.format("Import job in pipeline")
raise UserErrorException(message=msg, no_personal_data_message=msg)
def _to_node(self, context: Optional[Dict] = None, **kwargs: Any) -> "Pipeline":
"""Translate a command job to a pipeline node when load schema.
(Write a pipeline job as node in yaml is not supported presently.)
:param context: Context of command job YAML file.
:type context: dict
:return: Translated command component.
:rtype: Pipeline
"""
component = self._to_component(context, **kwargs)
return Pipeline(
component=component,
compute=self.compute,
# The inputs need to be supplied as double-curly data binding expressions.
inputs=self.inputs,
outputs=self.outputs,
description=self.description,
tags=self.tags,
display_name=self.display_name,
properties=self.properties,
)
def _to_rest_object(self) -> JobBase:
"""Build current parameterized pipeline instance to a pipeline job object before submission.
:return: Rest pipeline job.
:rtype: JobBase
"""
# Check if there are private preview features in it
self._check_private_preview_features()
# Build the inputs to dict. Handle both value & binding assignment.
# Example: {
# "input_data": {"data": {"path": "path/to/input/data"}, "mode"="Mount"},
# "input_value": 10,
# "learning_rate": "${{jobs.step1.inputs.learning_rate}}"
# }
built_inputs = self._build_inputs()
# Build the outputs to dict
# example: {"eval_output": "${{jobs.eval.outputs.eval_output}}"}
built_outputs = self._build_outputs()
if self.settings is not None:
settings_dict = self.settings._to_dict()
if isinstance(self.component, PipelineComponent):
source = self.component._source
# Build the jobs to dict
rest_component_jobs = self.component._build_rest_component_jobs()
else:
source = ComponentSource.REMOTE_WORKSPACE_JOB
rest_component_jobs = {}
# add _source on pipeline job.settings
if "_source" not in settings_dict: # pylint: disable=possibly-used-before-assignment
settings_dict.update({"_source": source})
# TODO: Revisit this logic when multiple types of component jobs are supported
rest_compute = self.compute
# This will be resolved in job_operations _resolve_arm_id_or_upload_dependencies.
component_id = self.component if isinstance(self.component, str) else self.component.id
# TODO remove it in the future.
# MFE does not support passing None or empty input values. Remove empty inputs from the pipeline job.
built_inputs = {k: v for k, v in built_inputs.items() if v is not None and v != ""}
pipeline_job = RestPipelineJob(
compute_id=rest_compute,
component_id=component_id,
display_name=self.display_name,
tags=self.tags,
description=self.description,
properties=self.properties,
experiment_name=self.experiment_name,
jobs=rest_component_jobs,
inputs=to_rest_dataset_literal_inputs(built_inputs, job_type=self.type),
outputs=to_rest_data_outputs(built_outputs),
settings=settings_dict,
services={k: v._to_rest_object() for k, v in self.services.items()} if self.services else None,
identity=self.identity._to_job_rest_object() if self.identity else None,
)
rest_job = JobBase(properties=pipeline_job)
rest_job.name = self.name
return rest_job
@classmethod
def _load_from_rest(cls, obj: JobBase) -> "PipelineJob":
"""Build a pipeline instance from rest pipeline object.
:param obj: The REST Pipeline Object
:type obj: JobBase
:return: pipeline job.
:rtype: PipelineJob
"""
properties: RestPipelineJob = obj.properties
# Workaround for BatchEndpoint as these fields are not filled in
# Unpack the inputs
from_rest_inputs = from_rest_inputs_to_dataset_literal(properties.inputs) or {}
from_rest_outputs = from_rest_data_outputs(properties.outputs) or {}
# Unpack the component jobs
sub_nodes = PipelineComponent._resolve_sub_nodes(properties.jobs) if properties.jobs else {}
# The backend may still store camelCase settings, e.g. DefaultDatastore; translate them to snake_case when loading back.
settings_dict = transform_dict_keys(properties.settings, camel_to_snake) if properties.settings else None
settings_sdk = PipelineJobSettings(**settings_dict) if settings_dict else PipelineJobSettings()
# Create component or use component id
if getattr(properties, "component_id", None):
component = properties.component_id
else:
component = PipelineComponent._load_from_rest_pipeline_job(
{
"inputs": from_rest_inputs,
"outputs": from_rest_outputs,
"display_name": properties.display_name,
"description": properties.description,
"jobs": sub_nodes,
}
)
job = PipelineJob(
component=component,
inputs=from_rest_inputs,
outputs=from_rest_outputs,
name=obj.name,
id=obj.id,
jobs=sub_nodes,
display_name=properties.display_name,
tags=properties.tags,
properties=properties.properties,
experiment_name=properties.experiment_name,
status=properties.status,
creation_context=SystemData._from_rest_object(obj.system_data) if obj.system_data else None,
services=JobServiceBase._from_rest_job_services(properties.services) if properties.services else None,
compute=get_resource_name_from_arm_id_safe(properties.compute_id),
settings=settings_sdk,
identity=(
_BaseJobIdentityConfiguration._from_rest_object(properties.identity) if properties.identity else None
),
)
return job
def _to_dict(self) -> Dict:
res: dict = self._dump_for_validation()
return res
@classmethod
def _component_items_from_path(cls, data: Dict) -> Generator:
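# Yield (node_name, component_path) pairs for jobs whose "component" field is a local "file:" reference.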
if "jobs" in data:
for node_name, job_instance in data["jobs"].items():
potential_component_path = job_instance["component"] if "component" in job_instance else None
if isinstance(potential_component_path, str) and potential_component_path.startswith("file:"):
yield node_name, potential_component_path
@classmethod
def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "PipelineJob":
path_first_occurrence: dict = {}
component_first_occurrence = {}
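# path_first_occurrence maps a component file path to the first node that referenced it;
# component_first_occurrence maps later nodes to that first node so the resolved component can be reused.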
for node_name, component_path in cls._component_items_from_path(data):
if component_path in path_first_occurrence:
component_first_occurrence[node_name] = path_first_occurrence[component_path]
# do not replace the component here: doing so before schema validation may break the validation logic
else:
path_first_occurrence[component_path] = node_name
# use this instead of azure.ai.ml.entities._util.load_from_dict to avoid parsing
loaded_schema = cls._create_schema_for_validation(context=context).load(data, **kwargs)
# replace repeat component with first occurrence to reduce arm id resolution
# current load yaml file logic is in azure.ai.ml._schema.core.schema.YamlFileSchema.load_from_file
# is it possible to load the same yaml file only once in 1 pipeline loading?
for node_name, first_occurrence in component_first_occurrence.items():
job = loaded_schema["jobs"][node_name]
job._component = loaded_schema["jobs"][first_occurrence].component
# For Parallel jobs, also align the task attribute, which usually comes from component.task
if isinstance(job, Parallel):
job.task = job._component.task
# parallel.task.code is based on parallel._component.base_path, so need to update it
job._base_path = job._component.base_path
return PipelineJob(
base_path=context[BASE_PATH_CONTEXT_KEY],
_source=ComponentSource.YAML_JOB,
**loaded_schema,
)
def __str__(self) -> str:
try:
res_to_yaml: str = self._to_yaml()
return res_to_yaml
except BaseException: # pylint: disable=W0718
res: str = super(PipelineJob, self).__str__()
return res
def _get_telemetry_values(self) -> Dict:
telemetry_values: dict = super()._get_telemetry_values()
if isinstance(self.component, PipelineComponent):
telemetry_values.update(self.component._get_telemetry_values())
else:
telemetry_values.update({"source": ComponentSource.REMOTE_WORKSPACE_JOB})
telemetry_values.pop("is_anonymous")
return telemetry_values
def _to_component(self, context: Optional[Dict] = None, **kwargs: Any) -> "PipelineComponent":
"""Translate a pipeline job to pipeline component.
:param context: Context of pipeline job YAML file.
:type context: dict
:return: Translated pipeline component.
:rtype: PipelineComponent
"""
ignored_keys = PipelineComponent._check_ignored_keys(self)
if ignored_keys:
name = self.name or self.display_name
name = f"{name!r} " if name else ""
module_logger.warning("%s ignored when translating PipelineJob %sto PipelineComponent.", ignored_keys, name)
pipeline_job_dict = kwargs.get("pipeline_job_dict", {})
context = context or {BASE_PATH_CONTEXT_KEY: Path("./")}
# Create anonymous pipeline component with default version as 1
return PipelineComponent(
base_path=context[BASE_PATH_CONTEXT_KEY],
display_name=self.display_name,
inputs=self._to_inputs(inputs=self.inputs, pipeline_job_dict=pipeline_job_dict),
outputs=self._to_outputs(outputs=self.outputs, pipeline_job_dict=pipeline_job_dict),
jobs=self.jobs,
)