XGBoost distributed training and parallel prediction with Apache Spark

Hello, Habr! At the end of July, Otus is launching a new course, “Industrial ML on Big Data”. As usual, in anticipation of the course launch, we have prepared a translation of some useful material.


General information

Boosting (a family of ensemble machine learning methods) is a sequential process (as opposed to bagging, where training is parallelized) that generates weak learners and combines them into a strong one (as in all ensemble methods). At each iteration of boosting, the model tries to adaptively correct the errors of the previous iteration, unlike bagging, where the weak learners are trained independently.

One of the boosting algorithms, gradient boosting, uses gradient descent to minimize the loss function directly over this sequence of models (unlike AdaBoost, where training proceeds by reweighting the training instances).

The weak learners created during gradient boosting training are usually implemented as decision trees. The biggest inefficiency of gradient boosting is the sequential tree-building process, since only one tree is built at a time.
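To make this sequential process concrete, here is a tiny illustrative sketch (plain Scala, not XGBoost or Spark code) of gradient boosting with squared loss: each iteration fits a new weak learner — a one-split stump here — to the current residuals, which are the negative gradient of the loss.

object GradientBoostingSketch {

  // A "weak learner": a one-split decision stump on a single feature.
  final case class Stump(threshold: Double, leftValue: Double, rightValue: Double) {
    def predict(x: Double): Double = if (x <= threshold) leftValue else rightValue
  }

  // Fit a stump to the residuals: try each sample value as a threshold and
  // use the mean residual on each side as the leaf value.
  def fitStump(xs: Array[Double], residuals: Array[Double]): Stump = {
    def mean(v: Seq[Double]): Double = if (v.isEmpty) 0.0 else v.sum / v.size
    xs.distinct.map { t =>
      val (left, right) = xs.zip(residuals).partition { case (x, _) => x <= t }
      val stump = Stump(t, mean(left.map(_._2)), mean(right.map(_._2)))
      val sse = xs.zip(residuals).map { case (x, r) => math.pow(r - stump.predict(x), 2) }.sum
      (sse, stump)
    }.minBy(_._1)._2
  }

  def main(args: Array[String]): Unit = {
    val xs = Array(1.0, 2.0, 3.0, 4.0, 5.0)
    val ys = Array(1.2, 1.9, 3.1, 3.9, 5.2)
    val learningRate = 0.5

    // Start from a constant prediction (the mean of the targets).
    var predictions = Array.fill(xs.length)(ys.sum / ys.length)

    // The sequential loop: each iteration corrects the errors of the previous one
    // by fitting a new stump to the negative gradient of the squared loss (the residuals).
    for (_ <- 1 to 50) {
      val residuals = ys.zip(predictions).map { case (y, p) => y - p }
      val tree = fitStump(xs, residuals)
      predictions = xs.zip(predictions).map { case (x, p) => p + learningRate * tree.predict(x) }
    }
    println(predictions.mkString(", "))
  }
}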

To get around this limitation, Tianqi Chen and Carlos Guestrin proposed an improvement to the gradient boosting algorithm called XGBoost, which stands for Extreme Gradient Boosting. It is a kind of gradient boosting on steroids, used mainly for classification, but also for regression and ranking.

Compared to standard gradient boosting, the new method significantly improves performance thanks to its rich set of hyperparameters, GPU support, cross-validation and regularization. Overall, the model is more efficient, trains faster, and is less prone to overfitting.

Recently, XGBoost has become extremely popular and has won many machine learning competitions on Kaggle. It has a reputation for both computational performance and accuracy.

XGBoost and Apache Spark

In a typical ML workflow, systems like Spark are used to build a machine learning pipeline in which you preprocess and clean the data; the result is then passed to the machine learning phase, often based on Spark MLlib if you are already using Spark.

In the context of this article, what matters is that XGBoost parallelizes the tree-building process, which enables distributed training and prediction across nodes. That is, if I, as an Apache Spark MLlib user, can use it to scale XGBoost training and production serving, then I effectively get both the high performance of XGBoost and Spark’s powerful mechanisms for feature engineering and ML pipeline construction.

Meet XGBoost4J-Spark, a project that integrates XGBoost and Apache Spark, adding XGBoost to the Apache Spark MLlib framework.

XGBoost4J-Spark makes it possible to build an MLlib pipeline that preprocesses the data, trains the XGBoost model on it and serves parallel predictions in production (a pipeline example is sketched after the training section below). With this library, each XGBoost worker becomes a Spark task, and the training dataset is sent from Spark memory to XGBoost workers that live transparently inside the Spark executors.

To start writing a machine learning application on XGBoost4J-Spark, you first need to add the appropriate dependency:


<dependency>
    <groupId>ml.dmlc</groupId>
    <artifactId>xgboost4j-spark</artifactId>
    <version>0.90</version>
</dependency>
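If your project uses sbt rather than Maven, the equivalent dependency line would look roughly like this (same coordinates as in the Maven snippet above):

libraryDependencies += "ml.dmlc" % "xgboost4j-spark" % "0.90"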

Data preparation (example with irises)

As mentioned earlier, XGBoost4J-Spark lets you prepare the data to fit the interface that XGBoost expects.

Once we have loaded the Iris Flowers dataset into a DataFrame, we need to:

  • Convert the String-typed label column to Double;
  • Assemble the feature columns into a single vector so that the data matches the interface of the Spark machine learning framework.

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
val stringIndexer = new StringIndexer().
  setInputCol("class").
  setOutputCol("classIndex").
  fit(irisDF)
val labelTransformed = stringIndexer.transform(irisDF).drop("class")
val vectorAssembler = new VectorAssembler().
  setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
  setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")

The resulting DataFrame contains two columns: “features”, a vector representing the iris measurements, and “classIndex”, a label of type Double. Such a DataFrame can be fed directly to the XGBoost4J-Spark training engine.
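A quick sanity check of the result (the printSchema output is shown as a comment, abbreviated):

// The assembled DataFrame should contain exactly the two columns XGBoost needs.
xgbInput.printSchema()
// root
//  |-- features: vector
//  |-- classIndex: double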

Distributed training

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val xgbClassifier = new XGBoostClassifier().
      setFeaturesCol("features").
      setLabelCol("classIndex").
      setObjective("multi:softmax").
      setMaxDepth(2).
      setNumClass(3).
      setNumRound(100).
      setNumWorkers(10)

You can find the full list of XGBoost parameters here. Note that in XGBoost4J-Spark you can also use camelCase, as in the example above.
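For completeness, the same configuration can be passed as a parameter Map with the snake_case names used in the XGBoost documentation; a minimal sketch equivalent to the builder-style code above:

// Raw parameter Map, as accepted by the XGBoostClassifier constructor.
val xgbParamMap = Map(
  "objective"   -> "multi:softmax",
  "num_class"   -> 3,
  "max_depth"   -> 2,
  "num_round"   -> 100,
  "num_workers" -> 10
)
val xgbClassifierFromMap = new XGBoostClassifier(xgbParamMap).
      setFeaturesCol("features").
      setLabelCol("classIndex")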

Notes

  1. multi:softmax means that we are doing multiclass classification using the softmax objective. It requires setting the number of classes via the num_class parameter.
  2. max_depth is the maximum depth of the tree built at each boosting iteration. Increasing this value makes the model more complex and more prone to overfitting. Training deep trees makes XGBoost consume a lot of memory.
  3. num_round is the number of boosting rounds.
  4. The num_workers parameter determines how many parallel workers we need when training the XGBoostClassificationModel. These workers later become pending Spark tasks, which are then scheduled by the cluster manager (in most cases, YARN).

Early stopping is supported via the num_early_stopping_rounds and maximize_evaluation_metrics parameters, as sketched below.
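A hedged sketch of what that could look like; validationDF is a hypothetical held-out DataFrame, and the setter names are assumed to be the camelCase equivalents of the parameters above:

// Training stops if the evaluation metric does not improve for 10 consecutive rounds.
// The default metric for multi:softmax (merror) should be minimized, hence "false".
val xgbWithEarlyStop = new XGBoostClassifier().
      setFeaturesCol("features").
      setLabelCol("classIndex").
      setObjective("multi:softmax").
      setNumClass(3).
      setNumRound(100).
      setNumWorkers(10).
      setEvalSets(Map("validation" -> validationDF)).
      setNumEarlyStoppingRounds(10).
      setMaximizeEvaluationMetrics(false)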

Now we can create a transformer by training the XGBoost classifier on the input DataFrame. As a result of the learning process, we get a model that can be used to make predictions.

val xgbClassificationModel = xgbClassifier.fit(xgbInput)
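Since XGBoost4J-Spark is designed to plug into MLlib pipelines (as mentioned above), the same stages can also be chained into a single Pipeline. A minimal sketch reusing the objects defined earlier (stringIndexer is already fitted, and a fitted Transformer is a valid pipeline stage):

import org.apache.spark.ml.Pipeline

// Preprocessing and the XGBoost classifier as one unit that can be fit,
// saved and applied together.
val pipeline = new Pipeline().setStages(Array(stringIndexer, vectorAssembler, xgbClassifier))
val pipelineModel = pipeline.fit(irisDF)
val pipelinePredictions = pipelineModel.transform(irisDF)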

Parallel prediction

XGBoost4J-Spark supports batch prediction and point (single-instance) prediction.

For batch prediction, the model takes a DataFrame with a column containing the feature vectors, makes a prediction for each feature vector and outputs a new DataFrame with the results. In this process, XGBoost4J-Spark launches a Spark task with an XGBoost worker for each partition of the input DataFrame to perform parallel batch prediction.

val predictionsDf = xgbClassificationModel.transform(inputDF)
predictionsDf.show()
+----------------+----------+-------------+-------------+----------+
|       features |classIndex|rawPrediction| probability |prediction|
+----------------+----------+-------------+-------------+----------+
|[5.1,3.5,1.2,.. |       0.0|[3.4556984...|[0.9957963...|       0.0|
|[4.7,3.2,1.3,.. |       0.0|[3.4556984...|[0.9961891...|       0.0|
|[5.7,4.4,1.5,.. |       0.0|[3.4556984...|[0.9964334...|       0.0|
+----------------+----------+-------------+-------------+----------+

For point prediction, the model takes one vector.

import org.apache.spark.ml.linalg.Vector

val features = xgbInput.head().getAs[Vector]("features")
val result = xgbClassificationModel.predict(features)

Point prediction with XGBoost is not recommended due to the high overhead: the overhead of a single call is comparable to the cost of the prediction itself.

At the moment, the latest version of XGBoost4J-Spark (0.90) requires Spark 2.4.x, mainly because it now uses facilities from org.apache.spark.ml.param.shared that are not fully available in earlier versions of Spark.

This version also brings more consistent handling of missing values, better performance on multi-core CPUs, improved cache management for shared training data to reduce training time, and more.

You can learn more in the XGBoost documentation.

Sources

XGBoost with CUDA
XGBoost in Spark with GPU and RAPIDS XGboost4J-Spark


Learn more about the course.

