Understanding PipelineML and pipeline_ml #16

Closed
akruszewski opened this issue Jul 7, 2020 · 11 comments · Fixed by #125
Labels
documentation Improvements or additions to documentation

@akruszewski

Hi @Galileo-Galilei. As I mentioned in another issue, I'm currently working on integrating my training and inference pipelines with PipelineML. Unfortunately, I'm confused about how inputs and outputs are handled, and I can't wrap my head around it.

Context

My training pipeline is built from three other pipelines: de_pipeline (data engineering), fe_pipeline (feature engineering) and md_pipeline (training aka. modeling).

My inference pipeline is built from the same pipelines, but with a predict argument that changes their behavior (they use previously saved models for imputation and prediction).

In my current implementation it looks like this:

     de_pipeline_predict = pipeline(
         de.create_pipeline(predict=True),  # type: ignore
         inputs={"remote_raw": "remote_new", "imputer": "imputer"},
         namespace="new",
     )
     fe_pipeline_predict = pipeline(
         fe.create_pipeline(predict=True),  # type: ignore
         namespace="new",
     )

     # The `new_preds` output would be renamed to `new.new_preds` because of
     # the namespace, so we map `new_preds` to `new_preds` to retain the
     # name and keep the catalog clean.
     md_pipeline_predict = pipeline(
         md.create_pipeline(predict=True),  # type: ignore
         inputs={"lgbm": "lgbm"},
         outputs={"new_preds": "new_preds"},
         namespace="new",
     )

My pipelines also take parameters as inputs, obtained from the Kedro configuration (by that I mean conf/base/parameters.yaml).
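
As a rough illustration of the renaming described in the code comment above, here is a simplified model in plain Python. It is only a sketch, not Kedro's implementation: apply_namespace is a hypothetical helper, "features" is a made-up dataset name, and leaving "params:" entries untouched is an assumption based on the error message quoted in this issue (the exact treatment of parameters may differ between Kedro versions).

```python
def apply_namespace(names, namespace, explicit=None):
    """Return a mapping old_name -> new_name for a namespaced pipeline.

    Simplified model (assumption, not Kedro's code):
    - names listed in `explicit` keep the name given there,
    - parameters are left untouched,
    - every other dataset gets the "<namespace>." prefix.
    """
    explicit = explicit or {}
    renamed = {}
    for name in names:
        if name in explicit:
            renamed[name] = explicit[name]
        elif name == "parameters" or name.startswith("params:"):
            renamed[name] = name
        else:
            renamed[name] = f"{namespace}.{name}"
    return renamed


result = apply_namespace(
    ["lgbm", "features", "new_preds", "params:target"],
    namespace="new",
    explicit={"lgbm": "lgbm", "new_preds": "new_preds"},
)
print(result)
```

Only "features" picks up the prefix here, which is why every dataset shared between the training and inference pipelines has to be mapped explicitly if its name must stay the same.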

When I'm trying to glue them together with:

     train_pipeline = de_pipeline + fe_pipeline + md_pipeline
     predict_pipeline = de_pipeline_predict + fe_pipeline_predict + md_pipeline_predict

     training = pipeline_ml(
         training=train_pipeline,
         inference=predict_pipeline,
     )

and running my training pipeline I'm getting:

kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLInputsError:
        The following inputs are free for the inference pipeline:
        - lgbm
     - remote_new
     - imputer
     - params:data_engineering
     - params:target.
        Only one free input is allowed.
        Please make sure that 'inference' pipeline inputs are 'training' pipeline outputs,
        except one.

I understand the issue here, but I don't know how to proceed: how do I "un-free" inputs that should be obtained (automatically?) through Kedro features? I would be glad for any tips.

@Galileo-Galilei
Owner

Galileo-Galilei commented Jul 7, 2020

TL;DR: make sure that training_pipeline.outputs() and predict_pipeline.inputs() are identical, except for one extra entry in the second set: the data to predict on, which seems to be remote_new in your project. Note: the namespace may have renamed some entries and caused the mismatch, which might be a bug.
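
A minimal sketch of this rule with plain Python sets, not the plugin's actual implementation. The function name unexpected_free_inputs is hypothetical, and the dataset names (raw_data, imputer, model, lgbm) are purely illustrative.

```python
def unexpected_free_inputs(training_outputs, inference_inputs, input_name):
    """List inference inputs that are neither produced by training
    nor the single dataset reserved for the data to predict on."""
    free = set(inference_inputs) - set(training_outputs)
    return sorted(free - {input_name})


# Matching names: the only free input is the data to predict on.
ok = unexpected_free_inputs(
    training_outputs={"imputer", "model"},
    inference_inputs={"raw_data", "imputer", "model"},
    input_name="raw_data",
)

# Mismatched names: inference asks for "lgbm" but training produces "model".
broken = unexpected_free_inputs(
    training_outputs={"imputer", "model"},
    inference_inputs={"raw_data", "imputer", "lgbm"},
    input_name="raw_data",
)
print(ok, broken)
```

An empty list means the two pipelines can be bound together; any name left in the list is an input nobody provides.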


A bit of context: what is the pipeline_ml function for?

Unlike all the other functions of the plugin, this function is not intended for production: it packages a whole pipeline to make it easy to serve (as an API or in batch) with a single command line. The goal is to share a pipeline very easily and to facilitate testing and reuse between data scientists.

For production purposes, you need a tool to schedule and monitor the pipelines (Airflow, for instance).

What is its goal?

Under the hood, the PipelineML class (the type of the output of pipeline_ml) is nothing but a kedro Pipeline (the "training" pipeline) which has access to another pipeline (the "predict" pipeline) (you can see it in the code here).

When the MlflowPipelineHook detects that the pipeline you are trying to run (with kedro run) is not a standard Pipeline but a PipelineML, it tries to log it in mlflow with the KedroPipelineModel class, which is the class that actually contains all the logic.

This class is inspired by the mlflow example on how to create a custom mlflow model. Basically, it is a class that implements a predict method, which is called automatically when serving the model, and a load_context method, which manages the inputs your predict method needs. Indeed, any logged model that respects this format can benefit from mlflow capabilities, including model serving.
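
To make the two methods concrete, here is a stand-alone sketch that mimics the interface of mlflow.pyfunc.PythonModel without importing mlflow. It is not the KedroPipelineModel code: PipelineModelSketch, add_offset and the "offset" artifact are hypothetical names, and a SimpleNamespace stands in for the context object whose artifacts attribute maps artifact names to local file paths.

```python
import pickle
import tempfile
from pathlib import Path
from types import SimpleNamespace


class PipelineModelSketch:
    """Stand-in with the same two methods as a custom mlflow model."""

    def __init__(self, predict_fn):
        self.predict_fn = predict_fn  # plays the role of the inference pipeline
        self.loaded = {}

    def load_context(self, context):
        # Load every artifact (imputer, model, ...) from its local path.
        for name, path in context.artifacts.items():
            with open(path, "rb") as handle:
                self.loaded[name] = pickle.load(handle)

    def predict(self, context, model_input):
        # Run the inference function on the new data plus the loaded artifacts.
        return self.predict_fn(model_input, **self.loaded)


def add_offset(rows, offset):
    """Toy inference function: the 'model' is a single number."""
    return [row + offset for row in rows]


with tempfile.TemporaryDirectory() as tmp:
    artifact_path = Path(tmp) / "offset.pkl"
    artifact_path.write_bytes(pickle.dumps(10))
    context = SimpleNamespace(artifacts={"offset": str(artifact_path)})

    model = PipelineModelSketch(add_offset)
    model.load_context(context)
    predictions = model.predict(context, [1, 2, 3])

print(predictions)
```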

How does it work?

If you have read the KedroPipelineModel code described above, you can see that the KedroPipelineModel needs 2 elements to be logged in mlflow:

  • the predict function (here, your kedro pipeline used for prediction)
  • the data required for the predict function to work (here your lightgbm model, your imputer and a couple of parameters)

You can fill in these arguments by hand, but it can be a bit tedious, and this is where pipeline_ml comes into play: your predict pipeline is a higher-order function, i.e. a function generated by the training pipeline. All the arguments your predict pipeline needs are generated by the training pipeline, and pipeline_ml binds them together, so it has all the information needed for logging. When the training pipeline is run, the associated "predict" pipeline is logged.
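
The "higher-order function" analogy can be sketched with a plain closure. This is only an analogy with hypothetical names (train, predict, fill_value), not how the plugin is written: training computes an artifact, and the returned predict function carries it along.

```python
from statistics import mean


def train(rows):
    """'Training pipeline': fit a trivial imputer and return a predict function."""
    # The artifact produced by training: the mean of the observed values.
    fill_value = mean(value for value in rows if value is not None)

    def predict(new_rows):
        """'Inference pipeline': only needs new data, the artifact is bound."""
        return [fill_value if value is None else value for value in new_rows]

    return predict


predict = train([1.0, None, 3.0])
result = predict([None, 5.0])
print(result)
```

The caller of predict never passes fill_value: it was fixed at training time, just as the imputer and the model are fixed when the inference pipeline is logged.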

To do so, it applies the following process:

How to fix your bug

The error comes from the fact that training_pipeline.outputs() and predict_pipeline.inputs() do not have the same entries.
It may come either from the fact that you do not use the same names for the entries in both pipelines (fix that!) or from the namespace introducing a prefix that changes the entry names. In the second case, it may be a bug to investigate (I confess I have not tried pipeline_ml with namespaces, which are quite recent in Kedro). Your pipeline should look like this (I'm coding on the fly, the code is not tested):

from kedro.pipeline import Pipeline, node
from kedro_mlflow.pipeline import pipeline_ml

# preprocess, fit_imputer, transform_imputer, train_model and predict are
# your own node functions, defined elsewhere in the project.
full_pipeline = Pipeline(
    [
        node(
            func=preprocess,
            inputs=dict(data="raw_data"),
            outputs="preprocessed_data",
            tags=["training", "inference"],
        ),
        node(
            func=fit_imputer,
            inputs=dict(data="preprocessed_data"),
            outputs="imputer",
            tags=["training"],
        ),
        node(
            func=transform_imputer,
            inputs=dict(data="preprocessed_data", imputer="imputer"),
            outputs="imputed_data",
            tags=["training", "inference"],
        ),
        node(
            func=train_model,
            inputs=dict(data="imputed_data"),
            outputs="model",
            tags=["training"],
        ),
        node(
            func=predict,
            inputs=dict(data="imputed_data", model="model"),
            outputs="predictions",
            tags=["inference"],
        ),
    ]
)

# Create the pipelines as you did. Use kedro viz to see what's going on!
# Print the pipeline and check that train_pipeline produces imputer and model.
train_pipeline = full_pipeline.only_nodes_with_tags("training")
# Print the pipeline and check that predict_pipeline.inputs() is {raw_data, imputer, model}.
predict_pipeline = full_pipeline.only_nodes_with_tags("inference")

training = pipeline_ml(
    training=train_pipeline,
    inference=predict_pipeline,
    input_name="raw_data",
)

If you still struggle with the function and need additional help, can you please share:

  • the result of training_pipeline.outputs(), predict_pipeline.inputs() and training.input_name (you must pass this argument)
  • the actual values of training_pipeline and predict_pipeline (i.e. the list of nodes). If you can show me code in a github repo it will make things easier to debug.

Additional info

I will publish a detailed example in the docs this weekend. If you can wait until then, it should make the explanation clearer.

@laurids-reichardt

Hi @Galileo-Galilei, thanks for your great work!

My understanding of pipeline_ml might be wrong, but I believe the requirement that training_pipeline.outputs() and predict_pipeline.inputs() are identical might be a bit too restrictive. Consider the following pipeline, which matches text strings to predefined labels:

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                name="Split Data",
                func=split_data,
                inputs=["text_samples", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                tags=["training"],
            ),
            node(
                name="Fit MultiLabelBinarizer",
                func=fit_label_binarizer,
                inputs="y_train",
                outputs="mlb",
                tags=["training"],
            ),
            node(
                name="Transform Labels",
                func=transform_labels,
                inputs=["mlb", "y_train", "y_test"],
                outputs=["Y_train", "Y_test"],
                tags=["training"],
            ),
            node(
                name="Train Model",
                func=train_model,
                inputs=["X_train", "Y_train"],
                outputs="classifier",
                tags=["training"],
            ),
            node(
                name="Evaluate Model",
                func=evaluate_model,
                inputs=["classifier", "X_test", "Y_test"],
                outputs=None,
                tags=["evaluation"],
            ),
            node(
                name="Make Prediction",
                func=make_prediction,
                inputs=["classifier", "mlb", "features"],
                outputs=None,
                tags=["inference"],
            ),
        ]
    )

Nodes:

def split_data(text_samples: pd.DataFrame, parameters: Dict) -> List:
    # extract features
    X = text_samples["features"].values

    # extract labels
    y = text_samples["labels"].values

    # split dataset into train and test data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )

    return [X_train, X_test, y_train, y_test]

def fit_label_binarizer(y_train: np.ndarray) -> MultiLabelBinarizer:
    # multi label binarizer to transform data labels
    mlb = MultiLabelBinarizer()

    # fit the mlb on the train label data
    mlb.fit(y_train)

    return mlb

def transform_labels(mlb: MultiLabelBinarizer, y_train: np.ndarray, y_test: np.ndarray) -> List:
    # transform train label data
    Y_train = mlb.transform(y_train)

    # transform test label data
    Y_test = mlb.transform(y_test)

    return [Y_train, Y_test]


def train_model(X_train: np.ndarray, Y_train: np.ndarray) -> Pipeline:
    # scikit-learn classifier pipeline
    classifier = Pipeline(
        [
            ("vectorizer", CountVectorizer()),
            ("tfidf", TfidfTransformer()),
            ("clf", OneVsRestClassifier(LinearSVC())),
        ]
    )

    # fit the classifier on the train data
    classifier.fit(X_train, Y_train)

    return classifier


def evaluate_model(classifier: Pipeline, X_test: np.ndarray, Y_test: np.ndarray):
    # make prediction with test data
    predicted = classifier.predict(X_test)

    # accuracy score of the trained classifier
    accu = accuracy_score(Y_test, predicted)

    # log accuracy
    logger = logging.getLogger(__name__)
    logger.info("Model has an accuracy of %.3f", accu)


def make_prediction(classifier: Pipeline, mlb: MultiLabelBinarizer, features: np.ndarray) -> List:
    # model inference on features
    predicted = classifier.predict(features)

    # inverse transform prediction matrix back to string labels
    all_labels = mlb.inverse_transform(predicted)

    # map input values to predicted label
    predictions = []
    for item, labels in zip(features, all_labels):
        predictions.append({"value": item, "label": labels})

    # return predictions as list of dicts
    return predictions

The inference part does not only need access to the trained model but to the fitted MultiLabelBinarizer as well. Is this supported by pipeline_ml?

@Galileo-Galilei
Owner

Galileo-Galilei commented Jul 19, 2020

Hello @laurids-reichardt, glad to see that you're playing around with the plugin!

Regarding your question, access to data other than the ml model (encoder, binarizer, ...) is not only "possible" but exactly what pipeline_ml is designed for, because it is a very common pattern in ml applications. It even enables more complex pipelines: your prediction pipeline is composed of a single node, but you could have split the train_model scikit-learn pipeline into several kedro nodes (one for the vectorizer, one for tf-idf and one for your classifier), and it should still be able to handle this more complex use case.

A few remarks on pipeline_ml:

  • You are right that the condition training_pipeline.outputs() = predict_pipeline.inputs() - {input_name} is too restrictive: actually, we only need predict_pipeline.inputs() - {input_name} to be included in training_pipeline.all_outputs() (the training pipeline may have other outputs, like metrics for instance). Nevertheless, I think it has no impact on what you are trying to do.
  • Mlflow models only accept one entry in their predict function, which should be a pandas.DataFrame. This is not a requirement of the plugin but of core mlflow, and I cannot change it (even if I find it too restrictive). Moreover, you cannot pass parameters at runtime in the API, which is very restrictive too (e.g. you need to decide when you log the model whether the labels are "inverse encoded" or not; you cannot leave it as a parameter for the user).
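
The difference between "terminal" outputs and all outputs can be sketched with plain sets, using the dataset names of the training nodes posted above. This is a simplified model of the idea, not Kedro's Pipeline class: nodes are reduced to (inputs, outputs) pairs and the helper names are hypothetical.

```python
# (inputs, outputs) of the four nodes tagged "training" in the pipeline above
TRAINING_NODES = [
    (["text_samples", "parameters"], ["X_train", "X_test", "y_train", "y_test"]),
    (["y_train"], ["mlb"]),
    (["mlb", "y_train", "y_test"], ["Y_train", "Y_test"]),
    (["X_train", "Y_train"], ["classifier"]),
]


def all_outputs(nodes):
    """Every dataset produced by some node."""
    return {name for _, outputs in nodes for name in outputs}


def terminal_outputs(nodes):
    """Datasets produced by a node and consumed by no other node."""
    consumed = {name for inputs, _ in nodes for name in inputs}
    return all_outputs(nodes) - consumed


produced = all_outputs(TRAINING_NODES)
terminal = terminal_outputs(TRAINING_NODES)
print(sorted(terminal))
print("mlb" in produced, "mlb" in terminal)
```

Because mlb is consumed by the label transformation node, it is produced by the training pipeline without being one of its terminal outputs, so a check based on terminal outputs alone cannot see it.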

The good news is that I think your code should almost work "as is". Can you check the following items:

  • Are both classifier and mlb persisted in the catalog.yml (as pickle.PickleDataSet for instance)? This is necessary to keep track of the objects for further reuse. They should appear in the mlflow UI in the model/artifacts folder.
  • You will need to turn the features argument of make_prediction into a pandas.DataFrame to be able to serve your pipeline as an API (see above).
  • Is your pipeline properly declared as a pipeline_ml with input_name="features" in your create_pipelines function?
  • Your make_prediction node does not return anything in the pipeline: it must return an object (predictions?) which is not persisted in the catalog.yml and is returned in a MemoryDataSet at the end of the pipeline.

If these elements are not enough to solve your problem, what error message do you get? Can you share a sample of the data you use to make the problem reproducible?

EDIT: It is a bug. pipeline_ml considers only "terminal" outputs (i.e. outputs which are not inputs of other nodes), and since your mlb is reused in your training pipeline (in the transform_labels node), it crashes. This behaviour is very strange; I was sure that there was a test for it. I will try to commit a fix ASAP. To make your pipeline work in the current state, just return the binarizer as a terminal output (for example, make transform_labels return it unmodified).

EDIT2: It should be fixed. Apply all the modifications above and install the hotfix-pipeline-ml version of kedro-mlflow with the command below. I've tried it with the example code you gave above, and everything seems to be fine. I'll add some tests in the coming weeks and merge it to develop.

pip install --upgrade git+https://github.com/Galileo-Galilei/kedro-mlflow.git@hotfix-pipeline-ml

@laurids-reichardt

Yeah, thanks for the quick answer! The hotfix works. For reference, the url is: git+https://github.com/Galileo-Galilei/kedro-mlflow.git@hotfix-pipeline-ml

@Galileo-Galilei
Owner

Indeed, wrong copy pasting, sorry.

Does "it works" only mean that the model is properly stored in mlflow, or did you set the API up and try it out?

@laurids-reichardt

Here's my current implementation: https://github.com/laurids-reichardt/kedro-examples/blob/kedro-mlflow-hotfix2/text-classification/src/text_classification/pipelines/pipeline.py

kedro run --pipeline=kedro_mlflow runs without issues and logs the mlb and classifier as artifacts inside the mlruns folder.

However I get the following error while trying to make some predictions:

❯ mlflow models predict -i ./predict_input.csv -m ./mlruns/1/887715ed7000444fae966f99376ea870/artifacts/text_classification --no-conda
2020/07/19 23:46:55 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
Traceback (most recent call last):
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/bin/mlflow", line 8, in <module>
    sys.exit(cli())
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/models/cli.py", line 93, in predict
    json_format=json_format)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/backend.py", line 65, in predict
    json_format)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/scoring_server/__init__.py", line 222, in _predict
    pyfunc_model = load_model(model_uri)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/__init__.py", line 466, in load_model
    model_impl = importlib.import_module(conf[MAIN])._load_pyfunc(data_path)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/model.py", line 209, in _load_pyfunc
    python_model = cloudpickle.load(f)
ModuleNotFoundError: No module named 'text_classification'

My current guess would be that the issue stems from the fact that I use pyenv+venv to resolve my dependencies instead of conda. I'll investigate further and report back.

@Galileo-Galilei
Owner

It sounds like your own package is not installed as a Python package. What does pip show text_classification return? In your active environment and at the root of your kedro project, can you try:

cd src
pip install -e . 
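
The ModuleNotFoundError in the traceback can be reproduced in miniature with the standard library. This is a sketch under assumptions: it uses pickle as a stand-in for cloudpickle, and demo_text_classification is a hypothetical throwaway module, not the real package. The point is that a pickled object only records where its class lives, so loading it requires that module to be importable.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path


def pickle_then_forget(module_name="demo_text_classification"):
    """Pickle an object whose class lives in a temporary module,
    then make that module unimportable again."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, f"{module_name}.py").write_text("class Model:\n    pass\n")
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            module = importlib.import_module(module_name)
            payload = pickle.dumps(module.Model())
        finally:
            # Simulate a package that is not installed in the environment.
            sys.path.remove(tmp)
            sys.modules.pop(module_name, None)
    importlib.invalidate_caches()
    return payload


payload = pickle_then_forget()
try:
    pickle.loads(payload)
    outcome = "loaded"
except ModuleNotFoundError as exc:
    outcome = f"failed: {exc}"
print(outcome)
```

Installing the project as a package makes the module importable from any working directory, which is what the loading step needs.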

@laurids-reichardt

laurids-reichardt commented Jul 20, 2020

You're right, pip install -e ./src did the trick. I should read up on Python modules. 😃

Now it works without issues. Thanks for your great support!

❯ model_id=5372a2d2198b443382922a67c3dc3a40; mlflow models predict -i ./predict_input.csv -m ./mlruns/1/${model_id}/artifacts/text_classification --no-conda -t 'csv'
2020/07/20 11:49:40 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/fsspec/implementations/local.py:33: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
  FutureWarning,
{"predictions": [{"value": "The weather in london is rainy as always.", "label": ["new york"]}, {"value": "England just lost the football game.", "label": ["london"]}, {"value": "New York is called the Big Apple.", "label": ["new york"]}]}%

@laurids-reichardt

I converted the scikit-learn classifier pipeline to a kedro pipeline as well: https://github.com/laurids-reichardt/kedro-examples/blob/master/text-classification/docs/kedro-pipeline.svg

@Galileo-Galilei Galileo-Galilei self-assigned this Jul 21, 2020
@Galileo-Galilei Galileo-Galilei added the bug (Something isn't working) and documentation (Improvements or additions to documentation) labels Jul 21, 2020
@Galileo-Galilei Galileo-Galilei changed the title from "Understending MLPipeline." to "Understanding PipelineML and pipeline_ml" Jul 21, 2020
Galileo-Galilei added a commit that referenced this issue Jul 21, 2020
…uced by the training pipeline was an intermediary output instead of a terminal one
@Galileo-Galilei
Owner

Galileo-Galilei commented Jul 21, 2020

I have just added unit tests, updated the changelog and merged this fix to develop. It will be released to PyPI soon. I am leaving the issue open because, apart from this bugfix, it is the best documentation of the pipeline_ml function so far.

Short digression on modules

Regarding the module, I always recommend installing your kedro package as a module. If you don't, you can still perform relative imports between your scripts, but they depend on your working directory. This may lead to annoying issues because:

  • The working directory may be set by your IDE (typically, VSCode sets it to the location of the first script you launch the interactive window from). This is very error-prone because it changes across your VSCode sessions (if you launch a notebook at the root of the kedro project, or a script inside the src/pipelines/ folder, it will define your working directory for the whole session...).
  • It even depends on the launcher you use (the VSCode interactive window, the python of your active environment in the console and Jupyter may behave differently).
  • Even Kedro itself (in the first versions; I haven't checked whether this has changed) was messing with your working directory when calling some commands, and this has led to bugs in the past (when a command failed and did not restore the path).

For all these reasons, I found it much more stable to install your project as a python package with pip.

The -e flag means that you install the package in editable mode: it is not copied into the directory of your virtual env (C:\Users\YOU\Anaconda3\envs\YOUR_ENV) but points to your local project (try pip freeze and look for your package to see what I mean). This means that any change in your code is immediately taken into account without reinstalling the package.
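
Besides pip freeze, a quick way to see where a package is imported from is to ask the import system directly. A minimal sketch using only the standard library; where_is is a hypothetical helper name, and the json module is used only because it is always available.

```python
import importlib.util


def where_is(package_name):
    """Return the file a top-level package would be imported from,
    or None if it cannot be found in the current environment."""
    spec = importlib.util.find_spec(package_name)
    if spec is None:
        return None
    return spec.origin


found = where_is("json")
missing = where_is("surely_not_installed_pkg_xyz")
print(found, missing)
```

Called with your own package name after an editable install, the returned path should point into your local project folder rather than into the environment's site-packages directory.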

A better way to specify environment in kedro-mlflow

Note that with all this in mind, you can specify the conda_env argument of MlflowPipelineHook to ensure that your model is reproducible for anyone who can pip install your module (from GitHub for instance):

from kedro.framework.context import KedroContext  # Kedro 0.16 location
from kedro_mlflow.framework.hooks import MlflowNodeHook, MlflowPipelineHook
from your_package import __version__ as pkg_version  # <-- add this
from pk.pipeline import create_pipelines


class ProjectContext(KedroContext):
    """Users can override the remaining methods from the parent class here,
    or create new ones (e.g. as required by plugins)
    """

    project_name = "YOUR PROJECT NAME"
    project_version = "0.16.X"
    hooks = (
        MlflowNodeHook(flatten_dict_params=False),
        MlflowPipelineHook(
            model_name="YOUR_PYTHON_PACKAGE",
            conda_env={
                "python": YOUR_PYTHON_VERSION,  # placeholder, e.g. "3.7.7"
                "dependencies": [f"your_package=={pkg_version}"],
            },  # <-- and this
        ),
    )

@Galileo-Galilei Galileo-Galilei added the good first issue label and removed the bug (Something isn't working) label Jul 21, 2020
Galileo-Galilei added a commit that referenced this issue Aug 6, 2020
…act produced by the training pipeline was an intermediary output instead of a terminal one"

This reverts commit c0c0b32.
@Galileo-Galilei Galileo-Galilei added this to the Release 0.4.0 milestone Sep 29, 2020
@Galileo-Galilei Galileo-Galilei removed this from the Release 0.4.0 milestone Oct 19, 2020
@Galileo-Galilei
Owner

This issue is closed since :

Feel free to reopen if needed.


3 participants