Optimizing queries in ClickHouse by creating a chain of materialized views
ClickHouse materialized views are a mechanism that automatically executes queries against source tables when new data arrives.
A materialized view (MV) is a special type of table that contains the result of a query over the source data. This result is effectively a cached view of the data in the source tables. One of the key features of MVs in ClickHouse is automatic updating: when new data arrives in the source tables, the MV is updated automatically according to its defining query.
Thus, materialized views in ClickHouse provide a convenient way to maintain up-to-date aggregated data, automatically refreshing query results as the source information changes. This is especially useful for computationally heavy or aggregation queries, such as reports, statistics, and analytics.
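As a minimal sketch of the mechanism (with hypothetical table names, not part of the example we build later), an MV is usually defined against an explicit target table:

```sql
-- Hypothetical source table of page-view events
CREATE TABLE events (ts DateTime, user String) ENGINE = MergeTree ORDER BY ts;

-- Target table that will hold per-day counts
CREATE TABLE dailyCounts (day Date, cnt UInt64) ENGINE = SummingMergeTree ORDER BY day;

-- The MV runs its SELECT on every block inserted into events
-- and writes the result into dailyCounts
CREATE MATERIALIZED VIEW dailyCounts_mv TO dailyCounts AS
SELECT toDate(ts) AS day, count() AS cnt
FROM events
GROUP BY day;
```

Note that the MV fires only on inserts into the source table; it does not see data that was already in the table when it was created.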
The diagram below clearly demonstrates how this works:
For the last couple of weeks I've been working with aggregation states. As a demonstration, I created two materialized views that receive data from the same Kafka table engine (Kafka Source Table). One stored raw event data (Raw Events), the other stored aggregation state (Aggregation State).
When I showed this example to Tom, he suggested another option: instead of having both views read directly from the Kafka table, the MVs could be linked into a chain, where the output table of one view becomes the input table of another.
This approach can be useful in cases where you need to perform multiple data transformations before writing to the final table. For example, the first materialized view can perform data filtering, while the second can perform grouping and aggregation. In this way, a more flexible and efficient data processing system can be created.
Below is the diagram he had in mind:
In other words, instead of having the Aggregation State materialized view read data from the Kafka Source Table, I should make it read from the table of raw events that have already been extracted from Kafka (the Raw Events Table).
Later in the article we will walk through a practical example of an MV chain. For this we will use the Wikimedia recent changes feed, which provides a stream of events reflecting changes made to various Wikimedia resources (which pages were changed, by whom and when, and a description of the change itself). The data is delivered in server-sent events (SSE) format; we will work with the data property of each event.
An example message is shown below:
{
"$schema": "/mediawiki/recentchange/1.0.0",
"meta": {
"uri": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
"request_id": "ccbbbe2c-6e1b-4bb7-99cb-317b64cbd5dc",
"id": "41c73232-5922-4484-82f3-34d45f22ee7a",
"dt": "2024-03-26T09:13:09Z",
"domain": "en.wiktionary.org",
"stream": "mediawiki.recentchange",
"topic": "eqiad.mediawiki.recentchange",
"partition": 0,
"offset": 4974797626
},
"id": 117636935,
"type": "edit",
"namespace": 0,
"title": "MP3播放器",
"title_url": "https://en.wiktionary.org/wiki/MP3%E6%92%AD%E6%94%BE%E5%99%A8",
"comment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per [[WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun]], [[WT:RFDO#Template:zh-noun]]; fix some lang codes (manually assisted)",
"timestamp": 1711444389,
"user": "WingerBot",
"bot": true,
"notify_url": "https://en.wiktionary.org/w/index.php?diff=78597416&oldid=50133194&rcid=117636935",
"minor": true,
"patrolled": true,
"length": {
"old": 229,
"new": 234
},
"revision": {
"old": 50133194,
"new": 78597416
},
"server_url": "https://en.wiktionary.org",
"server_name": "en.wiktionary.org",
"server_script_path": "/w",
"wiki": "enwiktionary",
"parsedcomment": "clean up some labels; add missing space after *; {{zh-noun}} -> {{head|zh|noun}}, {{zh-hanzi}} -> {{head|zh|hanzi}} per <a href=\"/wiki/Wiktionary:RFDO#All_templates_in_Category:Chinese_headword-line_templates_except_Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#All templates in Category:Chinese headword-line templates except Template:zh-noun</a>, <a href=\"/wiki/Wiktionary:RFDO#Template:zh-noun\" class=\"mw-redirect\" title=\"Wiktionary:RFDO\">WT:RFDO#Template:zh-noun</a>; fix some lang codes (manually assisted)"
}
Let's imagine that we are developing a dashboard to track the changes being made. We're not interested in individual edits; we want to track, minute by minute, the number of unique users making changes, the number of unique pages being changed, and the total number of changes made.
Let's start by creating, and then switching to, the wiki database:
CREATE DATABASE wiki;
USE wiki;
Creating a Kafka table engine
Next, let's create a table wikiQueue, which will receive messages from the Kafka topic wiki_events via a local Kafka broker running on port 9092.
Note that if you are using ClickHouse Cloud, you will need to use ClickPipes to ingest data from Kafka.
CREATE TABLE wikiQueue(
id UInt32,
type String,
title String,
title_url String,
comment String,
timestamp UInt64,
user String,
bot Boolean,
server_url String,
server_name String,
wiki String,
meta Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka(
'localhost:9092',
'wiki_events',
'consumer-group-wiki',
'JSONEachRow'
);
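The Kafka engine also accepts its parameters via SETTINGS, which names each parameter explicitly; below is a sketch of an equivalent definition (column list elided for brevity, same parameters as above):

```sql
CREATE TABLE wikiQueue
(
    id UInt32,
    -- ... remaining columns as above ...
    meta Tuple(uri String, id String, stream String, topic String, domain String)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'wiki_events',
         kafka_group_name = 'consumer-group-wiki',
         kafka_format = 'JSONEachRow';
```

The SETTINGS form is easier to extend later, for example with kafka_num_consumers.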
The rawEvents table contains the date and time of the event (dateTime), the article title URL (title_url), the event topic (topic), and the user who made the change (user).
CREATE TABLE rawEvents (
dateTime DateTime64(3, 'UTC'),
title_url String,
topic String,
user String
)
ENGINE = MergeTree
ORDER BY dateTime;
Next we will create a materialized view that writes data into rawEvents.
CREATE MATERIALIZED VIEW rawEvents_mv TO rawEvents AS
SELECT toDateTime(timestamp) AS dateTime,
title_url,
tupleElement(meta, 'topic') AS topic,
user
FROM wikiQueue
WHERE title_url <> '';
This MV records the raw event data extracted from the recent changes stream into the rawEvents table. This lets us store the data in structured form and use it for further analysis.
We use the toDateTime function to convert a Unix epoch timestamp (seconds since 00:00:00 UTC on January 1, 1970) into a DateTime object, which represents the date and time in an easy-to-work-with form. We also use the tupleElement function to extract the topic property from the meta object.
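To illustrate, both functions can be tried standalone. The timestamp below is taken from the sample message earlier in the article; the tuple is a made-up two-field stand-in for the real meta object:

```sql
SELECT toDateTime(1711444389, 'UTC');
-- 2024-03-26 09:13:09 (matches the dt field of the sample message)

SELECT tupleElement(
    CAST(('a-stream', 'a-topic') AS Tuple(stream String, topic String)),
    'topic');
-- a-topic
```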
Storing Aggregate States
Next, we will create a table that stores aggregate states, which is what makes incremental aggregation possible. Aggregate states are stored in a column of type AggregateFunction(<aggregationType>, <dataType>).
To maintain a count of unique String values, which we need to track unique users and unique pages, we use the type AggregateFunction(uniq, String). For example, given a log of web page visits, this lets us find out how many unique users visited the site, or how many unique pages were visited, over a given period of time.
To keep a running count of changes, which we need for the total number of updates, we use the type AggregateFunction(sum, UInt32). UInt32 is an unsigned 32-bit integer with a maximum value of 4294967295, far more than the number of updates we will receive in one minute, so we can track the total number of updates over a period without worrying about overflow.
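The -State / -Merge round trip behind these column types can be seen in a single self-contained query: uniqState produces an intermediate state, and uniqMerge folds states back into a final number. Below is a sketch using an inline array instead of a real table:

```sql
-- The inner query produces one aggregate state over three values;
-- the outer uniqMerge collapses the state into the final unique count
SELECT uniqMerge(u) AS uniqueValues
FROM
(
    SELECT uniqState(arrayJoin(['a', 'b', 'a'])) AS u
);
-- uniqueValues = 2
```

Because states merge associatively, states computed over separate chunks of data can later be combined into the same result as aggregating the raw data in one pass.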
Let's call this table byMinute. Its definition is given below:
CREATE TABLE byMinute
(
dateTime DateTime64(3, 'UTC') NOT NULL,
users AggregateFunction(uniq, String),
pages AggregateFunction(uniq, String),
updates AggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree()
ORDER BY dateTime;
The materialized view that populates this table will read data from rawEvents and use -State combinators to extract intermediate states. We will use the uniqState function to count unique users and pages, and the sumState function to count changes.
CREATE MATERIALIZED VIEW byMinute_mv TO byMinute AS
SELECT toStartOfMinute(dateTime) AS dateTime,
uniqState(user) as users,
uniqState(title_url) as pages,
sumState(toUInt32(1)) AS updates
FROM rawEvents
GROUP BY dateTime;
The diagram below shows the chain of materialized views and tables that we have created so far:
At this point no data is flowing into Kafka yet. To get data moving through the chain, we need to start a process that feeds events into the wiki_events topic; once it is running, data will start arriving in the tables and we can use it for analysis, processing, or storage.
Let's do that by running the following command:
curl -N https://stream.wikimedia.org/v2/stream/recentchange |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø
This command retrieves the data property from the recent changes feed, builds a key:value pair using jq, and passes it to Kafka using kcat.
You can leave this command running for a while to collect enough data. Then write a query to see how many changes have been made to Wikimedia resources. This can be useful for monitoring Wiki activity and analyzing changes made to various resources.
SELECT
dateTime AS dateTime,
uniqMerge(users) AS users,
uniqMerge(pages) AS pages,
sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
┌────────────────dateTime─┬─users─┬─pages─┬─updates─┐
1. │ 2024-03-26 15:53:00.000 │ 248 │ 755 │ 1002 │
2. │ 2024-03-26 15:52:00.000 │ 429 │ 1481 │ 2164 │
3. │ 2024-03-26 15:51:00.000 │ 406 │ 1417 │ 2159 │
4. │ 2024-03-26 15:50:00.000 │ 392 │ 1240 │ 1843 │
5. │ 2024-03-26 15:49:00.000 │ 418 │ 1346 │ 1910 │
6. │ 2024-03-26 15:48:00.000 │ 422 │ 1388 │ 1867 │
7. │ 2024-03-26 15:47:00.000 │ 423 │ 1449 │ 2015 │
8. │ 2024-03-26 15:46:00.000 │ 409 │ 1420 │ 1933 │
9. │ 2024-03-26 15:45:00.000 │ 402 │ 1348 │ 1824 │
10. │ 2024-03-26 15:44:00.000 │ 432 │ 1642 │ 2142 │
└─────────────────────────┴───────┴───────┴─────────┘
Everything seems to be working well.
Adding another MV to the chain
Now that the system has been running for a while and data has accumulated in the byMinute table at one-minute intervals, it would be useful to group the data into 10-minute buckets rather than 1-minute ones. This can be done with the following query against the byMinute table:
SELECT
toStartOfTenMinutes(dateTime) AS dateTime,
uniqMerge(users) AS users,
uniqMerge(pages) AS pages,
sumMerge(updates) AS updates
FROM byMinute
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
The result will look something like this, with values in the dateTime column now in 10-minute increments.
┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
1. │ 2024-03-26 15:50:00 │ 977 │ 4432 │ 7168 │
2. │ 2024-03-26 15:40:00 │ 1970 │ 12372 │ 20555 │
3. │ 2024-03-26 15:30:00 │ 1998 │ 11673 │ 20043 │
4. │ 2024-03-26 15:20:00 │ 1981 │ 12051 │ 20026 │
5. │ 2024-03-26 15:10:00 │ 1996 │ 11793 │ 19392 │
6. │ 2024-03-26 15:00:00 │ 2092 │ 12778 │ 20649 │
7. │ 2024-03-26 14:50:00 │ 2062 │ 12893 │ 20465 │
8. │ 2024-03-26 14:40:00 │ 2028 │ 12798 │ 20873 │
9. │ 2024-03-26 14:30:00 │ 2020 │ 12169 │ 20364 │
10. │ 2024-03-26 14:20:00 │ 2077 │ 11929 │ 19797 │
└─────────────────────┴───────┴───────┴─────────┘
This works well with a small amount of data, but with large volumes we may need another table that stores the data pre-aggregated into 10-minute intervals. Let's create such a table:
CREATE TABLE byTenMinutes
(
dateTime DateTime64(3, 'UTC') NOT NULL,
users AggregateFunction(uniq, String),
pages AggregateFunction(uniq, String),
updates AggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree()
ORDER BY dateTime;
Next, we will create an MV to populate this table. The MV will query byMinute with a query similar to the one we used to compute the 10-minute intervals above. The only difference is that instead of -Merge combinators we need -MergeState combinators, which return the aggregation state of the byMinute data rather than the final result.
In theory this saves computation: since byMinute_mv has already aggregated the data into one-minute buckets, we aggregate those buckets into 10-minute intervals instead of re-aggregating the raw per-second data from scratch.
The materialized view is shown below:
CREATE MATERIALIZED VIEW byTenMinutes_mv TO byTenMinutes AS
SELECT toStartOfTenMinutes(dateTime) AS dateTime,
uniqMergeState(users) as users,
uniqMergeState(pages) as pages,
sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;
The following diagram shows the chain of materialized views we created:
If we query the byTenMinutes table now, we won't see any data, because it hasn't been populated yet. Once byTenMinutes starts filling up, it will only collect new data arriving in byMinute; old data that has already accumulated in byMinute will not be transferred to byTenMinutes automatically.
However, this is easy to fix. We can write a backfill query that loads the old data from byMinute into byTenMinutes:
INSERT INTO byTenMinutes
SELECT toStartOfTenMinutes(dateTime) AS dateTime,
       uniqMergeState(users) AS users,
       uniqMergeState(pages) AS pages,
       sumMergeState(updates) AS updates
FROM byMinute
GROUP BY dateTime;
This gives us a complete picture of the data, both new and old, in the byTenMinutes table.
We can then run the following query against byTenMinutes to return data grouped into 10-minute buckets, useful for further analysis or visualization:
SELECT
dateTime AS dateTime,
uniqMerge(users) AS users,
uniqMerge(pages) AS pages,
sumMerge(updates) AS updates
FROM byTenMinutes
GROUP BY dateTime
ORDER BY dateTime DESC
LIMIT 10;
We will get the same results as when querying the byMinute table:
┌────────────dateTime─┬─users─┬─pages─┬─updates─┐
1. │ 2024-03-26 15:50:00 │ 977 │ 4432 │ 7168 │
2. │ 2024-03-26 15:40:00 │ 1970 │ 12372 │ 20555 │
3. │ 2024-03-26 15:30:00 │ 1998 │ 11673 │ 20043 │
4. │ 2024-03-26 15:20:00 │ 1981 │ 12051 │ 20026 │
5. │ 2024-03-26 15:10:00 │ 1996 │ 11793 │ 19392 │
6. │ 2024-03-26 15:00:00 │ 2092 │ 12778 │ 20649 │
7. │ 2024-03-26 14:50:00 │ 2062 │ 12893 │ 20465 │
8. │ 2024-03-26 14:40:00 │ 2028 │ 12798 │ 20873 │
9. │ 2024-03-26 14:30:00 │ 2020 │ 12169 │ 20364 │
10. │ 2024-03-26 14:20:00 │ 2077 │ 11929 │ 19797 │
└─────────────────────┴───────┴───────┴─────────┘
You can learn everything about working with ClickHouse – from installation and configuration to product solutions – in the online course at OTUS “ClickHouse for Database Engineers and Architects.”