# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
from marshmallow import Schema
from azure.ai.ml.entities._component.component import Component, NodeType
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._job.job import Job
from azure.ai.ml.entities._validation import MutableValidationResult
from ..._schema import PathAwareSchema
from .._job.pipeline.pipeline_job_settings import PipelineJobSettings
from .._util import convert_ordered_dict_to_dict, copy_output_setting, validate_attribute_type
from .base_node import BaseNode
if TYPE_CHECKING:
from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob
module_logger = logging.getLogger(__name__)
class Pipeline(BaseNode):
    """Base class for pipeline node, used for pipeline component version consumption. You should not instantiate this
    class directly. Instead, you should use @pipeline decorator to create a pipeline node.

    :param component: Id or instance of the pipeline component/job to be run for the step.
    :type component: Union[~azure.ai.ml.entities.Component, str]
    :param inputs: Inputs of the pipeline node.
    :type inputs: Optional[Dict[str, Union[
        ~azure.ai.ml.entities.Input,
        str, bool, int, float, Enum, "Input"]]]
    :param outputs: Outputs of the pipeline node.
    :type outputs: Optional[Dict[str, Union[str, ~azure.ai.ml.entities.Output, "Output"]]]
    :param settings: Setting of pipeline node, only taking effect for root pipeline job.
    :type settings: Optional[~azure.ai.ml.entities._job.pipeline.pipeline_job_settings.PipelineJobSettings]
    """

    def __init__(
        self,
        *,
        component: Union[Component, str],
        inputs: Optional[
            Dict[
                str,
                Union[
                    Input,
                    str,
                    bool,
                    int,
                    float,
                    Enum,
                    "Input",
                ],
            ]
        ] = None,
        outputs: Optional[Dict[str, Union[str, Output, "Output"]]] = None,
        settings: Optional[PipelineJobSettings] = None,
        **kwargs: Any,
    ) -> None:
        # Validate init params are of the declared types before any construction work.
        validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
        # "type" is fixed for pipeline nodes; drop any caller-provided value so it
        # cannot conflict with the explicit type passed to BaseNode below.
        kwargs.pop("type", None)
        BaseNode.__init__(
            self,
            type=NodeType.PIPELINE,
            component=component,
            inputs=inputs,
            outputs=outputs,
            **kwargs,
        )
        # copy pipeline component output's setting to node level
        self._copy_pipeline_component_out_setting_to_node()
        self._settings: Optional[PipelineJobSettings] = None
        self.settings = settings

    @property
    def component(self) -> Optional[Union[str, Component]]:
        """Id or instance of the pipeline component/job to be run for the step.

        :return: Id or instance of the pipeline component/job.
        :rtype: Union[str, ~azure.ai.ml.entities.Component]
        """
        res: Union[str, Component] = self._component
        return res

    @property
    def settings(self) -> Optional[PipelineJobSettings]:
        """Settings of the pipeline.

        Note: settings is available only when create node as a job.
        i.e. ml_client.jobs.create_or_update(node).

        :return: Settings of the pipeline.
        :rtype: ~azure.ai.ml.entities.PipelineJobSettings
        """
        # Lazily create an empty settings object so callers can always do
        # ``node.settings.xxx = ...`` without a None check.
        if self._settings is None:
            self._settings = PipelineJobSettings()
        return self._settings

    @settings.setter
    def settings(self, value: Optional[Union[PipelineJobSettings, Dict]]) -> None:
        """Set the settings of the pipeline.

        :param value: The settings of the pipeline.
        :type value: Optional[Union[~azure.ai.ml.entities.PipelineJobSettings, dict]]
        :raises TypeError: If the value is not an instance of PipelineJobSettings or a dict.
        """
        if value is not None:
            if isinstance(value, PipelineJobSettings):
                # since PipelineJobSettings inherit _AttrDict, we need add this branch to distinguish with dict
                pass
            elif isinstance(value, dict):
                value = PipelineJobSettings(**value)
            else:
                raise TypeError("settings must be PipelineJobSettings or dict but got {}".format(type(value)))
        self._settings = value

    @classmethod
    def _get_supported_inputs_types(cls) -> None:
        # Return None here to skip validation,
        # as input could be custom class object(parameter group).
        return None

    @property
    def _skip_required_compute_missing_validation(self) -> bool:
        # Pipeline nodes delegate compute to their inner nodes, so a missing
        # compute on the node itself is not an error.
        return True

    @classmethod
    def _get_skip_fields_in_schema_validation(cls) -> List[str]:
        # pipeline component must be a file reference when loading from yaml,
        # so the created object can't pass schema validation.
        return ["component"]

    @classmethod
    def _attr_type_map(cls) -> dict:
        """Map of init attribute name to allowed types, consumed by validate_attribute_type.

        :return: Mapping from attribute name to a tuple of accepted types.
        :rtype: dict
        """
        # Use local import to avoid recursive reference as BaseNode is imported in PipelineComponent.
        from azure.ai.ml.entities import PipelineComponent

        return {
            "component": (str, PipelineComponent),
        }

    def _to_job(self) -> "PipelineJob":
        """Convert this node to a standalone PipelineJob carrying over its settings.

        :return: The pipeline job built from this node's fields.
        :rtype: ~azure.ai.ml.entities.PipelineJob
        """
        from azure.ai.ml.entities._job.pipeline.pipeline_job import PipelineJob

        return PipelineJob(
            name=self.name,
            display_name=self.display_name,
            description=self.description,
            tags=self.tags,
            properties=self.properties,
            # Filter None out to avoid case below failed with conflict keys check:
            # group: None (user not specified)
            # group.xx: 1 (user specified)
            # Use an explicit ``is not None`` test (consistent with _build_inputs)
            # so legitimate falsy values such as 0, False or "" are preserved.
            inputs={k: v for k, v in self._job_inputs.items() if v is not None},
            outputs=self._job_outputs,
            component=self.component,
            settings=self.settings,
        )

    def _customized_validate(self) -> MutableValidationResult:
        """Check unsupported settings when use as a node.

        :return: The validation result
        :rtype: MutableValidationResult
        """
        # Note: settings is not supported on node,
        # jobs.create_or_update(node) will call node._to_job() at first,
        # thus won't reach here.
        # pylint: disable=protected-access
        from azure.ai.ml.entities import PipelineComponent

        validation_result = super(Pipeline, self)._customized_validate()
        # Warn (not error) about keys that are silently ignored when this
        # pipeline is used as a node instead of a root job.
        ignored_keys = PipelineComponent._check_ignored_keys(self)
        if ignored_keys:
            validation_result.append_warning(message=f"{ignored_keys} ignored on node {self.name!r}.")
        # Recursively validate the inner component when it is an in-memory object
        # (a string component is just an ARM id and has nothing to validate here).
        if isinstance(self.component, PipelineComponent):
            validation_result.merge_with(self.component._customized_validate())
        return validation_result

    def _to_rest_object(self, **kwargs: Any) -> dict:
        """Build the REST payload for this node, adding the resolved componentId.

        :return: REST representation of the node.
        :rtype: dict
        """
        rest_obj: Dict = super()._to_rest_object(**kwargs)
        rest_obj.update(
            convert_ordered_dict_to_dict(
                {
                    "componentId": self._get_component_id(),
                }
            )
        )
        return rest_obj

    def _build_inputs(self) -> Dict:
        """Build node inputs, dropping entries the user did not specify.

        :return: Mapping of input name to specified value.
        :rtype: Dict
        """
        inputs = super(Pipeline, self)._build_inputs()
        built_inputs = {}
        # Validate and remove non-specified inputs
        for key, value in inputs.items():
            if value is not None:
                built_inputs[key] = value
        return built_inputs

    @classmethod
    def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
        """Create the marshmallow schema used for node validation.

        :param context: Schema context (base path etc.).
        :type context: Any
        :return: Schema instance for this node type.
        :rtype: Union[PathAwareSchema, Schema]
        """
        from azure.ai.ml._schema.pipeline.pipeline_component import PipelineSchema

        return PipelineSchema(context=context)

    def _copy_pipeline_component_out_setting_to_node(self) -> None:
        """Copy pipeline component output's setting to node level."""
        from azure.ai.ml.entities import PipelineComponent
        from azure.ai.ml.entities._job.pipeline._io import NodeOutput

        # Only an in-memory PipelineComponent carries output settings to copy;
        # a string component reference is resolved service-side.
        if not isinstance(self.component, PipelineComponent):
            return
        for key, val in self.component.outputs.items():
            node_output = cast(NodeOutput, self.outputs.get(key))
            copy_output_setting(source=val, target=node_output)

    @classmethod
    def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "Job":
        # Pipeline nodes are never loaded directly from a job dict; loading goes
        # through the pipeline job / component schemas instead.
        raise NotImplementedError()