Need for speed: batch processing with TiSpark

TiSpark is an Apache Spark plugin that works with the TiDB platform and answers complex online analytical processing (OLAP) queries. It is widely used to batch process large amounts of data and to obtain analytical insights. I am a Senior Solution Architect at PingCAP and a former TiSpark developer. In this post, I will explain how TiSpark works and why it is better than traditional batch processing solutions.


Batch processing: the traditional way and TiSpark

Let’s take a look at the traditional batch processing architecture first.

Traditional Batching Architecture

A traditional batch processing system first receives raw data, which may be CSV files or data from TiDB, MySQL, or other heterogeneous databases. It then divides the raw data into small batches of subtasks. Each batch is processed separately, and the results are written to TiDB. However, such a system has one fatal problem: it cannot guarantee the atomicity, consistency, isolation, and durability (ACID) of transactions across the database.

The application must therefore provide a sophisticated task table mechanism to track whether tasks and subtasks succeed. If a subtask fails, the system can roll the whole task back; in extreme cases, manual intervention is required. The result: this mechanism slows down the entire data processing job. Sometimes the job slows down so much that commercial banks cannot accept it, because banking batch processing must be completed within a day; otherwise it affects the next day’s opening. With TiSpark, things work differently.
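To make the problem concrete, here is a hypothetical sketch of the bookkeeping such a pipeline has to carry (the type and status names are invented):

// Hypothetical sketch of a task table that a traditional pipeline must maintain;
// every subtask is tracked so a failure can trigger a rollback or manual repair.
case class SubTask(taskId: Long, subTaskId: Long, status: String) // "PENDING", "DONE", or "FAILED"

def mustRollBack(subtasks: Seq[SubTask]): Boolean =
  subtasks.exists(_.status == "FAILED") // one failed subtask invalidates the whole batch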

Batch processing with TiSpark

TiSpark treats the loaded raw data as a whole, without breaking the large dataset into small subsets.

After processing, the data is written concurrently to the TiKV servers using a two-phase commit protocol, without going through the TiDB server. To summarize, batch processing with TiSpark has the following advantages:

  • It’s very fast. TiSpark bypasses TiDB and writes data to TiKV concurrently in many-to-many mode, which provides horizontal scalability. If TiKV or Apache Spark becomes the bottleneck, you can simply add another TiKV or Spark node for more storage or compute power.
  • It is easy to configure. The only thing you configure is how Spark should use TiSpark. TiSpark’s batch processing logic is mostly compatible with Spark’s DataSource API, so once you understand the DataSource API and the DataFrame API, setting up TiSpark is easy (see the write sketch after this list).
  • Transactions are guaranteed. A data write either fully succeeds or fully fails. In a real-life case, TiSpark wrote 60 million rows of TPC-H LINEITEM data in 8 minutes.
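As a sketch of what such a write can look like through the DataSource API (the option names follow the TiSpark documentation; the address, credentials, database, and table are example values, and df stands for the DataFrame produced by your batch job):

// Minimal sketch of a TiSpark batch write via the Spark DataSource API.
df.write
  .format("tidb")
  .option("tidb.addr", "127.0.0.1") // TiDB server address (example)
  .option("tidb.port", "4000")
  .option("tidb.user", "root")
  .option("tidb.password", "")
  .option("database", "tpch")
  .option("table", "LINEITEM")
  .mode("append")
  .save()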

What’s under the hood?


Architecture

The figure below shows the role of TiSpark in the entire TiDB cluster:

TiDB cluster components

The components in the figure are color coded.

When TiSpark receives a task, it locks the target tables before writing data. This prevents TiSpark from having to roll back its own transaction because of conflicts with other transactions. We want to avoid such rollbacks, because TiSpark usually processes hundreds of millions of rows of data, and a rollback wastes a lot of time. This table locking behavior applies only to TiDB 3.0.14 and later. In TiDB 4.0.x we changed the transaction protocol, and it now supports large transactions of up to 10 GB. Once TiSpark is compatible with that protocol change, it will no longer need to lock tables.

Next, TiSpark classifies, counts, samples, and computes the data to be written, and estimates how many new regions the batch write will generate. It then passes this information to TiDB, which interacts with the other components and pre-splits the table into the required number of regions. Pre-splitting regions avoids problems such as:

  • Hot spots.
  • Degradation of TiSpark write performance caused by regions splitting while the write is in progress. (A rough sketch of the region estimate follows this list.)
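For intuition, here is a rough, hypothetical sketch of that estimate (the row size and row count are invented; 96 MiB is TiKV’s default region split size):

// Rough sketch of the pre-split estimate: sample the rows to be written,
// estimate their encoded size, and derive how many new regions are needed.
val avgBytesPerRow  = 128L              // assumed average encoded row size, from sampling
val rowsToWrite     = 60000000L         // rows in the batch
val regionSizeBytes = 96L * 1024 * 1024 // TiKV's default region split size (96 MiB)

val estimatedRegions =
  math.ceil((avgBytesPerRow * rowsToWrite).toDouble / regionSizeBytes).toLong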

When writing data, TiSpark also interacts with the Placement Driver (PD) in two ways:

  • It gets meta information. TiKV stores key-value pairs, so TiSpark converts all data rows to key-value pairs before writing. TiSpark needs to know which region to write each pair to; that is, it needs the corresponding region address (see the key sketch after this list).
  • It requests a timestamp from PD to guarantee transactions. You can think of this timestamp as a transaction ID. TiSpark then uses Spark workers to write the generated key-value pairs to TiKV concurrently.
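To illustrate the first point: TiDB encodes each row under a key of the form t{tableID}_r{rowID}, and PD knows which region owns each key range. A simplified sketch (the real client encodes keys as memory-comparable bytes, not strings):

// Simplified illustration of row-to-key-value conversion; illustrative only.
def rowKey(tableId: Long, handle: Long): String = s"t${tableId}_r${handle}"

val pair = rowKey(42L, 1001L) -> "encoded row value"
// PD maps the key's range to a region, which tells TiSpark where to write the pair.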

Implementation

Now that you understand the basics of TiSpark, let’s dive deeper to see the details of its implementation.

First, we used Java to implement the TiKV client in TiSpark. The client is rich in functionality and can also be used on its own by Java applications to interact with TiKV.

  • The client implements the coprocessor interface. It can interact with TiKV or TiFlash and push down some computations, such as limit, order, and aggregation. It also handles some predicates, indexes, and key-value fields; for example, it can optimize an indexed query so that the entire table is not scanned.
  • The client implements the two-phase commit protocol, ensuring that TiSpark writes are ACID compliant. It also maintains statistics and index information that help Spark choose the best execution path when the execution plan is generated. (A usage sketch follows this list.)
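Here is a sketch of standalone use (the package and method names follow older TiSpark releases and may differ in your version):

// Hedged sketch: creating a TiKV session from a Java/Scala application using
// the client bundled with TiSpark; class names vary across versions.
import com.pingcap.tikv.{TiConfiguration, TiSession}

val conf    = TiConfiguration.createDefault("127.0.0.1:2379") // PD address (example)
val session = TiSession.create(conf)
// ... read or write key-value pairs through the session ...
session.close()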

The TiKV client allows TiSpark to interact with TiKV and TiFlash. Another key issue is how to pass the results of this interaction back to Spark.

TiSpark uses Spark’s extension points as an entry point, which avoids maintaining a full copy of the Spark codebase and lets you customize the Spark Catalyst optimizer. For example, you can easily embed TiKV or TiFlash access logic into a Spark execution plan.
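For example, enabling TiSpark in a Spark session goes through exactly this mechanism (the two configuration keys follow the TiSpark documentation; the PD address is an example):

import org.apache.spark.sql.SparkSession

// Registering TiSpark through Spark's extension point.
val spark = SparkSession.builder()
  .appName("tispark-batch")
  .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
  .config("spark.tispark.pd.addresses", "127.0.0.1:2379") // example PD address
  .getOrCreate()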

TiSpark guarantees the ACID properties of transactions for both single-table and multi-table writes. For single-table writes, TiSpark is fully compatible with the Spark DataSource API, because a Spark DataFrame is like a single table. For multi-table writes, you can use an additional interface supported by TiSpark that maps database tables to Spark DataFrames. For example, you can map a table to a DataFrame through the database name and table name, and then put that information into the mapping. If you need to write three tables, the mapping should contain three entries.
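The exact interface is not reproduced here, but conceptually the mapping could look like this hypothetical sketch (the database, table, and DataFrame names are invented):

// Hypothetical sketch of a multi-table write mapping; TiSpark's actual
// interface may differ. Three tables to write, so three entries.
// "spark" is the SparkSession from the previous snippet.
val accountsDF = spark.range(3).toDF("id") // stand-ins for real DataFrames
val loansDF    = spark.range(3).toDF("id")
val pointsDF   = spark.range(3).toDF("id")

val toWrite = Map(
  ("bank", "accounts") -> accountsDF,
  ("bank", "loans")    -> loansDF,
  ("bank", "points")   -> pointsDF
)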

We want this interface to remain the same no matter how many versions of TiSpark are released in the future.

If you are familiar with Spark, you may be wondering: a DataFrame in Spark is like a single table, so wouldn’t it be difficult to combine several of them with incompatible table structures? Well, don’t worry. The TiKV data format is key-value pairs. When multiple tables are written, they are merged only after the DataFrames have been converted to key-value pairs.

Application

How does TiSpark fit into your existing distributed application system?
Suppose you have a distributed system with three parts:

  • The service application framework accepts batch tasks written by application developers.
  • An application framework for asynchronous tasks schedules batch tasks.
  • The Batch Application Framework performs batch tasks.

You can integrate TiSpark into a batch application framework to schedule and process batch tasks.

TiSpark processes data through DataFrame interfaces or Spark SQL

Let’s say you have a users table that stores user loans and interest rates. Based on this data, we need to calculate the interest that users must pay in the current month. The following code block shows how to implement the batch logic with the DataFrame API and with Spark SQL:

// DataFrame API implementation
val dfWithDeducted = df.withColumn("toBeDeducted",
                    df("loan") * df("interestRate"))  // interest owed this month
val finalDF = dfWithDeducted
                    .withColumn("balance",
                        dfWithDeducted("balance")
                        - dfWithDeducted("toBeDeducted")) // deduct the interest
                    .drop("toBeDeducted")

// Spark SQL implementation (an alternative to the DataFrame version above)
val df = spark.sql("select *, (balance - loan * interestRate) as newBala from users").drop("balance")
val finalDF = df.withColumnRenamed("newBala", "balance")

  1. Select the loan and interest rate columns through the DataFrame interface.
  2. Use a simple arithmetic operation to calculate the interest.
  3. Create a new column named toBeDeducted using the withColumn interface.
  4. Subtract the toBeDeducted value from the original balance to get the new balance.
  5. Drop the toBeDeducted column.

Another example is a credit card bonus points system. Three tables are used to calculate the points:

  • Reward Points Table: Stores the user’s current points.
  • Expense Table: Stores the user’s monthly expenses.
  • Rules table: Stores discount rules. Different sellers have different rules; for example, jewelry stores have a 1:2 rule, meaning $1 of spending earns 2 points.

To create a new DataFrame, we can join the three tables in Spark. We then do some arithmetic using the appropriate DataFrame column names, such as multiplying the monthly spending by the factor from the rules table. After that, we execute the batch task.
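As a hypothetical sketch of that batch (every table and column name below is invented for illustration):

import org.apache.spark.sql.functions.col

// Hypothetical sketch: read the three tables through TiSpark, join them,
// and add the earned points. "spark" is a TiSpark-enabled SparkSession.
val pointsDF   = spark.sql("select * from bank.reward_points")
val expensesDF = spark.sql("select * from bank.expenses")
val rulesDF    = spark.sql("select * from bank.rules")

val updated = pointsDF
  .join(expensesDF, "userId")    // current points + this month's spending
  .join(rulesDF, "merchantType") // per-merchant rule, e.g. 1:2 for jewelry
  .withColumn("points",
    col("points") + col("monthlyExpense") * col("pointsPerDollar"))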

When the execution is complete, we can split the resulting DataFrame back out according to the different table structures. Finally, TiSpark quickly writes the processed data to TiKV; TiDB does not participate in the write.

Visualization

After you submit a task to TiSpark, you can monitor its progress. The figure below shows a batch job that writes 4 million rows of data:

Monitoring tasks in TiSpark

On the monitoring page, you can see which task is being processed and that it should complete in about 5 minutes. The table below summarizes each job ID and its tasks.

I hope this post helped you learn more about TiSpark. If you have questions about TiSpark or its batch processing solution, email me. I will be happy to discuss with you how to integrate TiSpark into your application.

