Speeding up embeddings on Spark

My name is Dmitry Kurgansky, and I am the tech lead of the MLOps team at Banki.ru. We work on organizing and speeding up the stages of the ML life cycle. In this article I will share our experience with embeddings: from launching a Yandex Data Proc cluster via Airflow to optimizing the embedding inference stage with Spark. The material is relevant for the inference stage of any model that runs over large data sets in batch mode on a schedule.

What's in the article:

  • why we chose Apache Spark and Yandex Data Proc

  • which alternatives to the tool and the platform we considered

  • details of the cluster configuration, the Airflow provider library and the DAGs themselves that are not obvious from the documentation

  • code examples of wrapping the application in Spark

Who will benefit from this article?

First of all, MLOps and ML teams who want to try Spark for inference tasks with minimal effort and time. If you are at least a little familiar with Spark and Airflow, the contents of the article will not cause much difficulty. If you are considering an implementation, experience with Yandex services and engineers who can help with the setup will be useful.

Our stack and why we need Apache Spark

Our ML infrastructure is primarily built on Yandex Cloud services, first of all Kubernetes. Airflow is deployed on top of it and launches batch ML models on a schedule via KubernetesPodOperator. Initially, to speed up inference, we ran the operator in several parallel instances on a dedicated auto-scaling group of Kubernetes nodes. And while this did speed up the process, it was far from convenient: we had to manage the batches of data processed by each operator, which meant adding extra fields to tables and extra logic to the applications that run the ML model.

To streamline the process, we decided to use Apache Spark: a popular tool, relatively simple to apply to the task, and technically accessible.

Yes, the primary recommendation for speeding up embeddings is to use a GPU. But in our case it turned out cheaper to achieve acceptable acceleration on Spark through parallel computation than by accelerating the computation itself on a GPU. The situation is similar for the inference stage of our other models.

Which platform we chose to deploy Apache Spark

Spark as a framework will not, of course, solve the problem on its own: it needs computing power. We immediately set our sights on the cloud: we wanted to start quickly (to try it out) and pay only for usage, as we do with cloud Kubernetes. We considered two options: Spark on Kubernetes and Yandex Data Proc.

The obvious advantages of the Spark-on-Kubernetes option include the ability to pick any available version of the tools (Spark, Python), whereas in Data Proc the versions are capped. But for us this was not critical. Besides, at the Union All 2024 conference, Yandex once again reminded us of the upcoming release of Spark on Kubernetes as a separate service, which, we hope, will be a golden mean. At the very least, they promised to update tool versions more quickly.

Both options let you pay only for usage. But deploying the solution, and therefore starting to test it, turned out to be faster with Data Proc. So far the same seems to hold for support. So we settled on it.

By the way, there is an interesting article comparing these approaches, though relative to the Amazon analogue of Data Proc (EMR).

If we look for similar products in other vendors' platforms, then on the domestic market the only full-fledged analogue seems to be the cloud services of VK Cloud. We did not take a closer look at them, since we already had experience with Yandex services.

Motivation for writing this article

Since Yandex Data Proc is a relatively new product focused mostly on the domestic market, little has been written about it, and finding information about working practices is difficult. Apart from the Yandex documentation, there is practically nothing on the Internet, including the beloved Stack Overflow. A life hack: sometimes it helps to search for answers about Google Dataproc, which may also apply to the Yandex analogue, since the products have a similar structure, including the provider libraries for Airflow.

There are several articles on Habr about Google Dataproc, but I am not sure of their relevance. About the Yandex analogue we found only one, but an interesting one: Magnit IT talks about their experience of using the tool for data processing.

What tools we used

We used Data Proc image version 2.1.15. This is the latest version available without a support request at the time of publication. It uses a fairly fresh Spark 3.3.2, but a far from current Python 3.8. You may therefore need other versions of libraries and/or changes to the code of the applications you plan to run on Spark if they use features of newer Python versions.

For Airflow 2.3.4 we used the provider library apache-airflow-providers-yandex version 3.3.0, which in turn depends on the yandexcloud library version 0.227.0.

For embeddings we use Sentence Transformers version 2.2.2.

Running Data Proc via Airflow

This step is described in sufficient detail in the documentation, including the stages of preparing the cloud infrastructure. At least that was enough for our task.

I will focus only on two nuances of the setup:

  1. When creating an Amazon Elastic MapReduce connection for Airflow, the instructions mention the Run Job Flow Configuration field, but before Airflow version 2.5.0 it is called Extra.

  2. The documentation does not always keep up with cluster version updates, so before choosing cluster_image_version it is better to first check the latest versions available through the Yandex web interface (console). Moreover, it is worth trying to create a test cluster with it, since the very latest version may only be provided on request to support, and this becomes clear only after clicking the "Create cluster" button. A hedged sketch of creating a cluster with a pinned image version follows below.
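For reference, here is a minimal sketch of such a cluster creation task. The values are placeholders, and the parameter set follows the provider's DataprocCreateClusterOperator as used in the Yandex example DAG, so check it against your provider version and infrastructure.

from airflow.providers.yandex.operators.yandexcloud_dataproc import (
    DataprocCreateClusterOperator,
)

create_cluster = DataprocCreateClusterOperator(
    task_id='create_cluster',
    cluster_name='tmp-embedding-cluster',   # placeholder name
    cluster_image_version='2.1.15',         # verify in the console first
    zone='ru-central1-a',
    subnet_id='<SUBNET_ID>',
    s3_bucket='<LOGS_BUCKET>',
    service_account_id='<SERVICE_ACCOUNT_ID>',
    services=['YARN', 'SPARK'],
    computenode_count=2,
    datanode_count=0,
    connection_id='<YC_SA_CONNECTION_ID>',
)

The operator pushes the id of the new cluster to XCom, which the subsequent operators pick up; more on that below.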

It is more interesting to consider in this chapter the approaches to organizing resources on demand. There are two of them:

  1. creating/deleting a new cluster each time;

  2. starting/stopping a single permanent cluster.

Specifically for Airflow, the documentation shows only an example of a DAG that sequentially creates a Data Proc cluster, launches PySpark operators and deletes the cluster. Nevertheless, let's look at the implementation details and the pros and cons of each option across a number of points in the comparison below.

Create/Delete vs. Start/Stop:

  • Cluster start-up speed: slower with create/delete, faster with start/stop.

  • Network access to the Web UI monitoring tools (Spark History Server, YARN Resource Manager, etc.): harder with create/delete, since you cannot do without third-party automation hacks (the Web UI URL is tied to the master node FQDN, which is different for every new cluster); easier with start/stop, where the settings from the documentation are enough.

  • Release of the compute quota (CPU, RAM) during the idle phase: yes with create/delete, no with start/stop.

  • No charge for disk storage during the idle phase: yes with create/delete, no with start/stop.

  • Ease of orchestration via Airflow: with create/delete, everything you need is available out of the box; with start/stop, the Yandex Cloud Data Proc provider library for Airflow simply has no operators for stopping and starting a cluster, although you can fix this yourself by building operators on top of the Data Proc API, which does have this functionality.

We have not yet made a final decision on which option suits us best. For now we have settled on create/delete, but not for each model separately: we create the cluster once and run all the models on it, since they run at approximately the same time. To do this, we had to learn how to pass the current cluster_id from the DAG that creates the cluster to the DAGs with models that use the PySpark operator, and finally to the DAG that deletes the cluster. More on this in the next chapter.

Wrapping embeddings in Spark

The algorithm is something like this:

  • We parallelize the data across tasks, either in a distributed fashion by loading it through a connector from the required storage, or from a pandas dataframe.

  • We broadcast the model to the executors.

  • We create a pandas UDF. It will be applied to the data in each task, in chunks if the data set inside the task is too large. By default one chunk is 10,000 records; this is controlled by the spark.sql.execution.arrow.maxRecordsPerBatch parameter (see the snippet after this list). There is good material in English about the purpose of, and difference between, pandas and Python UDFs.

  • We write a lazy computation pipeline ending with a Spark action, say, writing the result to a database.
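The chunk size cap mentioned above is an ordinary Spark SQL option; a minimal sketch of setting it when building the session (the application name is arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('embedding-inference').getOrCreate()

# Each pandas UDF invocation receives at most this many rows per Arrow batch.
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', 10000)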

The code that distributes the encode stage of the Sentence Transformers embedding framework looks like this:

import pandas as pd
from pyspark.sql import functions as spark_functions
from pyspark.sql.types import ArrayType, FloatType

# `spark`, `model` (a SentenceTransformer instance), `df`, `num_partitions`
# and `encode_batch_size` are assumed to be defined earlier.
broadcasted_model = spark.sparkContext.broadcast(model)
spark_df = spark.createDataFrame(df).repartition(num_partitions)


@spark_functions.pandas_udf(returnType=ArrayType(FloatType()))
def encode_sentences(x: pd.Series) -> pd.Series:
    # A scalar pandas UDF must return a pandas Series, so wrap the encoded batch.
    return pd.Series(
        broadcasted_model.value.encode(
            x,
            batch_size=encode_batch_size,
        ).tolist()
    )


(
    spark_df
    .withColumn('encoded_sentences', encode_sentences(spark_functions.column('sentences')))
    .write
    .format('bigquery')
    .option('writeMethod', 'direct')
    .option('table', 'result_table_name')
    .mode('overwrite')
    .save()
)

For the encode method, do not forget to specify (pick) batch_size: it significantly improves speed compared to encoding line by line.

Setting up DataprocCreatePysparkJobOperator

  • First of all, prepare the launch file, which contains the initialization of the SparkSession as well as the import and launch of the functionality of our Python application with the ML model (a sketch of such a file follows after this list).

  • Next, we package the Python application as a zip or egg archive. The latter can be built with the command python setup.py bdist_egg.

  • We upload all of this to S3.

  • If necessary, we also upload jar archives of additional Spark libraries, for example database connectors.
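Below is a minimal sketch of such a launch file; the application module and function names are hypothetical, so adjust them to your package layout.

# main.py
import argparse

from pyspark.sql import SparkSession

from ml_app.inference import run_inference  # assumed entry point inside the egg

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--arg_1')
    parser.add_argument('--arg_2')
    args = parser.parse_args()

    spark = SparkSession.builder.appName('ml-app-inference').getOrCreate()
    run_inference(spark, args.arg_1, args.arg_2)
    spark.stop()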

Now we configure the operator itself:

spark_encode = DataprocCreatePysparkJobOperator(
    task_id='task_id',
    name="task_name",
    cluster_id='{{ ti.xcom_pull(dag_id="dag_id", task_ids="task_id", key="cluster_id") }}',
    connection_id=ycSA_connection.conn_id,
    main_python_file_uri=f's3a://{YANDEX_BUCKET}/{APP_PATH}/main.py',
    python_file_uris=[f's3a://{YANDEX_BUCKET}/{APP_PATH}/ml_app-1.0.0-py3.8.egg'],
    jar_file_uris=[f's3a://{YANDEX_BUCKET}/jars/spark-connector-xxx.jar'],
    args=[
        '--arg_1', 'arg_1_value',
        '--arg_2', 'arg_2_value',
    ],
    properties={
        'spark.submit.master': 'yarn',
        'spark.submit.deployMode': 'cluster',
        'spark.executor.memory': '4g',
        'spark.executor.instances': '4',
        'spark.executor.cores': '2',
        'spark.driver.cores': '2',
        'spark.task.cpus': '2',
    },
)

Through args we pass parameters to our Python ML application, if required. Through properties we pass parameters for Spark. We use a dedicated on-demand cluster, so we configure it with the load in mind and then try to use the available resources so that nothing sits idle; the corresponding properties help with this. The following article may be useful on this topic.
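As a rough illustration of this balancing with the numbers above (our figures, not a universal rule): when spark.task.cpus equals the executor cores, each executor runs one task at a time, so the partition count is best kept at a small multiple of the total concurrency.

executor_instances = 4
executor_cores = 2
task_cpus = 2

# Tasks that can run simultaneously across the cluster.
concurrent_tasks = executor_instances * (executor_cores // task_cpus)  # 4

# Keep partitions a small multiple of the concurrency so executors do not
# sit idle while the last long partitions finish (the factor is an assumption).
num_partitions = concurrent_tasks * 3  # 12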

If your model weighs a lot, do not forget to calculate a sufficient amount of RAM for it per task, not per executor. You can read more about Spark parameters useful for analytics and ML in this article.
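A back-of-the-envelope check that helps here (the model size is an assumed figure, not a measurement): every concurrently running task holds its own Python worker with a copy of the broadcast model, and pandas UDFs run outside the JVM heap, so this memory should be budgeted via spark.executor.memoryOverhead (or spark.executor.pyspark.memory) rather than spark.executor.memory alone.

model_size_gb = 2.0              # assumed in-memory footprint of the model
tasks_per_executor = 2 // 2      # executor_cores // spark.task.cpus from above

# Python-side memory needed per executor just for the model copies;
# add headroom for Arrow batches and the encoding itself.
python_memory_gb = model_size_gb * tasks_per_executor  # 2.0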

connection_id is well covered in the Yandex instructions mentioned earlier, so let's return to cluster_id.

In the example from the documentation, the value of this parameter is not specified for DataprocCreatePysparkJobOperator, because DataprocCreateClusterOperator passes it via XCom to the subsequent operators. They, in turn, read the XCom via the _setup method defined in the parent class DataprocBaseOperator:

def _setup(self, context: Context) -> DataprocHook:
    if self.cluster_id is None:
        self.cluster_id = context["task_instance"].xcom_pull(key="cluster_id")

and use it as the default value.

If DataprocCreateClusterOperator lives in a separate DAG, then, depending on the chosen strategy, at least the following options are possible:

  1. With the start/stop strategy, you can simply hard-code the value or store it in Variables, since it will not change.

  2. With the create/delete option, you can also get the current cluster_id via XCom. The only difference from the default implementation in DataprocBaseOperator is that we additionally specify the dag_id of the DAG in which the cluster is created. There is a similar example in the operator code above, and a sketch of both options follows below.
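A hedged sketch of both options (the Variable, DAG and task names are placeholders):

# Option 1 (start/stop): a fixed id stored in an Airflow Variable.
cluster_id_from_variable = '{{ var.value.dataproc_cluster_id }}'

# Option 2 (create/delete): pull the id pushed by the DAG that created the
# cluster. If the DAGs run on different schedules, xcom_pull may also need
# include_prior_dates=True.
cluster_id_from_xcom = (
    '{{ ti.xcom_pull(dag_id="dataproc_create_dag", '
    'task_ids="create_cluster", key="cluster_id") }}'
)

Either string is then passed as the cluster_id argument of DataprocCreatePysparkJobOperator, as in the operator example above.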

Spark Data Proc logs in Airflow

An unpleasant moment: logs are not forwarded to the Airflow operators, only the overall status of the task (whether it completed or not). The provider library for Google Dataproc, for example, has a button in the operator UI, a so-called ExtraLink, which leads to the Google Cloud console where you can find the logs. Yandex does not have such an option yet.

A pleasant moment: you can build your own ExtraLink implementation for DataprocCreatePysparkJobOperator. Moreover, it can lead straight to the Spark UI. That is what we did. You can learn more about how to create and add an ExtraLink to operators here.

To do this, with our cluster create/delete approach, we dynamically update the binding of the master node FQDN to a CNAME record with a third-party script after each cluster re-creation. To get the FQDN you can use the yc utility:

yc dataproc cluster list-hosts --id="${cluster_id}" | grep MASTERNODE | awk '{print $2}'

passing the cluster_id from the DAG in which we create the cluster. Then we build a link from this CNAME and the port on which, for example, the YARN Resource Manager Web UI is served. Ideally, you can try to make the button link directly to the log of a specific Spark application. A sketch of such an ExtraLink is below.
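Below is a sketch of such an ExtraLink registered through an Airflow plugin. The CNAME and port are assumptions: we point at the YARN Resource Manager Web UI behind the record that our script re-binds to the current master node FQDN.

from airflow.models.baseoperator import BaseOperatorLink
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.yandex.operators.yandexcloud_dataproc import (
    DataprocCreatePysparkJobOperator,
)


class YarnResourceManagerLink(BaseOperatorLink):
    name = 'YARN Resource Manager'
    operators = [DataprocCreatePysparkJobOperator]

    def get_link(self, operator, *, ti_key):
        # Static CNAME maintained by the re-binding script; 8088 is the
        # default YARN Resource Manager Web UI port.
        return 'http://dataproc-master.internal.example:8088'


class DataprocExtraLinksPlugin(AirflowPlugin):
    name = 'dataproc_extra_links'
    operator_extra_links = [YarnResourceManagerLink()]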

Secrets

The topic of working with secrets deserves special attention. Unlike a Docker container, you cannot simply pass logins, passwords and addresses into a Spark application running in Yandex Data Proc without a secrets manager. If your company does not have such a tool yet, you can use, for example, the Yandex Cloud product Lockbox. Unfortunately, there is no good documentation for its Python API, but there is a clear example here; a hedged sketch is also given below.
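Below is a hedged sketch of reading a Lockbox secret with the yandexcloud SDK. The secret id and entry key are placeholders, and the gRPC stubs are those bundled inside the yandexcloud package, so check the import paths and the authentication arguments against the SDK version you use.

import yandexcloud
from yandex.cloud.lockbox.v1.payload_service_pb2 import GetPayloadRequest
from yandex.cloud.lockbox.v1.payload_service_pb2_grpc import PayloadServiceStub

# Authenticate with whatever your setup supports, e.g. an IAM token
# or a service account key.
sdk = yandexcloud.SDK(iam_token='<IAM_TOKEN>')
payload_client = sdk.client(PayloadServiceStub)

payload = payload_client.Get(GetPayloadRequest(secret_id='<LOCKBOX_SECRET_ID>'))
secrets = {entry.key: entry.text_value for entry in payload.entries}

db_password = secrets['db_password']  # hypothetical entry key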

Conclusion

At first glance, the overall process of applying ML models to large data sets has definitely become more universal, and it has become more convenient to configure. It is still hard to judge the speed, and especially its relationship with costs: our Kubernetes implementation also consumed resources on demand in a dedicated node group and also allowed us to organize parallel computation.

However, Spark is promising for us not only for embeddings or ML models; it was simply easier to start with these tasks. If the need arises, we will adapt Spark for data preparation, which is, after all, its main functionality. We will also think about using it to train models for which ready-made Spark implementations already exist. In terms of the speed/cost ratio this may not be cheaper than a GPU, but it will definitely provide acceptable acceleration for less money. In addition, as data volumes grow further, nothing prevents using a Spark cluster together with GPUs.

We are looking forward to testing Spark on Kubernetes from Yandex.
