# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import os
from typing import Any, Dict, List, Optional, Union
from marshmallow import Schema
from azure.ai.ml._schema.component.spark_component import SparkComponentSchema
from azure.ai.ml.constants._common import COMPONENT_TYPE
from azure.ai.ml.constants._component import NodeType
from azure.ai.ml.constants._job.job import RestSparkConfKey
from azure.ai.ml.entities._assets import Environment
from azure.ai.ml.entities._job.parameterized_spark import ParameterizedSpark
from ..._schema import PathAwareSchema
from .._job.spark_job_entry_mixin import SparkJobEntry, SparkJobEntryMixin
from .._util import convert_ordered_dict_to_dict, validate_attribute_type
from .._validation import MutableValidationResult
from ._additional_includes import AdditionalIncludesMixin
from .component import Component
class SparkComponent(
Component, ParameterizedSpark, SparkJobEntryMixin, AdditionalIncludesMixin
): # pylint: disable=too-many-instance-attributes
"""Spark component version, used to define a Spark Component or Job.
:keyword code: The source code to run the job. Can be a local path or an "http:", "https:", or "azureml:" URL pointing
to a remote location. Defaults to ".", indicating the current directory.
:paramtype code: Union[str, os.PathLike]
:keyword entry: The file or class entry point.
:paramtype entry: Optional[Union[dict[str, str], ~azure.ai.ml.entities.SparkJobEntry]]
:keyword py_files: The list of .zip, .egg or .py files to place on the PYTHONPATH for Python apps. Defaults to None.
:paramtype py_files: Optional[List[str]]
:keyword jars: The list of .JAR files to include on the driver and executor classpaths. Defaults to None.
:paramtype jars: Optional[List[str]]
:keyword files: The list of files to be placed in the working directory of each executor. Defaults to None.
:paramtype files: Optional[List[str]]
:keyword archives: The list of archives to be extracted into the working directory of each executor.
Defaults to None.
:paramtype archives: Optional[List[str]]
:keyword driver_cores: The number of cores to use for the driver process, only in cluster mode.
:paramtype driver_cores: Optional[int]
:keyword driver_memory: The amount of memory to use for the driver process, formatted as a string with a size unit
suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
:paramtype driver_memory: Optional[str]
:keyword executor_cores: The number of cores to use on each executor.
:paramtype executor_cores: Optional[int]
:keyword executor_memory: The amount of memory to use per executor process, formatted as a string with a size unit
suffix ("k", "m", "g" or "t") (e.g. "512m", "2g").
:paramtype executor_memory: Optional[str]
:keyword executor_instances: The initial number of executors.
:paramtype executor_instances: Optional[int]
:keyword dynamic_allocation_enabled: Whether to use dynamic resource allocation, which scales the number of
executors registered with this application up and down based on the workload. Defaults to False.
:paramtype dynamic_allocation_enabled: Optional[bool]
:keyword dynamic_allocation_min_executors: The lower bound for the number of executors if dynamic allocation is
enabled.
:paramtype dynamic_allocation_min_executors: Optional[int]
:keyword dynamic_allocation_max_executors: The upper bound for the number of executors if dynamic allocation is
enabled.
:paramtype dynamic_allocation_max_executors: Optional[int]
:keyword conf: A dictionary of pre-defined Spark configuration keys and values. Defaults to None.
:paramtype conf: Optional[dict[str, str]]
:keyword environment: The Azure ML environment to run the job in.
:paramtype environment: Optional[Union[str, ~azure.ai.ml.entities.Environment]]
:keyword inputs: A mapping of input names to input data sources used in the job. Defaults to None.
:paramtype inputs: Optional[dict[str, Union[
~azure.ai.ml.entities._job.pipeline._io.NodeOutput,
~azure.ai.ml.Input,
str,
bool,
int,
float,
Enum,
]]]
:keyword outputs: A mapping of output names to output data sources used in the job. Defaults to None.
:paramtype outputs: Optional[dict[str, Union[str, ~azure.ai.ml.Output]]]
:keyword args: The arguments for the job. Defaults to None.
:paramtype args: Optional[str]
:keyword additional_includes: A list of shared additional files to be included in the component. Defaults to None.
:paramtype additional_includes: Optional[List[str]]
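
A minimal construction sketch (every name, path, and value below is an illustrative placeholder, not taken
from the referenced sample):

.. code-block:: python

    from azure.ai.ml import Input
    from azure.ai.ml.entities import SparkComponent

    # All values are illustrative placeholders.
    my_component = SparkComponent(
        name="my_spark_component",
        display_name="My Spark component",
        code="./src",
        entry={"file": "entry.py"},
        driver_cores=1,
        driver_memory="2g",
        executor_cores=2,
        executor_memory="2g",
        executor_instances=2,
        inputs={"file_input": Input(path="./data", type="uri_folder", mode="direct")},
        args="--file_input ${{inputs.file_input}}",
    )
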
.. admonition:: Example:
.. literalinclude:: ../samples/ml_samples_spark_configurations.py
:start-after: [START spark_component_definition]
:end-before: [END spark_component_definition]
:language: python
:dedent: 8
:caption: Creating SparkComponent.
"""
def __init__(
self,
*,
code: Optional[Union[str, os.PathLike]] = ".",
entry: Optional[Union[Dict[str, str], SparkJobEntry]] = None,
py_files: Optional[List[str]] = None,
jars: Optional[List[str]] = None,
files: Optional[List[str]] = None,
archives: Optional[List[str]] = None,
driver_cores: Optional[Union[int, str]] = None,
driver_memory: Optional[str] = None,
executor_cores: Optional[Union[int, str]] = None,
executor_memory: Optional[str] = None,
executor_instances: Optional[Union[int, str]] = None,
dynamic_allocation_enabled: Optional[Union[bool, str]] = None,
dynamic_allocation_min_executors: Optional[Union[int, str]] = None,
dynamic_allocation_max_executors: Optional[Union[int, str]] = None,
conf: Optional[Dict[str, str]] = None,
environment: Optional[Union[str, Environment]] = None,
inputs: Optional[Dict] = None,
outputs: Optional[Dict] = None,
args: Optional[str] = None,
additional_includes: Optional[List] = None,
**kwargs: Any,
) -> None:
# Validate that the init params are of the expected types
validate_attribute_type(attrs_to_check=locals(), attr_type_map=self._attr_type_map())
kwargs[COMPONENT_TYPE] = NodeType.SPARK
super().__init__(
inputs=inputs,
outputs=outputs,
**kwargs,
)
self.code: Optional[Union[str, os.PathLike]] = code
self.entry = entry
self.py_files = py_files
self.jars = jars
self.files = files
self.archives = archives
self.conf = conf
self.environment = environment
self.args = args
self.additional_includes = additional_includes or []
# For a pipeline Spark job, we also allow the user to set driver_cores, driver_memory, and so on through conf.
# If the root-level fields are not set by the user, we promote the conf settings to the root level to facilitate
# subsequent validation. This usually happens when to_component(SparkJob) or the builder function spark() is used
# as a node in the pipeline SDK.
conf = conf or {}
self.driver_cores = driver_cores or conf.get(RestSparkConfKey.DRIVER_CORES, None)
self.driver_memory = driver_memory or conf.get(RestSparkConfKey.DRIVER_MEMORY, None)
self.executor_cores = executor_cores or conf.get(RestSparkConfKey.EXECUTOR_CORES, None)
self.executor_memory = executor_memory or conf.get(RestSparkConfKey.EXECUTOR_MEMORY, None)
self.executor_instances = executor_instances or conf.get(RestSparkConfKey.EXECUTOR_INSTANCES, None)
self.dynamic_allocation_enabled = dynamic_allocation_enabled or conf.get(
RestSparkConfKey.DYNAMIC_ALLOCATION_ENABLED, None
)
self.dynamic_allocation_min_executors = dynamic_allocation_min_executors or conf.get(
RestSparkConfKey.DYNAMIC_ALLOCATION_MIN_EXECUTORS, None
)
self.dynamic_allocation_max_executors = dynamic_allocation_max_executors or conf.get(
RestSparkConfKey.DYNAMIC_ALLOCATION_MAX_EXECUTORS, None
)
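# Illustrative example of the promotion above (the conf key name is an assumption; the real keys come from
# RestSparkConfKey): with driver_cores=None and conf={"spark.driver.cores": "2"}, self.driver_cores becomes "2",
# whereas an explicitly passed driver_cores always takes precedence over the corresponding conf entry.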
@classmethod
def _create_schema_for_validation(cls, context: Any) -> Union[PathAwareSchema, Schema]:
return SparkComponentSchema(context=context)
@classmethod
def _attr_type_map(cls) -> dict:
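# Attribute names mapped to the types they may hold; consumed by validate_attribute_type() in __init__.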
return {
"environment": (str, Environment),
"code": (str, os.PathLike),
}
def _customized_validate(self) -> MutableValidationResult:
validation_result = super()._customized_validate()
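# Additionally check whether the origin code is reliable for local-path validation and append any diagnostics.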
self._append_diagnostics_and_check_if_origin_code_reliable_for_local_path_validation(validation_result)
return validation_result
def _to_dict(self) -> Dict:
# TODO: Bug Item number: 2897665
res: Dict = convert_ordered_dict_to_dict( # type: ignore
{**self._other_parameter, **super(SparkComponent, self)._to_dict()}
)
return res
def _to_ordered_dict_for_yaml_dump(self) -> Dict:
"""Dump the component content into a sorted yaml string.
:return: The ordered dict
:rtype: Dict
"""
obj: dict = super()._to_ordered_dict_for_yaml_dump()
# The dict dumped based on the schema converts code to an absolute path, while we want to keep its original value
if self.code and isinstance(self.code, str):
obj["code"] = self.code
return obj
def _get_environment_id(self) -> Union[str, None]:
# Return the environment id, handling the case where the environment is defined inline.
if isinstance(self.environment, Environment):
res: Optional[str] = self.environment.id
return res
return self.environment
def __str__(self) -> str:
    try:
        return self._to_yaml()
    except BaseException:  # pylint: disable=W0718
        return super(SparkComponent, self).__str__()