How to implement MLOps in data science projects, using the example of forecasting stock movements with MLRun

MLOps: structuring the path from model concept to real-world impact

In today’s data-driven world, machine learning is a symbol of innovation. However, transforming a model from a theoretical construct into a ready-to-use tool is a challenging task. Discover MLOps, a seamless blend of “machine learning” and “operations” designed to solve this complex problem.

Taking inspiration from DevOps in software development, MLOps coalesces around core objectives, offering a structured path for ML projects throughout their lifecycle.

Now let’s dive deeper into the world of MLOps with MLRun.

Key aspects of MLOps include:

  • Model Development & Training: MLOps starts at the inception of the model, bringing together best practices and tools to help you effectively build and train models.

  • Feature Store: a central feature repository acts as a single place for feature management. It makes it easier to find, store, and share features, ensuring consistency and speeding up model development.

  • Validation & Testing: before deployment, models undergo rigorous automated testing to ensure they meet performance criteria and are ready to solve real-world problems.

  • Continuous Integration, Continuous Deployment & Continuous Training (CI/CD/CT): given the dynamic nature of ML models, CI/CD/CT pipelines ensure smooth updates and retraining, keeping models optimized and up to date.

  • Scaling & Orchestration: once models are in production, they must keep up with changing requirements. MLOps emphasizes scalability using technologies such as containerization (e.g. Docker) and orchestration (e.g. Kubernetes).

  • Model Versioning: as models evolve, tracking their iterations is of paramount importance. MLOps provides versioning tools that allow you to easily access and manage different versions of a model.

  • Monitoring & Feedback Loops: MLOps tools track performance metrics, offer feedback channels, and include mechanisms for refinement and retraining based on real-time information.

  • End-to-End Automation: By minimizing manual intervention, MLOps aims to automate the steps from data pre-processing to model validation, ensuring efficiency and reducing potential errors.

  • Collaboration & Communication: by supporting a holistic approach, MLOps facilitates collaboration between data scientists, machine learning engineers, and IT professionals, ensuring synergy and alignment of goals.

Introduction to MLRun: Innovative Open Source MLOps Solutions for the Modern Era

In the complex MLOps space, there is growing demand for platforms that comprehensively cover the machine learning lifecycle. One of the outstanding contenders to answer this call is MLRun.

MLRun is an open source MLOps platform carefully designed to simplify the complexities associated with machine learning projects. Its multifaceted capabilities set it apart from others:

  • Feature store. The MLRun feature store acts as a centralized hub, streamlining the management, storage and retrieval of features. This speeds up model development and ensures consistent iterations and experiments.

  • Experiment Tracking. The iterative nature of machine learning modeling requires robust tracking. MLRun’s experiment tracking carefully records the details of each run, capturing parameters, configurations, and results. This makes model evolution transparent and simplifies comparative analysis of experiments.

  • Model Deployment. Key to MLRun’s deployment strategy is its integration with Docker, an application containerization platform. Models, along with their dependencies, are packaged in Docker containers, ensuring consistency and portability between environments.

  • Workflows. MLRun recognizes the complexity of machine learning projects, which often involve a number of interdependent tasks. It supports organized workflows by enabling seamless connectivity and structured execution of tasks such as data provisioning, training, validation and deployment.

  • Automation and CI/CD. At its core, MLRun recognizes the importance of automation in modern MLOps. It seamlessly integrates with continuous integration and continuous deployment (CI/CD) pipelines, automating tasks from code integration to model deployment, providing flexibility and reducing manual tasks.

  • Kubernetes integration. The requirements of modern machine learning projects often require scalable and flexible infrastructure. MLRun’s native integration with Kubernetes ensures efficient workload orchestration, enabling dynamic scaling based on project requirements.

Overall, MLRun is not just an MLOps solution, it is a comprehensive set of tools tailored for the modern machine learning workflow. Its open source ethos ensures its adaptability and promotes a community-focused approach, making it an invaluable asset in today’s data-driven environment.

Purpose of the article

Our main goal is to show how to use MLRun as an MLOps tool, using data from a current competition on Kaggle: “Optiver – Trading at the Close”. This competition is dedicated to predicting the closing dynamics of stocks in the American market.

Don’t worry: you don’t need in-depth investment knowledge. We will take the code from a single notebook and integrate it into an automated pipeline using MLRun.

Installing MLRun locally using Docker

MLRun is a universal platform that supports installation on a Kubernetes cluster, cloud servers, or even on premises. Now we will look at how to set up MLRun locally using Docker Compose.

Steps for setup:

  1. Download the compose.yaml file from the official MLRun documentation [Download link].

  2. Set up environment variables:

export HOST_IP=<your host address>
export SHARED_DIR=~/mlrun-data
mkdir $SHARED_DIR -p

Here SHARED_DIR is the directory on your computer where MLRun artifacts and the database will be saved.

  3. Run Docker Compose to start the necessary services:

docker-compose -f compose.yaml up -d

After this, the MLRun services defined in compose.yaml will be up and running.

First steps

First, download the competition training dataset via the link.

We focus on the transition from a model-centric approach to a data-centric one, so first of all, let’s create a feature store.

Feature Set

A Feature Set is a group of features that can be combined to form a logical unit (for example, “stocks” or “user_events”).

When creating a Feature Set, you can define a transformation graph for incoming features, data for which can come from both offline storage and streaming services, for example, Kafka. When data passes through the created Feature Set with a given transformation graph, the resulting features are stored in offline storage for subsequent training and in online format for inference.

This approach to creating and registering a transformation graph allows us to guarantee that the features used in inference are identical to those used in training.

Using MLRun to Process Feature Set

Let’s start by importing mlrun:

import mlrun

Let’s create a project:

project = mlrun.get_or_create_project("stock-prediction", context="./", user_project=True)

A project in MLRun is a container for all your work: it stores the code, the functions (MLRun Functions, see below), pipelines, artifacts such as models, datasets and metrics, as well as features and configurations.

The context parameter is where all the project code is stored. The user_project parameter makes the project name unique for each user.

Let’s load and preprocess our data:

import pandas as pd

stocks_df = pd.read_csv("./optiver-trading-at-the-close/train.csv")
# Trim the data to speed up testing:
stocks_df = stocks_df[:50000]

If we look at the features, we will see several related to various prices and their corresponding *_size volumes. It is these features that interest us, since we will build our feature engineering on top of them.

stocks_df.head()

Feature Engineering:

Based on code from a Kaggle competition notebook, we will create a function imbalance_calculator, which will generate new features.

def imbalance_calculator(x: pd.DataFrame, context=None) -> pd.DataFrame:
    x_copy = x.copy()

    
    x_copy['imb_s1'] = x.eval('(bid_size - ask_size) / (bid_size + ask_size)')
    x_copy['imb_s2'] = x.eval('(imbalance_size - matched_size) / (matched_size + imbalance_size)')
    
    prices = ['reference_price','far_price', 'near_price', 'ask_price', 'bid_price', 'wap']
    
    for i,a in enumerate(prices):
        for j,b in enumerate(prices):
            if i>j:
                x_copy[f'{a}_{b}_imb'] = x.eval(f'({a} - {b}) / ({a} + {b})')
                    
    for i,a in enumerate(prices):
        for j,b in enumerate(prices):
            for k,c in enumerate(prices):
                if i>j and j>k:
                    max_ = x[[a,b,c]].max(axis=1)
                    min_ = x[[a,b,c]].min(axis=1)
                    mid_ = x[[a,b,c]].sum(axis=1)-min_-max_

                    x_copy[f'{a}_{b}_{c}_imb2'] = (max_-mid_)/(mid_-min_)
    
    return x_copy

Now that we have a feature engineering function and a dataset, we can create the Feature Set:

import mlrun.feature_store as fstore

stocks_set = fstore.FeatureSet("stocks", entities=[fstore.Entity("row_id")])

The first argument is the name, and Entity is the unique key of each row, i.e. its index. In this dataset, this is row_id.

And now we’ll simply add our imbalance_calculator to the graph as a feature set transformation:

stocks_set.graph.to(name="imbalance_calculator", handler="imbalance_calculator")

We also need to add targets where the features will be stored. We need both offline and online targets. For the offline target, for simplicity, we will use a .parquet file in our directory, and Redis as the online target:

from mlrun.datastore.targets import ParquetTarget, RedisNoSqlTarget

offline_target = ParquetTarget(
    name="stocks", path="./stocks.parquet"
)
online_target = RedisNoSqlTarget(path="redis://0.0.0.0:6379")

stocks_set.set_targets([offline_target, online_target], with_defaults=False)

By default, the offline and online targets live on the Iguazio platform (the V3IO Frames service) from the creators of MLRun. To use them, you need to create an account and set credentials.
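As an illustration of what “set credentials” means, the Iguazio connection details are typically supplied through environment variables, roughly like this (a sketch; all values are placeholders):

import os

# Placeholder values; the real ones come from your Iguazio account
os.environ["V3IO_API"] = "https://webapi.<your-cluster>"
os.environ["V3IO_USERNAME"] = "<username>"
os.environ["V3IO_ACCESS_KEY"] = "<access-key>"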

You can run Redis using redis-stack, which includes a UI where you can inspect the stored features:

docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest

RedisInsight – http://localhost:8001

Now we can visualize our stocks_set graph:

stocks_set.plot(rankdir="LR", with_targets=True)

Thus, during ingestion the data first passes through `imbalance_calculator`, after which the results are saved to .parquet for subsequent training and to Redis for inference.

Now let’s run ingest, that is, send our dataset through this graph:

stocks_df = fstore.ingest(stocks_set, stocks_df)

The result can be seen in the MLRun UI (localhost:8060) under the project’s Feature Store, in RedisInsight, and in the parquet file in the project directory.
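As a quick sanity check, the offline target can also be inspected directly with pandas (a small sketch):

# The parquet file written by the ParquetTarget now contains the engineered features
check_df = pd.read_parquet("./stocks.parquet")
print(check_df.head())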

Creating and Using Feature Vectors

After generating the Feature Set, the next step is to create the Feature Vector. A Feature Vector represents the input to the model and consists of a group of features collected from different Feature Sets. In our case, we only have one Feature Set, so we will use all of its features.

An example of how to create a Feature Vector from a Feature Set can be found in the MLRun user interface:

fv_name = "stocks_fv"
features = ["stocks.*"]

stocks_fv = fstore.FeatureVector(
    fv_name,
    features=features,
    label_feature="stocks.target",
    description="Predict US stocks closing movements"
)

Here label_feature indicates what we are trying to predict. Once created, the Feature Vector can be saved and viewed in the MLRun user interface:

stocks_fv.save()
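Once saved, the vector can also be materialized from offline storage into a training DataFrame, which is a handy way to double-check it (a minimal sketch using get_offline_features):

# Join the requested features from offline storage into a single DataFrame
resp = fstore.get_offline_features(stocks_fv)
training_df = resp.to_dataframe()
print(training_df.head())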

Automation of model training and deployment

Now, having the Feature Vector in hand, let’s automate the model training process. Our goal is to regularly update and deploy the model based on the latest data that comes into our offline and online repositories. We will use Kafka to simulate the data flow.

Let’s choose XGBRegressor as the model for predicting stock closing movements. While model ensembles can improve performance in many cases, in this example we’ll focus on a simple solution to better understand the underlying concept.

Training function with MLRun

First, let’s create a file trainer.py with the training function:

%%writefile trainer.py

import mlrun
from mlrun.execution import MLClientCtx
from mlrun.frameworks.xgboost import apply_mlrun
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split

def train(
    context: MLClientCtx,
    dataset: mlrun.DataItem,
    objective: str,
    tree_method: str,
    n_estimators: int,
    label_column: str = "target",
    model_name: str = "stock_prediction_xgboost",
):
    # Load the data
    df = dataset.as_df()
    X = df.drop(label_column, axis=1)
    y = df[label_column]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Initialize the model
    model = XGBRegressor(objective=objective, tree_method=tree_method, n_estimators=n_estimators)

    # MLOps integration: auto-log the model, metrics, and the test set
    apply_mlrun(model=model, model_name=model_name, x_test=X_test, y_test=y_test)

    context.logger.info(f"training '{model_name}'")
    model.fit(X_train, y_train)

MLRun provides native integration with many frameworks, including XGBoost. The apply_mlrun function automatically saves training artifacts such as the model, test dataset, metrics and training statistics specific to XGBoost.

Using Functions in MLRun

MLRun uses the concept of “Functions”, which are Docker containers that execute custom code in real time or in batch mode. To run our training function, we need such a Function.

Let’s convert our code into an MLRun “job” batch Function that runs code in a Kubernetes Pod:

regressor_fn = mlrun.code_to_function(
    name="custom-trainer",
    filename="trainer.py",
    kind="job",
    image="mlrun/mlrun",
    handler="train",
)

Now we have a Function with its own image and code. To use this Function later, it must be added to the project:

project.set_function(regressor_fn)

And accordingly, save the project itself, which will update the project.yaml file in the working directory, responsible for the entire project configuration:

project.save()

Now we can run this Function using run_function from mlrun by specifying its name:

train_run = mlrun.run_function(
        "custom-trainer",
        name="custom-trainer-run",
        handler="train",
        params={
            "label_column": "target", 
            "objective": "reg:absoluteerror", 
            "tree_method": "hist", 
            "n_estimators": 5000
        },
        inputs={"dataset": project.get_artifact_uri("stocks_fv", "feature-vector")},
        local=True
    )

In this call, dataset is a reference (URI) to our Feature Vector. You can also copy this URI directly from the UI.

We use local=True, which runs the Function locally instead of in a container.

Once completed, you can view the outputs of the function.

Outputs are artifacts saved to the project database, which can later be retrieved and reused.
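For example, they can be inspected straight from the run object (a quick sketch):

# Dictionary of results and artifact URIs logged by apply_mlrun during the run
print(train_run.outputs)

# The model artifact URI can be passed to other functions, e.g. a serving function
model_uri = train_run.outputs["model"]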

In the pipeline below, the outputs of one Function are passed as parameters to the next.

Model Serving: From Batch to Real-time

After training the model, the next step in our pipeline is to “serve” it. To do this, we will need to create a custom function to deploy the model.

%%writefile serving.py

import numpy as np
from cloudpickle import load
from mlrun.serving.v2_serving import V2ModelServer
import mlrun

class RegressorModel(V2ModelServer):
    def load(self):
        """Load and initialize the model and other components."""
        model_file, _ = self.get_model(".pkl")
        self.model = load(open(model_file, "rb"))
        self.feature_service = mlrun.feature_store.get_online_feature_service('stocks_fv')
        
    def preprocess(self, body: dict) -> list:
        """Preprocess input data before prediction."""
        vectors = self.feature_service.get([{'row_id': row_id} for row_id in body['inputs']])
        vectors[0].pop('target', None)
        body['inputs'] = list(vectors[0].values())
        return body

    def predict(self, body: dict) -> list:
        """Predict using the model."""
        feats = np.asarray([body["inputs"]])
        result = self.model.predict(feats)
        return result.tolist()

For training, we used an MLRun Function of the “job” type, which works in batch mode. Now we need a Function of the “serving” type, which works in real time on top of Nuclio serverless functions.

When we use a serving Function with custom model inference logic, we need to create a class that inherits from mlrun.serving.v2_serving.V2ModelServer and implements at least the two methods load and predict. Above we also added preprocess; you can additionally add postprocess, validate and explain.

To create a serving Function for the class above, one option is to also call code_to_function, but with kind=`serving`:

serving_fn = mlrun.code_to_function(
    "stocks_serving",
    filename="serving.py",
    kind="serving",
    image="mlrun/mlrun"
)

For serving_fn to work, a model must be passed into it. To do this you need to call add_model:

serving_fn.add_model(
    "stock_prediction_xgboost", 
    model_path=train_run.outputs["model"],
    class_name="RegressorModel"
)

The output of the previous training function, the model URI (already stored as an artifact), is passed here as model_path.

You can view the serving_fn graph:

serving_fn.spec.graph.plot()

Now we can run a mock server to access the model via an API:

local_server = serving_fn.to_mock_server()

Since we added a model, we can also list the models that a given Function or service has:

local_server.test("/v2/models/", method="GET")

Creating MLRun Workflow

Now that we have all the components, let’s move on to creating the MLRun Workflow. These workflows are Directed Acyclic Graphs (DAGs) that run the various MLRun functions in sequence.

MLRun pipelines can be run locally or as part of Kubeflow Pipelines. They automate the execution of the various stages of the ML process, such as data collection, model training, and deployment. The pipeline can be launched either on a schedule or through triggers, for example, when data or model drift is detected.

%%writefile workflow.py

import mlrun
from kfp import dsl

@dsl.pipeline(
    name="Stock prediction pipeline",
    description="Predict US stocks closing movements"
)
def pipeline(vector_name: str = "stocks_fv"):
    project = mlrun.get_current_project()

    train_run = mlrun.run_function(
        "custom_trainer",
        name="custom_trainer_run",
        handler="train",
        params={
            "label_column": "target", 
            "objective": "reg:absoluteerror", 
            "tree_method": "hist", 
            "n_estimators": 500
        },
        inputs={"dataset": project.get_artifact_uri("stocks_fv", "feature-vector")},
        outputs=["model"]
    )

    serving_fn = mlrun.code_to_function(
        "stocks-serving",
        filename="serving.py",
        kind="serving",
        image="mlrun/mlrun"
    )
    serving_fn.add_model(
        "stock_prediction_xgboost", 
        model_path=train_run.outputs["model"],
        class_name="RegressorModel"
    )

    serving_fn.deploy()

Here we do not collect and process data, since we have a Feature Store, which will receive data through an event streaming platform such as Kafka. A little later we will launch a real-time function that will receive events from Kafka and pass them to the Feature Store, which performs the transformations and saves the results in the offline (parquet) and online (Redis) stores.

In our workflow, we add the @dsl.pipeline(…) decorator so that it can run on Kubeflow.

In the inputs for the training function, we pass the Feature Vector URI, project.get_artifact_uri("stocks_fv", "feature-vector"), which pulls the current processed features from offline storage in the required format; they are added there in real time.

The trained model is then passed to the serving Function for further use via train_run.outputs["model"].

And lastly, to deploy, we simply run serving_fn.deploy(), which wraps the model and its API service in a Docker container and automatically deploys it to the Kubernetes cluster.
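To actually launch this workflow, one common pattern is to register it with the project and run it (a sketch; the workflow name "main" is our own choice, and a workflow engine such as Kubeflow Pipelines must be available):

# Register the workflow file under a name of our choosing and persist the project
project.set_workflow("main", "workflow.py")
project.save()

# Launch the pipeline and wait for completion
run_id = project.run("main", arguments={"vector_name": "stocks_fv"}, watch=True)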

After the pipeline completes, we should have a working API serving the model. When we ran the mock server earlier, we accessed the model like this:

sample_id = "4_410_107"
model_inference_path = "/v2/models/stock_prediction_xgboost/infer"
local_server.test(path=model_inference_path, body={"inputs": [sample_id]})

Now, we can create a module to access the model using requests, or use curl:

curl -X 'POST' 'http://host:32768/v2/models/stock_prediction_xgboost/infer' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{"inputs": ["4_410_107"]}'

Deploying Feature Set for real-time service with Apache Kafka

Now our task is to deploy a Feature Set that will work in real time, receiving events from Apache Kafka.

Installing Apache Kafka via Docker

  1. Download the configuration file for Apache Kafka:

curl -sSL https://raw.githubusercontent.com/bitnami/containers/main/bitnami/kafka/docker-compose.yaml > docker-kafka-compose.yaml

  2. Start Kafka using Docker Compose:

docker compose -f docker-kafka-compose.yaml up -d

  3. Create a topic called `stocks-topic`:

docker exec -it chris-kafka-1 kafka-topics.sh --create --topic stocks-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Integration with MLRun

  1. Get your FeatureSet:

stocks_set = fstore.get_feature_set("stocks", project="stock-prediction-user_name")

  2. Set up a KafkaSource to receive events:

from mlrun.datastore.sources import KafkaSource

kafka_source = KafkaSource(
    brokers=['0.0.0.0:9092'],
    topics="stocks-topic",
    initial_offset="earliest",
    group="my_group",
)
  3. And let’s deploy our ingestion service:

stocks_set_endpoint = fstore.deploy_ingestion_service(
    featureset=stocks_set, source=kafka_source
)

Now we can send new data to the deployed ingestion service (via Kafka events or its API), and it will be converted into the necessary features, so we can be sure that the same format is used both during training and during inference.
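To simulate the data flow, you could push a raw record into the topic with a small producer script, for example using kafka-python (an illustrative sketch; the record below is truncated, a real event carries every raw column from train.csv):

import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="0.0.0.0:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Truncated example record; the ingestion service will run imbalance_calculator
# on it and write the result to the parquet and Redis targets
record = {
    "row_id": "4_410_107",
    "bid_size": 100.0,
    "ask_size": 120.0,
    "bid_price": 0.9998,
    "ask_price": 1.0002,
}
producer.send("stocks-topic", record)
producer.flush()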

Navigating the Future of MLOps

We have successfully completed our journey of introducing MLRun and general MLOps concepts. I hope you found this tutorial helpful in your learning and application of machine learning tools.

For any questions, write to me on Telegram: @NLavrenov00
