Extending Spark with MLflow
The purpose of this article is to talk about our first experience of using MLflow…
We’ll start our review of MLflow with its Tracking Server and log every iteration of the study. Then we’ll share our experience of connecting Spark to MLflow using UDFs.
Context
At Alpha Health, we use machine learning and artificial intelligence to empower people to take care of their health and well-being. Machine learning models are therefore at the heart of the data products we develop, which is why our attention was drawn to MLflow, an open source platform that covers all aspects of the machine learning lifecycle.
MLflow
The main goal of MLflow is to provide an additional layer on top of machine learning that allows data scientists to work with almost any machine learning library (h2o, keras, mleap, pytorch, sklearn and tensorflow), taking their work to a new level.
MLflow provides three components:
- Tracking – recording and querying experiments: code, data, configuration and results. This makes it possible to follow how a model was created.
- Projects – a packaging format for running code on any platform (for example, SageMaker).
- Models – a common format for submitting models to various deployment tools.
MLflow (alpha at the time of this writing) is an open source platform that allows you to manage the machine learning lifecycle, including experimentation, reuse, and deployment.
Configuring MLflow
To use MLflow, we first need to set up a Python environment. We will use PyEnv for this (to install Python on a Mac, take a look here) and create a virtual environment in which to install all the libraries we need.
```
pyenv install 3.7.0
pyenv global 3.7.0 # Use Python 3.7
mkvirtualenv mlflow # Create a Virtual Env with Python 3.7
workon mlflow
```
Install the required libraries.
```
pip install mlflow==0.7.0 \
    Cython==0.29 \
    numpy==1.14.5 \
    pandas==0.23.4 \
    pyarrow==0.11.0
```
Note: we use PyArrow to run models as UDFs. The PyArrow and NumPy versions had to be pinned because the latest versions conflicted with each other.
Launch Tracking UI
MLflow Tracking allows us to log and query experiments using the Python and REST APIs. In addition, you can define where to store model artifacts (localhost, Amazon S3, Azure Blob Storage, Google Cloud Storage or an SFTP server). Since we use AWS at Alpha Health, S3 will be the storage for our artifacts.
# Running a Tracking Server
mlflow server \
    --file-store /tmp/mlflow/fileStore \
    --default-artifact-root s3:///mlflow/artifacts/ \
    --host localhost \
    --port 5000
MLflow recommends using persistent file storage. File storage is where the server will store run and experiment metadata. When starting the server, make sure it points to persistent file storage. Here, for the experiment, we’ll just use /tmp
…
Remember that if we want to use the mlflow server to run old experiments, they must be present in the file store. However, even without this, we would be able to use them in the UDF, since we only need the path to the model.
Note: Keep in mind that both the Tracking UI and the model client must have access to the artifact location. That is, even if the Tracking UI runs on an EC2 instance, a machine that runs MLflow locally must have direct access to S3 in order to write model artifacts.
Tracking UI stores artifacts in an S3 bucket
Running models
Once the Tracking Server is running, you can start training the models.
As an example, we’ll use a modified version of the wine quality example from the MLflow sklearn examples.
MLFLOW_TRACKING_URI=http://localhost:5000 python wine_quality.py \
    --alpha 0.9 \
    --l1_ratio 0.5 \
    --wine_file ./data/winequality-red.csv
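Because every run is logged to the tracking server, the same command can be repeated for several hyperparameter values. The following is only a minimal sketch: it assumes the modified script accepts the flags `--alpha`, `--l1_ratio` and `--wine_file`, and the helper names and grid values are illustrative, not part of the original project.

```python
import itertools
import os
import subprocess
import sys

TRACKING_URI = "http://localhost:5000"  # the tracking server started earlier


def build_command(alpha, l1_ratio, wine_file="./data/winequality-red.csv"):
    """Build the argument list for one training run (flag names are assumed)."""
    return [
        sys.executable, "wine_quality.py",
        "--alpha", str(alpha),
        "--l1_ratio", str(l1_ratio),
        "--wine_file", wine_file,
    ]


def sweep(alphas, l1_ratios):
    """Yield one command per (alpha, l1_ratio) combination."""
    for alpha, l1_ratio in itertools.product(alphas, l1_ratios):
        yield build_command(alpha, l1_ratio)


def run_sweep(alphas, l1_ratios):
    """Run every combination with MLFLOW_TRACKING_URI set for the child process."""
    env = dict(os.environ, MLFLOW_TRACKING_URI=TRACKING_URI)
    for command in sweep(alphas, l1_ratios):
        subprocess.run(command, env=env, check=True)


# Example call (needs the tracking server and wine_quality.py in place):
# run_sweep(alphas=[0.1, 0.5, 0.9], l1_ratios=[0.2, 0.5])
```

Each combination then shows up as a separate run in the Tracking UI, which makes the iterations easy to compare.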
As mentioned above, MLflow lets you log the parameters, metrics and artifacts of your models so that you can track how they evolve across iterations. This is extremely useful: we can reproduce the best model by querying the Tracking Server, or find out which code produced a given iteration from the git commit hash logged with the run.
with mlflow.start_run():
    # ... model ...
    mlflow.log_param("source", wine_path)
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)
    mlflow.set_tag('domain', 'wine')
    mlflow.set_tag('predict', 'quality')
    mlflow.sklearn.log_model(lr, "model")
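The three logged metrics are standard regression measures. The training script presumably computes them with scikit-learn; purely to illustrate what the numbers mean, here is a dependency-free sketch. The function name `eval_metrics` and the sample values are illustrative assumptions.

```python
import math


def eval_metrics(actual, predicted):
    """Return (rmse, mae, r2) for two equal-length sequences of numbers."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    ss_res = sum(e * e for e in errors)                    # residual sum of squares
    rmse = math.sqrt(ss_res / n)                           # root mean squared error
    mae = sum(abs(e) for e in errors) / n                  # mean absolute error
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)   # total sum of squares
    r2 = 1.0 - ss_res / ss_tot                             # coefficient of determination
    return rmse, mae, r2


# Made-up quality scores and predictions, only to exercise the function
rmse, mae, r2 = eval_metrics([5, 6, 7], [5.5, 6.0, 6.5])
print(rmse, mae, r2)
```

Lower `rmse` and `mae` and a higher `r2` indicate a better fit, which is what we compare between runs in the Tracking UI.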
Wine iterations
Serving the model
The MLflow tracking server, launched with the “mlflow server” command, exposes a REST API for tracking runs and writing data to the local file system. You can specify the tracking server address with the “MLFLOW_TRACKING_URI” environment variable, and the MLflow tracking API will automatically contact the server at that address to create or fetch run information, log metrics, and so on.
To serve the model, we need a running tracking server (see Launch Tracking UI) and the Run ID of the model.
Run ID
# Serve a sklearn model through 127.0.0.1:5005
MLFLOW_TRACKING_URI=http://0.0.0.0:5000 mlflow sklearn serve \
    --port 5005 \
    --run_id 0f8691808e914d1087cf097a08730f17 \
    --model-path model
To serve models using the MLflow serve functionality, we need access to the Tracking UI in order to get information about the model simply by specifying --run_id
…
Once the serve process has fetched the model through the Tracking Server, the new model endpoint is ready to be queried.
# Query the model endpoint
curl -X POST \
    http://127.0.0.1:5005/invocations \
    -H 'Content-Type: application/json' \
    -d '[
        {
            "fixed acidity": 3.42,
            "volatile acidity": 1.66,
            "citric acid": 0.48,
            "residual sugar": 4.2,
            "chlorides": 0.229,
            "free sulfur dioxide": 19,
            "total sulfur dioxide": 25,
            "density": 1.98,
            "pH": 5.33,
            "sulphates": 4.39,
            "alcohol": 10.8
        }
    ]'
> {"predictions": [5.825055635303461]}
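The same request can be sent from Python. This is a minimal sketch using only the standard library; it assumes the model is served at the address used above, and the helper names `build_request` and `predict` are illustrative.

```python
import json
import urllib.request

ENDPOINT = "http://127.0.0.1:5005/invocations"  # the served model from the command above


def build_request(records, url=ENDPOINT):
    """Wrap a list of feature dictionaries in a JSON POST request."""
    body = json.dumps(records).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(records, url=ENDPOINT):
    """Send the records to the model server and return the decoded JSON response."""
    with urllib.request.urlopen(build_request(records, url)) as response:
        return json.loads(response.read().decode("utf-8"))


sample = [{
    "fixed acidity": 3.42,
    "volatile acidity": 1.66,
    "citric acid": 0.48,
    "residual sugar": 4.2,
    "chlorides": 0.229,
    "free sulfur dioxide": 19,
    "total sulfur dioxide": 25,
    "density": 1.98,
    "pH": 5.33,
    "sulphates": 4.39,
    "alcohol": 10.8,
}]
request = build_request(sample)
# predict(sample) only works while the model server is running
```

Keeping the request construction separate from the network call makes the payload easy to inspect before anything is sent.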
Running models from Spark
Although the Tracking Server is powerful enough to serve models in real time, that is, to train them and then use the serve functionality (source: mlflow // docs // models # local), using Spark (batch or streaming) is an even more powerful solution because the work is distributed.
Imagine you just did offline training and then applied the output model to all of your data. This is where Spark and MLflow will show their best.
Install PySpark + Jupyter + Spark
Source: Get started PySpark – Jupyter
To show how we apply MLflow models to Spark dataframes, we need to set up Jupyter notebooks to work together with PySpark.
Start by installing the latest stable release of Apache Spark:
cd ~/Downloads/
tar -xzf spark-2.4.3-bin-hadoop2.7.tgz
mv ~/Downloads/spark-2.4.3-bin-hadoop2.7 ~/
ln -s ~/spark-2.4.3-bin-hadoop2.7 ~/spark
Install PySpark and Jupyter in a virtual environment:
pip install pyspark jupyter
Set up environment variables:
export SPARK_HOME=~/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --notebook-dir=${HOME}/Projects/notebooks"
Having defined notebook-dir, we can store our notebooks in the desired folder.
Launching Jupyter from PySpark
Since we have set up Jupyter as the PySpark driver, we can now run Jupyter notebooks in the PySpark context.
(mlflow) afranzi:~$ pyspark
[I 19:05:01.572 NotebookApp] sparkmagic extension enabled!
[I 19:05:01.573 NotebookApp] Serving notebooks from local directory: /Users/afranzi/Projects/notebooks
[I 19:05:01.573 NotebookApp] The Jupyter Notebook is running at:
[I 19:05:01.573 NotebookApp] http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
[I 19:05:01.573 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 19:05:01.574 NotebookApp]
Copy/paste this URL into your browser when you connect for the first time,
to login with a token:
http://localhost:8888/?token=c06252daa6a12cfdd33c1d2e96c8d3b19d90e9f6fc171745
As mentioned above, MLflow provides a function for logging model artifacts in S3. As soon as we have the selected model in our hands, we have the opportunity to import it as a UDF using the module mlflow.pyfunc
…
import mlflow.pyfunc
model_path="s3:///mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model"
wine_path="/Users/afranzi/Projects/data/winequality-red.csv"
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)
df = spark.read.format("csv").option("header", "true").option('delimiter', ';').load(wine_path)
columns = [ "fixed acidity", "volatile acidity", "citric acid",
"residual sugar", "chlorides", "free sulfur dioxide",
"total sulfur dioxide", "density", "pH",
"sulphates", "alcohol"
]
df.withColumn('prediction', wine_udf(*columns)).show(100, False)
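The model path above follows the layout artifact root / experiment ID / run ID / artifacts / model. That layout is inferred from the paths in this article rather than from a documented guarantee, so the helper below is only a sketch; the bucket name `example-bucket` and the function name `model_uri` are hypothetical.

```python
def model_uri(artifact_root, experiment_id, run_id, artifact_path="model"):
    """Join the artifact root, experiment ID and run ID into a model path.

    The layout is an assumption based on the paths shown in this article.
    """
    root = artifact_root.rstrip("/")
    return f"{root}/{experiment_id}/{run_id}/artifacts/{artifact_path}"


# Hypothetical bucket name; the run ID is the one used earlier in the article
uri = model_uri(
    "s3://example-bucket/mlflow/artifacts/",
    experiment_id=1,
    run_id="0f8691808e914d1087cf097a08730f17",
)
print(uri)
```

With such a helper, switching the UDF to another run only requires changing the Run ID copied from the Tracking UI.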
PySpark – Predicted Wine Quality
Up to this point, we’ve talked about how to use PySpark with MLflow by running wine quality predictions on the entire wine dataset. But what if you need to use Python MLflow modules from Scala Spark?
We tested this too by sharing the Spark context between Scala and Python. That is, we registered the MLflow UDF in Python and used it from Scala (yes, maybe not the best solution, but it is what we have).
Scala Spark + MLflow
For this example, we’ll add the Toree kernel to the existing Jupyter installation.
Install Spark + Toree + Jupyter
```
pip install toree
jupyter toree install --spark_home=${SPARK_HOME} --sys-prefix
jupyter kernelspec list
```
```
Available kernels:
apache_toree_scala /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/apache_toree_scala
python3 /Users/afranzi/.virtualenvs/mlflow/share/jupyter/kernels/python3
```
As you can see from the notebook below, the UDF is shared between Spark and PySpark. We hope this part will be useful for those who love Scala and want to deploy machine learning models to production.
In [1]:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.matching.Regex

// Matches a single leading underscore
val FirstAtRe: Regex = "^_".r
// Matches runs of whitespace, underscores, dots, colons and at-signs
val AliasRe: Regex = "[\\s_.:@]+".r

// Turn a raw column name into a SQL-friendly alias, e.g. "fixed acidity" -> "fixed_acidity"
def getFieldAlias(field_name: String): String = {
  FirstAtRe.replaceAllIn(AliasRe.replaceAllIn(field_name, "_"), "")
}

// Select the given columns, renaming each one to its normalized alias
def selectFieldsNormalized(columns: List[String])(df: DataFrame): DataFrame = {
  val fieldsToSelect: List[Column] = columns.map(field =>
    col(field).as(getFieldAlias(field))
  )
  df.select(fieldsToSelect: _*)
}

// Normalize every column name of the DataFrame
def normalizeSchema(df: DataFrame): DataFrame = {
  val schema = df.columns.toList
  df.transform(selectFieldsNormalized(schema))
}
FirstAtRe = ^_
AliasRe = [\s_.:@]+
getFieldAlias: (field_name: String)String
selectFieldsNormalized: (columns: List[String])(df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
normalizeSchema: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
Out[1]:
[\s_.:@]+
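To see what the normalization does to the wine column names, here is a rough Python equivalent of the two regular expressions. It is a sketch for illustration only; the name `get_field_alias` mirrors the Scala helper and is not part of the notebook.

```python
import re

ALIAS_RE = re.compile(r"[\s_.:@]+")  # runs of whitespace or separator characters
FIRST_AT_RE = re.compile(r"^_")      # a single leading underscore


def get_field_alias(field_name):
    """Replace separator runs with "_" and drop a leading underscore."""
    return FIRST_AT_RE.sub("", ALIAS_RE.sub("_", field_name))


for name in ["fixed acidity", "free sulfur dioxide", "pH"]:
    print(name, "->", get_field_alias(name))
```

The normalized names contain no spaces, so they can be used directly as identifiers in a SQL query.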
In [2]:
val winePath = "~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv"
val modelPath = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
winePath = ~/Research/mlflow-workshop/examples/wine_quality/data/winequality-red.csv
modelPath = /tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
Out[2]:
/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model
In [3]:
val df = spark.read
.format("csv")
.option("header", "true")
.option("delimiter", ";")
.load(winePath)
.transform(normalizeSchema)
df = [fixed_acidity: string, volatile_acidity: string ... 10 more fields]
Out[3]:
[fixed_acidity: string, volatile_acidity: string ... 10 more fields]
In [4]:
%%PySpark
import mlflow
from mlflow import pyfunc
model_path = "/tmp/mlflow/artifactStore/0/96cba14c6e4b452e937eb5072467bf79/artifacts/model"
wine_quality_udf = mlflow.pyfunc.spark_udf(spark, model_path)
spark.udf.register("wineQuality", wine_quality_udf)
Out[4]:
<function spark_udf.<locals>.predict at 0x1116a98c8>
In [6]:
df.createOrReplaceTempView("wines")
In [10]:
%%SQL
SELECT
quality,
wineQuality(
fixed_acidity,
volatile_acidity,
citric_acid,
residual_sugar,
chlorides,
free_sulfur_dioxide,
total_sulfur_dioxide,
density,
pH,
sulphates,
alcohol
) AS prediction
FROM wines
LIMIT 10
Out[10]:
+-------+------------------+
|quality| prediction|
+-------+------------------+
| 5| 5.576883967129615|
| 5| 5.50664776916154|
| 5| 5.525504822954496|
| 6| 5.504311247097457|
| 5| 5.576883967129615|
| 5|5.5556903912725755|
| 5| 5.467882654744997|
| 7| 5.710602976324739|
| 7| 5.657319539336507|
| 5| 5.345098606538708|
+-------+------------------+
In [17]:
spark.catalog.listFunctions.filter('name like "%wineQuality%").show(20, false)
+-----------+--------+-----------+---------+-----------+
|name |database|description|className|isTemporary|
+-----------+--------+-----------+---------+-----------+
|wineQuality|null |null |null |true |
+-----------+--------+-----------+---------+-----------+
Next steps
Even though MLflow is in Alpha at the time of writing, it looks pretty promising. The mere ability to run multiple machine learning frameworks and use them from a single endpoint takes recommender systems to the next level.
In addition, MLflow brings Data Engineers and Data Scientists closer together by creating a common layer between them.
After doing this research on MLflow, we’re confident we’ll go ahead and use it for our Spark pipelines and recommender systems.
It would be nice to back the file store with a database instead of the file system. That way we could have multiple endpoints sharing the same store, much like running multiple Presto and Athena instances against the same Glue metastore.
To summarize, I’d like to thank the MLflow community for making our work with data more interesting.
If you are playing with MLflow, feel free to write to us and tell us how you use it, and even more so if you use it in production.