# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=protected-access
import logging
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from marshmallow import Schema
from azure.ai.ml._restclient.v2022_10_01_preview.models import JobBase
from azure.ai.ml._schema.job.data_transfer_job import (
DataTransferCopyJobSchema,
DataTransferExportJobSchema,
DataTransferImportJobSchema,
)
from azure.ai.ml._utils._experimental import experimental
from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AssetTypes
from azure.ai.ml.constants._component import DataTransferTaskType, ExternalDataType, NodeType
from azure.ai.ml.entities._component.component import Component
from azure.ai.ml.entities._component.datatransfer_component import (
DataTransferComponent,
DataTransferCopyComponent,
DataTransferExportComponent,
DataTransferImportComponent,
)
from azure.ai.ml.entities._inputs_outputs import Input, Output
from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
from azure.ai.ml.entities._job.data_transfer.data_transfer_job import (
DataTransferCopyJob,
DataTransferExportJob,
DataTransferImportJob,
)
from azure.ai.ml.entities._validation.core import MutableValidationResult
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
from ..._schema import PathAwareSchema
from .._job.pipeline._io import NodeOutput
from .._util import convert_ordered_dict_to_dict, load_from_dict, validate_attribute_type
from .base_node import BaseNode
module_logger = logging.getLogger(__name__)
def _build_source_sink(io_dict: Optional[Union[Dict, Database, FileSystem]]) -> Optional[Union[Database, FileSystem]]:
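    """Build a Database or FileSystem object from a source/sink specification.

    Accepts None (returned as-is), an existing Database or FileSystem instance, or a
    dict whose "type" key is ExternalDataType.DATABASE or ExternalDataType.FILE_SYSTEM;
    any other value raises a ValidationException.
    """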
if io_dict is None:
return io_dict
if isinstance(io_dict, (Database, FileSystem)):
component_io = io_dict
else:
if isinstance(io_dict, dict):
data_type = io_dict.pop("type", None)
if data_type == ExternalDataType.DATABASE:
component_io = Database(**io_dict)
elif data_type == ExternalDataType.FILE_SYSTEM:
component_io = FileSystem(**io_dict)
else:
msg = "Type in source or sink only support {} and {}, currently got {}."
raise ValidationException(
message=msg.format(
ExternalDataType.DATABASE,
ExternalDataType.FILE_SYSTEM,
data_type,
),
no_personal_data_message=msg.format(
ExternalDataType.DATABASE,
ExternalDataType.FILE_SYSTEM,
"data_type",
),
target=ErrorTarget.DATA_TRANSFER_JOB,
error_category=ErrorCategory.USER_ERROR,
error_type=ValidationErrorType.INVALID_VALUE,
)
else:
msg = "Source or sink only support dict, Database and FileSystem"
raise ValidationException(
message=msg,
no_personal_data_message=msg,
target=ErrorTarget.DATA_TRANSFER_JOB,
error_category=ErrorCategory.USER_ERROR,
error_type=ValidationErrorType.INVALID_VALUE,
)
return component_io
class DataTransfer(BaseNode):
"""Base class for data transfer node, used for data transfer component version consumption.
You should not instantiate this class directly.
"""
def __init__(
self,
*,
        component: Union[str, DataTransferCopyComponent, DataTransferImportComponent, DataTransferExportComponent],
compute: Optional[str] = None,
inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
outputs: Optional[Dict[str, Union[str, Output]]] = None,
**kwargs: Any,
):
        # drop any caller-supplied "type"; the node type is fixed to NodeType.DATA_TRANSFER below
kwargs.pop("type", None)
super().__init__(
type=NodeType.DATA_TRANSFER,
inputs=inputs,
outputs=outputs,
component=component,
compute=compute,
**kwargs,
)
@property
def component(self) -> Union[str, DataTransferComponent]:
res: Union[str, DataTransferComponent] = self._component
return res
@classmethod
def _load_from_rest_job(cls, obj: JobBase) -> "DataTransfer":
        # TODO: needs REST API update
        raise NotImplementedError("Submitting a standalone DataTransfer job is not supported yet")
@classmethod
def _get_supported_outputs_types(cls) -> Tuple:
return str, Output
def _build_inputs(self) -> Dict:
inputs = super(DataTransfer, self)._build_inputs()
built_inputs = {}
        # Drop inputs that were not given a value
for key, value in inputs.items():
if value is not None:
built_inputs[key] = value
return built_inputs
@experimental
class DataTransferCopy(DataTransfer):
"""Base class for data transfer copy node.
You should not instantiate this class directly. Instead, you should
    create it from the builder function: copy_data.
:param component: Id or instance of the data transfer component/job to be run for the step
:type component: DataTransferCopyComponent
:param inputs: Inputs to the data transfer.
:type inputs: Dict[str, Union[NodeOutput, Input, str]]
:param outputs: Mapping of output data bindings used in the job.
:type outputs: Dict[str, Union[str, Output, dict]]
:param name: Name of the data transfer.
:type name: str
:param description: Description of the data transfer.
:type description: str
:param tags: Tag dictionary. Tags can be added, removed, and updated.
:type tags: dict[str, str]
:param display_name: Display name of the job.
:type display_name: str
    :param experiment_name: Name of the experiment the job will be created under.
        If None is provided, the default will be set to the current directory name.
:type experiment_name: str
:param compute: The compute target the job runs on.
:type compute: str
    :param data_copy_mode: The data copy mode used in the copy task. Possible values are
        "merge_with_overwrite" and "fail_if_conflict".
:type data_copy_mode: str
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferCopy cannot be successfully validated.
Details will be provided in the error message.
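
    A minimal usage sketch (the ``azure.ai.ml.data_transfer`` import path and the
    placeholder names and paths below are illustrative assumptions, not values defined
    by this module):

    .. code-block:: python

        from azure.ai.ml import Input, Output
        from azure.ai.ml.data_transfer import copy_data

        # Copy a folder between datastores, overwriting files on conflict.
        copy_node = copy_data(
            name="copy_files",
            compute="adf_compute",
            inputs={"folder1": Input(path="azureml://datastores/my_blob/paths/source/")},
            outputs={"output_folder": Output(type="uri_folder")},
            data_copy_mode="merge_with_overwrite",
        )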
"""
def __init__(
self,
*,
component: Union[str, DataTransferCopyComponent],
compute: Optional[str] = None,
inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
outputs: Optional[Dict[str, Union[str, Output]]] = None,
data_copy_mode: Optional[str] = None,
**kwargs: Any,
):
        # validate that init params are of valid types
validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
super().__init__(
inputs=inputs,
outputs=outputs,
component=component,
compute=compute,
**kwargs,
)
# init mark for _AttrDict
self._init = True
self.task = DataTransferTaskType.COPY_DATA
self.data_copy_mode = data_copy_mode
is_component = isinstance(component, DataTransferCopyComponent)
if is_component:
_component: DataTransferCopyComponent = cast(DataTransferCopyComponent, component)
self.task = _component.task or self.task
self.data_copy_mode = _component.data_copy_mode or self.data_copy_mode
self._init = False
@classmethod
def _attr_type_map(cls) -> dict:
return {
"component": (str, DataTransferCopyComponent),
}
@classmethod
def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
from azure.ai.ml._schema.pipeline import DataTransferCopySchema
return DataTransferCopySchema(context=context)
@classmethod
def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
return ["type", "task", "data_copy_mode"]
def _to_rest_object(self, **kwargs: Any) -> dict:
rest_obj = super()._to_rest_object(**kwargs)
for key, value in {
"componentId": self._get_component_id(),
"data_copy_mode": self.data_copy_mode,
}.items():
if value is not None:
rest_obj[key] = value
return cast(dict, convert_ordered_dict_to_dict(rest_obj))
@classmethod
def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> Any:
from .data_transfer_func import copy_data
loaded_data = load_from_dict(DataTransferCopyJobSchema, data, context, additional_message, **kwargs)
data_transfer_job = copy_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
return data_transfer_job
def _to_job(self) -> DataTransferCopyJob:
return DataTransferCopyJob(
experiment_name=self.experiment_name,
name=self.name,
display_name=self.display_name,
description=self.description,
tags=self.tags,
status=self.status,
inputs=self._job_inputs,
outputs=self._job_outputs,
services=self.services,
compute=self.compute,
data_copy_mode=self.data_copy_mode,
)
# pylint: disable-next=docstring-missing-param
def __call__(self, *args: Any, **kwargs: Any) -> "DataTransferCopy":
"""Call DataTransferCopy as a function will return a new instance each time.
:return: A DataTransferCopy node
:rtype: DataTransferCopy
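
        A minimal sketch (``copy_node`` and the ``folder1`` input name are illustrative
        placeholders, not values defined by this module):

        .. code-block:: python

            # Calling the node builds a fresh node from the same component; inputs
            # not overridden here are carried over from copy_node.
            new_node = copy_node(folder1=Input(path="azureml://datastores/my_blob/paths/other/"))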
"""
if isinstance(self._component, Component):
# call this to validate inputs
node: DataTransferCopy = self._component(*args, **kwargs)
# merge inputs
for name, original_input in self.inputs.items():
if name not in kwargs:
# use setattr here to make sure owner of input won't change
setattr(node.inputs, name, original_input._data)
node._job_inputs[name] = original_input._data
# get outputs
for name, original_output in self.outputs.items():
                # use setattr here to make sure owner of output won't change
if not isinstance(original_output, str):
setattr(node.outputs, name, original_output._data)
self._refine_optional_inputs_with_no_value(node, kwargs)
            # carry over default values from this node: name, compute, tags, display_name
node._name = self.name
node.compute = self.compute
node.tags = self.tags
# Pass through the display name only if the display name is not system generated.
node.display_name = self.display_name if self.display_name != self.name else None
return node
msg = "copy_data can be called as a function only when referenced component is {}, currently got {}."
raise ValidationException(
message=msg.format(type(Component), self._component),
no_personal_data_message=msg.format(type(Component), "self._component"),
target=ErrorTarget.DATA_TRANSFER_JOB,
error_type=ValidationErrorType.INVALID_VALUE,
)
@experimental
class DataTransferImport(DataTransfer):
"""Base class for data transfer import node.
You should not instantiate this class directly. Instead, you should
    create it from the builder function: import_data.
    :param component: Id of the data transfer built-in component to be run for the step
:type component: str
    :param source: The data source, either a file system or a database.
:type source: Union[Dict, Database, FileSystem]
:param outputs: Mapping of output data bindings used in the job.
:type outputs: Dict[str, Union[str, Output, dict]]
:param name: Name of the data transfer.
:type name: str
:param description: Description of the data transfer.
:type description: str
:param tags: Tag dictionary. Tags can be added, removed, and updated.
:type tags: dict[str, str]
:param display_name: Display name of the job.
:type display_name: str
    :param experiment_name: Name of the experiment the job will be created under.
        If None is provided, the default will be set to the current directory name.
:type experiment_name: str
:param compute: The compute target the job runs on.
:type compute: str
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferImport cannot be successfully validated.
Details will be provided in the error message.
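
    A minimal usage sketch (the ``azure.ai.ml.data_transfer`` import path, connection
    name, and query below are illustrative assumptions, not values defined by this
    module):

    .. code-block:: python

        from azure.ai.ml import Output
        from azure.ai.ml.data_transfer import Database, import_data

        # A database source requires a single output named "sink" of type mltable.
        import_node = import_data(
            source=Database(query="SELECT * FROM my_table", connection="azureml:my_sql_connection"),
            outputs={"sink": Output(type="mltable")},
        )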
"""
def __init__(
self,
*,
component: Union[str, DataTransferImportComponent],
compute: Optional[str] = None,
source: Optional[Union[Dict, Database, FileSystem]] = None,
outputs: Optional[Dict[str, Union[str, Output]]] = None,
**kwargs: Any,
):
        # validate that init params are of valid types
validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
super(DataTransferImport, self).__init__(
component=component,
outputs=outputs,
compute=compute,
**kwargs,
)
# init mark for _AttrDict
self._init = True
self.task = DataTransferTaskType.IMPORT_DATA
is_component = isinstance(component, DataTransferImportComponent)
if is_component:
_component: DataTransferImportComponent = cast(DataTransferImportComponent, component)
self.task = _component.task or self.task
self.source = _build_source_sink(source)
self._init = False
@classmethod
def _attr_type_map(cls) -> dict:
return {
"component": (str, DataTransferImportComponent),
}
@classmethod
def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
from azure.ai.ml._schema.pipeline import DataTransferImportSchema
return DataTransferImportSchema(context=context)
@classmethod
def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
return ["type", "task", "source"]
def _customized_validate(self) -> MutableValidationResult:
result = super()._customized_validate()
if self.source is None:
result.append_error(
yaml_path="source",
message="Source is a required field for import data task in DataTransfer job",
)
if len(self.outputs) != 1 or list(self.outputs.keys())[0] != "sink":
result.append_error(
yaml_path="outputs.sink",
message="Outputs field only support one output called sink in import task",
)
if (
"sink" in self.outputs
and not isinstance(self.outputs["sink"], str)
and isinstance(self.outputs["sink"]._data, Output)
):
sink_output = self.outputs["sink"]._data
if self.source is not None:
if (self.source.type == ExternalDataType.DATABASE and sink_output.type != AssetTypes.MLTABLE) or (
self.source.type == ExternalDataType.FILE_SYSTEM and sink_output.type != AssetTypes.URI_FOLDER
):
result.append_error(
yaml_path="outputs.sink.type",
message="Outputs field only support type {} for {} and {} for {}".format(
AssetTypes.MLTABLE,
ExternalDataType.DATABASE,
AssetTypes.URI_FOLDER,
ExternalDataType.FILE_SYSTEM,
),
)
return result
def _to_rest_object(self, **kwargs: Any) -> dict:
rest_obj = super()._to_rest_object(**kwargs)
for key, value in {
"componentId": self._get_component_id(),
}.items():
if value is not None:
rest_obj[key] = value
return cast(dict, convert_ordered_dict_to_dict(rest_obj))
@classmethod
def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferImport":
from .data_transfer_func import import_data
loaded_data = load_from_dict(DataTransferImportJobSchema, data, context, additional_message, **kwargs)
data_transfer_job: DataTransferImport = import_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
return data_transfer_job
def _to_job(self) -> DataTransferImportJob:
return DataTransferImportJob(
experiment_name=self.experiment_name,
name=self.name,
display_name=self.display_name,
description=self.description,
tags=self.tags,
status=self.status,
source=self.source,
outputs=self._job_outputs,
services=self.services,
compute=self.compute,
)
@experimental
class DataTransferExport(DataTransfer):
"""Base class for data transfer export node.
You should not instantiate this class directly. Instead, you should
    create it from the builder function: export_data.
    :param component: Id of the data transfer built-in component to be run for the step
:type component: str
    :param sink: The external data sink, either a database or a file system.
:type sink: Union[Dict, Database, FileSystem]
:param inputs: Mapping of input data bindings used in the job.
    :type inputs: Dict[str, Union[NodeOutput, Input, str]]
:param name: Name of the data transfer.
:type name: str
:param description: Description of the data transfer.
:type description: str
:param tags: Tag dictionary. Tags can be added, removed, and updated.
:type tags: dict[str, str]
:param display_name: Display name of the job.
:type display_name: str
    :param experiment_name: Name of the experiment the job will be created under.
        If None is provided, the default will be set to the current directory name.
:type experiment_name: str
:param compute: The compute target the job runs on.
:type compute: str
:raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferExport cannot be successfully validated.
Details will be provided in the error message.
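
    A minimal usage sketch (the ``azure.ai.ml.data_transfer`` import path, connection
    name, and table name below are illustrative assumptions, not values defined by
    this module):

    .. code-block:: python

        from azure.ai.ml import Input
        from azure.ai.ml.data_transfer import Database, export_data

        # A database sink requires a single input named "source" of type uri_file.
        export_node = export_data(
            inputs={"source": Input(type="uri_file", path="azureml://datastores/my_blob/paths/data.csv")},
            sink=Database(table_name="dbo.my_table", connection="azureml:my_sql_connection"),
        )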
"""
def __init__(
self,
*,
        component: Union[str, DataTransferExportComponent],
compute: Optional[str] = None,
sink: Optional[Union[Dict, Database, FileSystem]] = None,
inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
**kwargs: Any,
):
        # validate that init params are of valid types
validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
super(DataTransferExport, self).__init__(
component=component,
inputs=inputs,
compute=compute,
**kwargs,
)
# init mark for _AttrDict
self._init = True
self.task = DataTransferTaskType.EXPORT_DATA
is_component = isinstance(component, DataTransferExportComponent)
if is_component:
_component: DataTransferExportComponent = cast(DataTransferExportComponent, component)
self.task = _component.task or self.task
self.sink = sink
self._init = False
@property
def sink(self) -> Optional[Union[Dict, Database, FileSystem]]:
"""The sink of external data and databases.
:return: The sink of external data and databases.
:rtype: Union[None, Database, FileSystem]
"""
return self._sink
@sink.setter
def sink(self, value: Union[Dict, Database, FileSystem]) -> None:
self._sink = _build_source_sink(value)
@classmethod
def _attr_type_map(cls) -> dict:
return {
"component": (str, DataTransferExportComponent),
}
@classmethod
def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
from azure.ai.ml._schema.pipeline import DataTransferExportSchema
return DataTransferExportSchema(context=context)
@classmethod
def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:
return ["type", "task", "sink"]
def _customized_validate(self) -> MutableValidationResult:
result = super()._customized_validate()
if self.sink is None:
result.append_error(
yaml_path="sink",
message="Sink is a required field for export data task in DataTransfer job",
)
if len(self.inputs) != 1 or list(self.inputs.keys())[0] != "source":
result.append_error(
yaml_path="inputs.source",
message="Inputs field only support one input called source in export task",
)
if "source" in self.inputs and isinstance(self.inputs["source"]._data, Input):
source_input = self.inputs["source"]._data
if self.sink is not None and not isinstance(self.sink, Dict):
if (self.sink.type == ExternalDataType.DATABASE and source_input.type != AssetTypes.URI_FILE) or (
self.sink.type == ExternalDataType.FILE_SYSTEM and source_input.type != AssetTypes.URI_FOLDER
):
result.append_error(
yaml_path="inputs.source.type",
message="Inputs field only support type {} for {} and {} for {}".format(
AssetTypes.URI_FILE,
ExternalDataType.DATABASE,
AssetTypes.URI_FOLDER,
ExternalDataType.FILE_SYSTEM,
),
)
return result
def _to_rest_object(self, **kwargs: Any) -> dict:
rest_obj = super()._to_rest_object(**kwargs)
for key, value in {
"componentId": self._get_component_id(),
}.items():
if value is not None:
rest_obj[key] = value
return cast(dict, convert_ordered_dict_to_dict(rest_obj))
@classmethod
def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs: Any) -> "DataTransferExport":
from .data_transfer_func import export_data
loaded_data = load_from_dict(DataTransferExportJobSchema, data, context, additional_message, **kwargs)
data_transfer_job: DataTransferExport = export_data(base_path=context[BASE_PATH_CONTEXT_KEY], **loaded_data)
return data_transfer_job
def _to_job(self) -> DataTransferExportJob:
return DataTransferExportJob(
experiment_name=self.experiment_name,
name=self.name,
display_name=self.display_name,
description=self.description,
tags=self.tags,
status=self.status,
sink=self.sink,
inputs=self._job_inputs,
services=self.services,
compute=self.compute,
)