Optimizing queries in ClickHouse by creating a chain of materialized views

ClickHouse materialized views are a mechanism that automatically executes a query against a source table whenever new data arrives.

A materialized view (MV) is a special type of table that contains the result of a query over the source data. You can think of this result as a cached view of the data from the source tables. One of the key features of MVs in ClickHouse is automatic updating: when new data arrives in the source tables, the MV is updated automatically by applying the defined query to the incoming data.

Thus, materialized views in ClickHouse provide a convenient way to maintain up-to-date aggregated data, automatically refreshing query results as the source information changes. This is especially useful for computationally heavy or aggregation-oriented workloads such as reports, statistics, and analytics.
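
Before moving on to the practical example, here is a minimal sketch of the pattern. The names events, daily_totals and daily_totals_mv are hypothetical; the point is only that each block of rows inserted into the source table is passed through the view's SELECT and the result is written to the destination table.

CREATE TABLE events (ts DateTime, value UInt32) ENGINE = MergeTree ORDER BY ts;

CREATE TABLE daily_totals (day Date, total UInt64) ENGINE = SummingMergeTree ORDER BY day;

-- Each insert into events produces partial daily sums in daily_totals.
CREATE MATERIALIZED VIEW daily_totals_mv TO daily_totals AS
SELECT toDate(ts) AS day, sum(value) AS total
FROM events
GROUP BY day;

-- Reads re-aggregate, because every insert block writes its own partial sums.
SELECT day, sum(total) AS total
FROM daily_totals
GROUP BY day;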

The diagram below clearly demonstrates how this works:

(Diagram: inserted rows flow from the Source Table into the Materialized View, which transforms them and writes the transformed rows to the final, aggregated Destination Table.)

For the last couple of weeks I've been working with aggregation states. As a demonstration, I created two materialized views that receive data from the same Kafka table engine (Kafka Source Table). One stored the raw event data (Raw Events), and the other stored the aggregation state (Aggregation State).

When I showed this example to Tom, he suggested another option: instead of both views reading directly from the Kafka table, the materialized views could be linked into a chain, where the output table of one view becomes the input table of the next.

This approach can be useful in cases where you need to perform multiple data transformations before writing to the final table. For example, the first materialized view can perform data filtering, while the second can perform grouping and aggregation. In this way, a more flexible and efficient data processing system can be created.

Below is the diagram he had in mind:

In other words, instead of having the Aggregation State materialized view read from the Kafka Source Table, I make it read from the Raw Events table that has already been populated with data pulled from Kafka.

Later in the article we will walk through a practical example of an MV chain. For this we will use the Wikimedia recent changes feed, which provides a stream of events describing changes made to various Wikimedia resources: which pages were changed, by whom and when, and a description of the change itself. The data is delivered in server-sent events (SSE) format. An example message is shown below:

{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "meta": {
    "uri": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
    "request_id": "ccbbbe2c-6e1b-4bb7-99cb-317b64cbd5dc",
    "id": "41c73232-5922-4484-82f3-34d45f22ee7a",
    "dt": "2024-03-26T09:13:09Z",
    "domain": "en.wiktionary.org",
    "stream": "mediawiki.recentchange",
    "topic": "eqiad.mediawiki.recentchange",
    "partition": 0,
    "offset": 4974797626
  },
  "id": 117636935,
  "type": "edit",
  "namespace": 0,
  "title": "MP3播放器",
  "title_url": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
  "comment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per [[WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun]], [[WT:RFDO#Template:zh-noun]]; fix some lang codes (manually assisted)",
  "timestamp": 1711444389,
  "user": "WingerBot",
  "bot": true,
  "notify_url": "https://en.wiktionary.org/w/index.php?diff=78597416&oldid=50133194&rcid=117636935",
  "minor": true,
  "patrolled": true,
  "length": {
    "old": 229,
    "new": 234
  },
  "revision": {
    "old": 50133194,
    "new": 78597416
  },
  "server_url": "https://en.wiktionary.org",
  "server_name": "en.wiktionary.org",
  "server_script_path": "/w",
  "wiki": "enwiktionary",
  "parsedcomment": "clean up some labels; add missing space after *; {{zh-noun}} -&gt; {{head|zh|noun}}, {{zh-hanzi}} -&gt; {{head|zh|hanzi}} per <a href=\"/wiki/Wiktionary:RFDO#All_templates_in_Category:Chinese_headword-line_templates_except_Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun</a>, <a href=\"/wiki/Wiktionary:RFDO#Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#Template:zh-noun</a>; fix some lang codes (manually assisted)"
}

Let's imagine that we are developing a dashboard to track the changes being made. We're not interested in individual edits; we want to track, minute by minute, the number of unique users making changes, the number of unique pages being changed, and the total number of changes made.

Let's start by creating the database wiki and switching to it:

CREATE DATABASE wiki;
USE wiki;

Creating a Kafka table engine

Next, let's create a table wikiQueue, which will receive messages from the Kafka topic wiki_events using a local Kafka broker running on port 9092.

Please note that if you are using ClickHouse Cloud, then to process data coming from Kafka you will have to use ClickPipes.

CREATE TABLE wikiQueue(
    id UInt32,
    type String,
    title String,
    title_url String,
    comment String,
    timestamp UInt64,
    user String,
    bot Boolean,
    server_url String,
    server_name String,
    wiki String,
    meta Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
  'localhost:9092', 
  'wiki_events', 
  'consumer-group-wiki', 
  'JSONEachRow'
);

The rawEvents table will store the date and time of the event (dateTime), the article title URL (title_url), the event topic (topic), and the user who made the change (user).

CREATE TABLE rawEvents (
    dateTime DateTime64(3, 'UTC'),
    title_url String,
    topic String,
    user String
) 
ENGINE = MergeTree 
ORDER BY dateTime;

Next we will create the following materialized view to write data to rawEvents.

CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents AS 
SELECT toDateTime(timestamp) AS dateTime,
       title_url, 
       tupleElement(meta, 'topic') AS topic, 
       user
FROM wikiQueue
WHERE title_url <> '';

This materialized view records the "raw" event data extracted from the recent changes stream into the rawEvents table. This lets us store the data in a structured form and use it for further analysis.

We use the toDateTime function to convert the timestamp from epoch seconds (the number of seconds elapsed since 00:00:00 UTC on January 1, 1970) into a DateTime value, which represents the date and time in an easy-to-work-with form. We also use the tupleElement function to extract the topic property from the meta tuple.
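
As a quick illustration of both conversions, here is a self-contained query using values from the sample message above (the DateTime shown in the comment assumes a UTC server timezone):

SELECT
    toDateTime(1711444389) AS dateTime,  -- 2024-03-26 09:13:09
    tupleElement(
        CAST(('eqiad.mediawiki.recentchange', 'en.wiktionary.org') AS Tuple(topic String, domain String)),
        'topic'
    ) AS topic;                          -- eqiad.mediawiki.recentchange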

Storing Aggregate States

Next, we will create a table to store aggregate states to ensure incremental aggregation. Aggregate states are stored in a column with type AggregateFunction(<aggregationType>, <dataType>).

To track unique users and unique pages, both of which are String values, we use the type AggregateFunction(uniq, String). For example, given a log of user activity or page visits, this lets us find out how many unique users visited the site or how many unique pages were visited over a given period of time.

To keep a running count of changes, we use the type AggregateFunction(sum, UInt32). UInt32 is a 32-bit unsigned integer with a maximum value of 4294967295, far more than the number of updates we will receive in one minute, so we can track the total number of updates over time without worrying about overflow.
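
The following self-contained query over toy data shows how the two halves fit together: the -State combinator (used when writing) produces intermediate aggregation states, and the -Merge combinator (which we will use later when reading) combines them into a final value.

SELECT
    uniqMerge(u) AS unique_values,  -- 3, since number % 3 takes the values 0, 1, 2
    sumMerge(s) AS total            -- 10, one per input row
FROM
(
    SELECT
        uniqState(toString(number % 3)) AS u,
        sumState(toUInt32(1)) AS s
    FROM numbers(10)
);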

Let's call this table byMinute. Its definition is given below:

CREATE TABLE byMinute
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

The materialized view that populates this table will read from rawEvents and use the -State combinator to produce intermediate aggregation states. We will use the uniqState function to count unique users and pages, and the sumState function to count changes.

CREATE MATERIALIZED VIEW byMinute_mv TO byMinute AS 
SELECT toStartOfMinute(dateTime) AS dateTime,
       uniqState(user) as users,
       uniqState(title_url) as pages,
       sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;

The diagram below shows the chain of materialized views and tables that we have created so far:

At the moment there is no data flowing through the system, because nothing is being written to Kafka yet. To get things moving, we need to start publishing the Wikimedia event stream to the wiki_events topic. Once we do, data will start flowing into the tables and we can use it for analysis, processing, or storage.

Let's do this by running the following command.

curl -N https://stream.wikimedia.org/v2/stream/recentchange  |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø

This command reads the recent changes feed, strips the SSE data: prefix with awk, uses jq to build a key:value pair keyed by meta.id, and publishes it to Kafka using kcat.

You can leave this command running for a while to collect enough data. Then write a query to see how many changes have been made to Wikimedia resources. This can be useful for monitoring Wiki activity and analyzing changes made to various resources.
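
Before querying, you can optionally check that messages are actually reaching the Kafka topic by consuming a few of them with kcat (the flags used here are -C for consume mode, -b for the broker, -t for the topic, and -c for the number of messages):

kcat -C -b localhost:9092 -t wiki_events -c 3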

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
    ┌────────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:53:00.000 │   248 │   755 │    1002 │
 2. │ 2024-03-26 15:52:00.000 │   429 │  1481 │    2164 │
 3. │ 2024-03-26 15:51:00.000 │   406 │  1417 │    2159 │
 4. │ 2024-03-26 15:50:00.000 │   392 │  1240 │    1843 │
 5. │ 2024-03-26 15:49:00.000 │   418 │  1346 │    1910 │
 6. │ 2024-03-26 15:48:00.000 │   422 │  1388 │    1867 │
 7. │ 2024-03-26 15:47:00.000 │   423 │  1449 │    2015 │
 8. │ 2024-03-26 15:46:00.000 │   409 │  1420 │    1933 │
 9. │ 2024-03-26 15:45:00.000 │   402 │  1348 │    1824 │
10. │ 2024-03-26 15:44:00.000 │   432 │  1642 │    2142 │
    └─────────────────────────┴───────┴───────┴─────────┘

Everything seems to be working well.

Adding another MV to the chain

We started the system and accumulated some data, which is written to the byMinute table in 1-minute buckets. Having run this for a while, it would now be useful to group the collected data into 10-minute buckets rather than 1-minute ones. This can be done with the following query against the byMinute table:

SELECT
    toStartOfTenMinutes(dateTime) AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

The result will look something like this, with the values in the dateTime column now in 10-minute increments.

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘

This works well with a small amount of data, but with large volumes you may want a separate table that stores the data pre-aggregated into 10-minute intervals. Let's create such a table:

CREATE TABLE byTenMinutes
(
    dateTime DateTime64(3, 'UTC') NOT NULL,
    users AggregateFunction(uniq, String),
    pages AggregateFunction(uniq, String),
    updates AggregateFunction(sum, UInt32) 
)
ENGINE = AggregatingMergeTree() 
ORDER BY dateTime;

Next, we will create an MV to populate this table. It will query the byMinute table using a query similar to the one we used to compute the 10-minute intervals above. The only difference is that instead of the -Merge combinator we need the -MergeState combinator, which returns the aggregation state of the byMinute data rather than the final result.

In theory this saves computation: the byMinute MV has already aggregated the data into one-minute buckets, so instead of re-aggregating the raw per-second data into 10-minute intervals from scratch, we simply combine the one-minute buckets.

The materialized view is shown below:

CREATE MATERIALIZED VIEW byTenMinutes_mv TO byTenMinutes AS
SELECT toStartOfTenMinutes(dateTime) AS dateTime,
       uniqMergeState(users) as users,
       uniqMergeState(pages) as pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

The following diagram shows the chain of materialized views we created:

If we query the byTenMinutes table now, we won't see any data in it, because it hasn't been populated yet. Once byTenMinutes starts filling, it will only capture new data arriving in byMinute. This means that the old data already accumulated in byMinute will not be transferred to byTenMinutes automatically.

However, this is not as bad as it sounds. We can write a query to backfill the old data, that is, to populate byTenMinutes with the data already stored in byMinute:

INSERT INTO byTenMinutes 
SELECT toStartOfTenMinutes(dateTime),
       uniqMergeState(users) AS users, uniqMergeState(pages) AS pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;

This will allow us to get a complete picture of the data, both new and old, in the table byTenMinutes.

We can then run the following query against byTenMinutes to return the data grouped into 10-minute buckets, useful for further analysis or visualization:

SELECT
    dateTime AS dateTime,
    uniqMerge(users) AS users,
    uniqMerge(pages) AS pages,
    sumMerge(updates) AS updates
FROM byTenMinutes
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;

We will get the same results as when querying the table byMinute:

    ┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
 1. │ 2024-03-26 15:50:00 │   977 │  4432 │    7168 │
 2. │ 2024-03-26 15:40:00 │  1970 │ 12372 │   20555 │
 3. │ 2024-03-26 15:30:00 │  1998 │ 11673 │   20043 │
 4. │ 2024-03-26 15:20:00 │  1981 │ 12051 │   20026 │
 5. │ 2024-03-26 15:10:00 │  1996 │ 11793 │   19392 │
 6. │ 2024-03-26 15:00:00 │  2092 │ 12778 │   20649 │
 7. │ 2024-03-26 14:50:00 │  2062 │ 12893 │   20465 │
 8. │ 2024-03-26 14:40:00 │  2028 │ 12798 │   20873 │
 9. │ 2024-03-26 14:30:00 │  2020 │ 12169 │   20364 │
10. │ 2024-03-26 14:20:00 │  2077 │ 11929 │   19797 │
    └─────────────────────┴───────┴───────┴─────────┘

