diff --git a/kedro_mlflow/framework/hooks/mlflow_hook.py b/kedro_mlflow/framework/hooks/mlflow_hook.py
index 41b45011..21abf776 100644
--- a/kedro_mlflow/framework/hooks/mlflow_hook.py
+++ b/kedro_mlflow/framework/hooks/mlflow_hook.py
@@ -371,6 +371,7 @@ def after_pipeline_run(
                     pipeline=pipeline.inference,
                     catalog=catalog,
                     input_name=pipeline.input_name,
+                    params_input_name=pipeline.params_input_name,
                     **pipeline.kpm_kwargs,
                 )
                 artifacts = kedro_pipeline_model.extract_pipeline_artifacts(
diff --git a/kedro_mlflow/mlflow/kedro_pipeline_model.py b/kedro_mlflow/mlflow/kedro_pipeline_model.py
index 5c8d2ce7..0809175d 100644
--- a/kedro_mlflow/mlflow/kedro_pipeline_model.py
+++ b/kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -1,6 +1,6 @@
 import logging
 from pathlib import Path
-from typing import Dict, Optional, Union
+from typing import Dict, Optional, Union, Any
 
 from kedro.framework.hooks import _create_hook_manager
 from kedro.io import DataCatalog, MemoryDataset
@@ -20,6 +20,7 @@ def __init__(
         input_name: str,
         runner: Optional[AbstractRunner] = None,
         copy_mode: Optional[Union[Dict[str, str], str]] = "assign",
+        params_input_name: Optional[str] = None,
     ):
         """[summary]
 
@@ -30,6 +31,8 @@ def __init__(
             catalog (DataCatalog): The DataCatalog associated
                 to the PipelineMl
 
+            input_name (str): The name of the dataset in the catalog which the model's user must provide for prediction (i.e. the data).
+
             runner (Optional[AbstractRunner], optional): The kedro
                 AbstractRunner to use. Defaults to SequentialRunner if
                 None.
@@ -45,12 +48,16 @@ def __init__(
                 - a dictionary with (dataset name, copy_mode) key/values
                     pairs. The associated mode must be a valid kedro mode
                     ("deepcopy", "copy" and "assign") for each. Defaults to None.
+
+            params_input_name (Optional[str]): The name of the dataset which receives the `params` dictionary passed to `predict` at inference time. Defaults to None.
+
         """
 
         self.pipeline = (
             pipeline.inference if isinstance(pipeline, PipelineML) else pipeline
         )
         self.input_name = input_name
+        self.params_input_name = params_input_name
         self.initial_catalog = self._extract_pipeline_catalog(catalog)
 
         nb_outputs = len(self.pipeline.outputs())
@@ -107,7 +114,7 @@ def copy_mode(self, copy_mode):
     def _extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
         sub_catalog = DataCatalog()
         for dataset_name in self.pipeline.inputs():
-            if dataset_name == self.input_name:
+            if dataset_name in (self.input_name, self.params_input_name):
                 # there is no obligation that this dataset is persisted
                 # and even if it is, we keep only an ampty memory dataset to avoid
                 # extra uneccessary dependencies: this dataset will be replaced at
@@ -145,7 +152,7 @@ def extract_pipeline_artifacts(
     ):
         artifacts = {}
         for name, dataset in self.initial_catalog._datasets.items():
-            if name != self.input_name:
+            if name not in (self.input_name, self.params_input_name):
                 if name.startswith("params:"):
                     # we need to persist it locally for mlflow access
                     absolute_param_path = (
@@ -177,7 +184,9 @@ def load_context(self, context):
         # but we rely on a mlflow function for saving, and it is unaware of kedro
         # pipeline structure
         mlflow_artifacts_keys = set(context.artifacts.keys())
-        kedro_artifacts_keys = set(self.pipeline.inputs() - {self.input_name})
+        kedro_artifacts_keys = set(
+            self.pipeline.inputs() - {self.input_name, self.params_input_name}
+        )
         if mlflow_artifacts_keys != kedro_artifacts_keys:
             in_artifacts_but_not_inference = (
                 mlflow_artifacts_keys - kedro_artifacts_keys
@@ -196,7 +205,7 @@ def load_context(self, context):
                 updated_catalog._datasets[name]._filepath = Path(uri)
                 self.loaded_catalog.save(name=name, data=updated_catalog.load(name))
 
-    def predict(self, context, model_input):
+    def predict(self, context, model_input, params: Optional[Dict[str, Any]] = None):
         # we create an empty hook manager but do NOT register hooks
         # because we want this model be executable outside of a kedro project
         hook_manager = _create_hook_manager()
@@ -206,6 +215,9 @@ def predict(self, context, model_input):
             data=model_input,
         )
 
+        if self.params_input_name:
+            self.loaded_catalog.save(name=self.params_input_name, data=params)
+
         run_output = self.runner.run(
             pipeline=self.pipeline,
             catalog=self.loaded_catalog,
diff --git a/kedro_mlflow/pipeline/pipeline_ml.py b/kedro_mlflow/pipeline/pipeline_ml.py
index bdd6f807..df94fd26 100644
--- a/kedro_mlflow/pipeline/pipeline_ml.py
+++ b/kedro_mlflow/pipeline/pipeline_ml.py
@@ -52,6 +52,7 @@ def __init__(
         input_name: str,
         kpm_kwargs: Optional[Dict[str, str]] = None,
         log_model_kwargs: Optional[Dict[str, str]] = None,
+        params_input_name: Optional[str] = None,
     ):
         """Store all necessary information for calling mlflow.log_model
         in the pipeline.
@@ -65,9 +66,9 @@ def __init__(
                 stored in mlflow and use the output(s) of the training pipeline
                 (namely, the model) to predict the outcome.
 
-            input_name (str, optional): The name of the dataset in
+            input_name (str): The name of the dataset in
                 the catalog.yml which the model's user must provide
-                for prediction (i.e. the data). Defaults to None.
+                for prediction (i.e. the data).
             kpm_kwargs:
                 extra arguments to be passed to `KedroPipelineModel`
                 when the PipelineML object is automatically saved at the end of a run.
@@ -79,6 +80,7 @@ def __init__(
                 extra arguments to be passed to `mlflow.pyfunc.log_model`, e.g.:
                     - "signature" accepts an extra "auto" which automatically infer the signature
                     based on "input_name" dataset
+            params_input_name (str, optional): The name of the dataset (an input of the inference pipeline) which receives the `params` dictionary passed to the mlflow model's `predict` method at inference time. Defaults to None.
 
         """
 
@@ -86,6 +88,7 @@
 
         self.inference = inference
         self.input_name = input_name
+        self.params_input_name = params_input_name
 
         # they will be passed to KedroPipelineModel to enable flexibility
         kpm_kwargs = kpm_kwargs or {}
@@ -104,7 +107,7 @@ def training(self) -> Pipeline:
         return Pipeline(self.nodes)
 
     @property
-    def inference(self) -> str:
+    def inference(self) -> Pipeline:
         return self._inference
 
     @inference.setter
@@ -127,6 +130,22 @@ def input_name(self, name: str) -> None:
             )
         self._input_name = name
 
+    @property
+    def params_input_name(self) -> Optional[str]:
+        return self._params_input_name
+
+    @params_input_name.setter
+    def params_input_name(self, name: Optional[str]) -> None:
+        if name is not None:
+            allowed_names = self.inference.inputs()
+            pp_allowed_names = "\n - ".join(allowed_names)
+            if name not in allowed_names:
+                raise KedroMlflowPipelineMLError(
+                    f"params_input_name='{name}' but it must be an input of 'inference'"
+                    f", i.e. one of: \n - {pp_allowed_names}"
+                )
+        self._params_input_name = name
+
     def _check_inference(self, inference: Pipeline) -> None:
         nb_outputs = len(inference.outputs())
         outputs_txt = "\n - ".join(inference.outputs())
@@ -146,7 +165,7 @@ def _check_consistency(self) -> None:
 
         free_inputs_set = (
             self.inference.inputs()
-            - {self.input_name}
+            - {self.input_name, self.params_input_name}
             - self.all_outputs()
             - self.inputs()
             - inference_parameters  # it is allowed to pass parameters: they will be automatically persisted by the hook
@@ -160,7 +179,7 @@ def _check_consistency(self) -> None:
                 " \nNo free input is allowed."
                 " Please make sure that 'inference.inputs()' are all"
                 " in 'training.all_outputs() + training.inputs()'"
-                "except 'input_name' and parameters which starts with 'params:'."
+                " except 'input_name', 'params_input_name' and parameters which start with 'params:'."
             )
 
     def _turn_pipeline_to_ml(self, pipeline: Pipeline):
@@ -170,6 +189,7 @@ def _turn_pipeline_to_ml(self, pipeline: Pipeline):
             input_name=self.input_name,
             kpm_kwargs=self.kpm_kwargs,
             log_model_kwargs=self.log_model_kwargs,
+            params_input_name=self.params_input_name,
         )
 
     def only_nodes(self, *node_names: str) -> "Pipeline":  # pragma: no cover
@@ -225,13 +245,13 @@ def tag(self, tags: Union[str, Iterable[str]]) -> "PipelineML":
 
     def filter(
         self,
-        tags: Iterable[str] = None,
-        from_nodes: Iterable[str] = None,
-        to_nodes: Iterable[str] = None,
-        node_names: Iterable[str] = None,
-        from_inputs: Iterable[str] = None,
-        to_outputs: Iterable[str] = None,
-        node_namespace: str = None,
+        tags: Optional[Iterable[str]] = None,
+        from_nodes: Optional[Iterable[str]] = None,
+        to_nodes: Optional[Iterable[str]] = None,
+        node_names: Optional[Iterable[str]] = None,
+        from_inputs: Optional[Iterable[str]] = None,
+        to_outputs: Optional[Iterable[str]] = None,
+        node_namespace: Optional[str] = None,
     ) -> "Pipeline":
         # see from_inputs for an explanation of why we don't call super()
         pipeline = self.training.filter(
diff --git a/kedro_mlflow/pipeline/pipeline_ml_factory.py b/kedro_mlflow/pipeline/pipeline_ml_factory.py
index 6ea2efac..a8edbfd5 100644
--- a/kedro_mlflow/pipeline/pipeline_ml_factory.py
+++ b/kedro_mlflow/pipeline/pipeline_ml_factory.py
@@ -1,14 +1,16 @@
 from kedro.pipeline import Pipeline
 
 from kedro_mlflow.pipeline.pipeline_ml import PipelineML
+from typing import Optional
 
 
 def pipeline_ml_factory(
     training: Pipeline,
     inference: Pipeline,
-    input_name: str = None,
+    input_name: str,
     kpm_kwargs=None,
     log_model_kwargs=None,
+    params_input_name: Optional[str] = None,
 ) -> PipelineML:
     """This function is a helper to create `PipelineML`
     object directly from two Kedro `Pipelines` (one of
@@ -23,9 +25,9 @@ def pipeline_ml_factory(
            stored in mlflow and use the output(s) of the training pipeline
            (namely, the model) to predict the outcome.
 
-        input_name (str, optional): The name of the dataset in
+        input_name (str): The name of the dataset in
            the catalog.yml which the model's user must provide
-            for prediction (i.e. the data). Defaults to None.
+            for prediction (i.e. the data).
        kpm_kwargs:
            extra arguments to be passed to `KedroPipelineModel`
            when the PipelineML object is automatically saved at the end of a run.
@@ -37,6 +39,7 @@ def pipeline_ml_factory(
            extra arguments to be passed to `mlflow.pyfunc.log_model`
            when the PipelineML object is automatically saved at the end of a run.
            See mlflow documentation to see all available options: https://www.mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.log_model
+        params_input_name (str, optional): The name of the dataset (an input of the inference pipeline) which receives the `params` dictionary passed to the mlflow model's `predict` method at inference time. Defaults to None.
 
     Returns:
         PipelineML: A `PipelineML` which is automatically
@@ -51,5 +54,6 @@
         input_name=input_name,
         kpm_kwargs=kpm_kwargs,
         log_model_kwargs=log_model_kwargs,
+        params_input_name=params_input_name,
     )
     return pipeline
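
Below is a minimal, hypothetical usage sketch (not part of the patch) of how the new `params_input_name` argument is intended to be wired end to end. Every name in it is an illustrative assumption: the dataset names ("training_data", "instances", "predict_params", "predictions"), the node functions, the run id placeholder and the "model" artifact path are made up. The `params=...` argument of the pyfunc `predict` call requires a recent MLflow version and, depending on the version, the logged signature may need to declare a params schema for the values to be forwarded.

    # pipeline_registry.py -- illustrative sketch only
    from kedro.pipeline import Pipeline, node

    from kedro_mlflow.pipeline.pipeline_ml_factory import pipeline_ml_factory


    def train_model(training_data):
        ...  # fit and return the model


    def predict_fn(model, instances, predict_params):
        # 'predict_params' receives the dict passed as `params=...` at inference time
        ...


    training = Pipeline([node(train_model, inputs="training_data", outputs="model")])
    inference = Pipeline(
        [
            node(
                predict_fn,
                inputs=["model", "instances", "predict_params"],
                outputs="predictions",
            )
        ]
    )

    pipeline_ml = pipeline_ml_factory(
        training=training,
        inference=inference,
        input_name="instances",  # data provided by the caller at predict time
        params_input_name="predict_params",  # runtime parameters injected at predict time
    )

    # Once the model has been auto-logged by the hook at the end of a training run:
    import mlflow

    loaded = mlflow.pyfunc.load_model("runs:/<run_id>/model")  # placeholder run id and artifact path
    data = ...  # e.g. a pandas DataFrame of new instances
    predictions = loaded.predict(data, params={"threshold": 0.75})
    # KedroPipelineModel.predict saves the params dict to the "predict_params"
    # dataset before running the inference pipeline.

Note that the new `params_input_name` setter only accepts names that are inputs of `inference`, which is why "predict_params" also appears in the inference node's inputs above.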