Specifying run_id in catalog rewrites history #566

Open
JaimeArboleda opened this issue Jul 23, 2024 · 1 comment

@JaimeArboleda

Hello!

First of all, thanks for open sourcing your plugin, which is very well documented and a great addition to the MLOps ecosystem. We work for a medium-sized organization and we are building our MLOps toolset around kedro, mlflow and your plugin.

There is a problem we are facing and we are unsure how to solve it. Maybe you have already thought about it and there is a clean solution within your plugin that we have missed; in the meantime, we are trying to add some extra functionality ourselves. I want to describe the problem because it seems like a common one, and there may be a better way to deal with it.

Typically we have the following pipelines:

  • preprocessing: where we join and clean our data sources.
  • train: which includes some transformation steps (like feature engineering) and actually training the model.
  • inference: for doing predictions.
  • evaluate: for evaluating the model...

Of course, the output of preprocessing is the input of train. They are different pipelines and in fact, in many of our projects, even the runtime environment is different (preprocessing uses spark and train uses pandas/numpy/xgboost and other python libraries that use in-memory computation).

But in many projects we have several versions of preprocessing (because we might have different ways of cleaning the data, we might or might not discard a particular data source, and so on). We connect the two pipelines using run_id.

So let's say that we are happy with a particular execution of preprocessing. Then, in our catalog, we will add the run_id to specify that our training dataset is the one generated by that particular run:

df_train:
    type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
    dataset:
        type: pandas.ParquetDataset
        filepath: /home/f0099337/project/df_train.pq
    run_id: 29309b32d57b49dbb89d51254e055960

model:
    type: kedro_mlflow.io.models.MlflowModelTrackingDataset
    flavor: mlflow.sklearn

Now, the problem is that the class kedro_mlflow.io.artifacts.MlflowArtifactDataset also writes to this specific run_id when you execute the preprocessing pipeline. This is not what we want. We would like to be able to run the preprocessing pipeline again and save the result in the new run_id generated by mlflow. Instead, running preprocessing overwrites the output of the run_id specified in the catalog, therefore "altering the history". We would like the run_id specified in the catalog to affect only the version of df_train that is read when executing the train pipeline, not the one that is written when running preprocessing.
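
The behaviour described above can be reproduced with a toy model that does not use kedro-mlflow at all. In this sketch, ToyTrackingStore and PinnedArtifact are hypothetical names, and the in-memory dictionary only stands in for mlflow's artifact storage. It illustrates the semantics reported in this issue, not the plugin's actual implementation.

```python
import uuid
from typing import Any, Dict, Optional


class ToyTrackingStore:
    """In-memory stand-in for artifact storage: run_id -> {artifact name -> data}."""

    def __init__(self) -> None:
        self.runs: Dict[str, Dict[str, Any]] = {}
        self.active_run_id: Optional[str] = None

    def start_run(self) -> str:
        # Every new run gets a fresh id, as a tracking server would assign one.
        self.active_run_id = uuid.uuid4().hex
        self.runs[self.active_run_id] = {}
        return self.active_run_id


class PinnedArtifact:
    """Hypothetical dataset whose single run_id is used for BOTH load and save."""

    def __init__(self, store: ToyTrackingStore, name: str, run_id: Optional[str] = None) -> None:
        self.store = store
        self.name = name
        self.run_id = run_id

    def _target_run(self) -> str:
        # A pinned run_id wins; otherwise fall back to the active run.
        target = self.run_id or self.store.active_run_id
        if target is None:
            raise RuntimeError("no pinned run_id and no active run")
        return target

    def save(self, data: Any) -> None:
        self.store.runs[self._target_run()][self.name] = data

    def load(self) -> Any:
        return self.store.runs[self._target_run()][self.name]


store = ToyTrackingStore()

# First preprocessing run: nothing is pinned, so the data lands in the active run.
first_run = store.start_run()
PinnedArtifact(store, "df_train").save("preprocessing v1")

# The catalog now pins df_train to the first run, and preprocessing is run again.
pinned = PinnedArtifact(store, "df_train", run_id=first_run)
second_run = store.start_run()
pinned.save("preprocessing v2")  # written to first_run, not to second_run
```

After the second save, the first run holds "preprocessing v2" and the second run holds no df_train at all, which is the "rewritten history" this issue is about.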

For a minimal example, let me add a dumb preprocessing and train pipeline:

preprocessing pipeline:

from kedro.pipeline import Pipeline, pipeline, node
import pandas as pd

def preprocess() -> pd.DataFrame:
    df = pd.DataFrame(data={"feature": [21, 34], "label": [0, 1]})
    return df

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([node(func=preprocess, inputs=None, outputs="df_train", name="preprocessing")])

train pipeline:

from kedro.pipeline import Pipeline, pipeline, node
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline as SKPipeline
import pandas as pd


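def train(df: pd.DataFrame) -> SKPipeline:
    # Split the columns produced by the preprocessing example into features and label
    X, y = df[["feature"]], df["label"]
    # Use the scikit-learn Pipeline alias; the bare name Pipeline is kedro's here
    pipe = SKPipeline([("scaler", StandardScaler()), ("svc", SVC())])
    pipe.fit(X, y)
    return pipe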


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [node(func=train, inputs=["df_train"], outputs="model", name="train")]
    )

As I said, if I try to execute the train pipeline, it will correctly take the df_train corresponding to the run_id that I specified in the catalog. However, if I execute another run of the preprocessing pipeline, for example with different parameters, instead of storing the result in a new run_id, it will overwrite the run_id specified in the catalog.

Sorry for the long message, but I tried to make the issue as clear as possible.

Thanks in advance!

@Galileo-Galilei
Owner

Hi,

First of all, sorry for the long delay with no news.

This is the intended behaviour: the run_id argument specifies the run you want to load a model or an artifact (here, your df_train) from and save it to. However, I understand that in a typical ML workflow, you often want to "read" from a specific mlflow run, but not necessarily write to it.

Current workaround

I guess the best workaround is to use kedro environments to modify the run_id depending on the environment, something like:

# conf/base/catalog.yml
df_train:
    type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
    dataset:
        type: pandas.ParquetDataset
        filepath: /home/f0099337/project/df_train.pq
    run_id: ${globals: df_train_run_id, null} # if no df_train_run_id is specified in the environment, default to None so the artifact is logged in the active run

and

# conf/training/globals.yml # Notice the /training folder which is a newly created environment
df_train_run_id: 29309b32d57b49dbb89d51254e055960

Now you can:

  1. run kedro run -p preprocessing to run the preprocessing pipeline and log in a new run id
  2. run kedro run -p training -e training to run the training pipeline and use the df_train_run_id specified in the training environment
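
The fallback logic of this workaround can be sketched in plain Python. This is only a simulation: resolve_run_id is a hypothetical helper, and the two dictionaries stand in for the globals of the base environment and of the selected environment. It assumes that values from the selected environment take precedence over base, and that a missing key resolves to None as the null default in the catalog entry suggests; it does not call kedro's config loader.

```python
from typing import Dict, Optional


def resolve_run_id(base_globals: Dict[str, str], env_globals: Dict[str, str]) -> Optional[str]:
    """Mimic a globals lookup with a None default (hypothetical helper)."""
    # Assumption: keys defined in the selected environment override base.
    merged = {**base_globals, **env_globals}
    # A missing key yields None, meaning "log in the active run".
    return merged.get("df_train_run_id")


# kedro run -p preprocessing: no environment defines df_train_run_id.
preprocessing_run_id = resolve_run_id(base_globals={}, env_globals={})

# kedro run -p training -e training: conf/training/globals.yml pins the run.
training_run_id = resolve_run_id(
    base_globals={},
    env_globals={"df_train_run_id": "29309b32d57b49dbb89d51254e055960"},
)
```

With these inputs, preprocessing gets no pinned run and therefore logs to whatever run is active, while training reads from the pinned run.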

Long term implementation

It would make sense in kedro-mlflow to be able to specify a different run_id for load and save:

df_train:
    type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
    dataset:
        type: pandas.ParquetDataset
        filepath: /home/f0099337/project/df_train.pq
    load_args:
        run_id: ...
    save_args:
        run_id: ...
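
To make the proposed semantics concrete, here is a minimal sketch of a dataset with separate run ids for load and save. Everything in it is hypothetical: SplitRunArtifact, load_run_id, save_run_id and the RUNS dictionary (a toy replacement for mlflow's artifact storage) are illustration names only, and nothing here is an existing kedro-mlflow feature.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Toy artifact storage: run_id -> {artifact name -> data}. Stands in for mlflow.
RUNS: Dict[str, Dict[str, Any]] = {
    "run_a": {"df_train": "preprocessing v1"},  # an earlier, validated run
    "run_b": {},                                # the currently active run
}


@dataclass
class SplitRunArtifact:
    """Hypothetical dataset with independent run ids for reading and writing."""

    name: str
    load_run_id: Optional[str] = None
    save_run_id: Optional[str] = None

    def load(self, active_run_id: str) -> Any:
        # Read from the pinned run if given, otherwise from the active run.
        return RUNS[self.load_run_id or active_run_id][self.name]

    def save(self, data: Any, active_run_id: str) -> None:
        # Write to the pinned run if given, otherwise to the active run.
        RUNS[self.save_run_id or active_run_id][self.name] = data


# Pin only the load side: reads come from run_a, writes go to the active run.
df_train = SplitRunArtifact("df_train", load_run_id="run_a")
df_train.save("preprocessing v2", active_run_id="run_b")
loaded = df_train.load(active_run_id="run_b")
```

In this sketch the earlier run keeps "preprocessing v1" untouched, the new output is stored under the active run, and training still reads the pinned version.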
