kmeans in Clickhouse
Algorithm kmeans well known and used when you need to quickly divide a data array into groups or socalled. “clusters”. It is assumed that each data element has a set of numerical metrics, and we can talk about both the position of a point in a certain multidimensional space, and about their mutual proximity.
kmeans belongs to the category of EMalgorithms (ExpectanionMaximization), where we alternately determine how correct the current division of points into clusters is, and then slightly improve it.
This rather simple algorithm was formulated back in the 1950s, and has since been implemented in a variety of programming languages. There are implementations for MySQL and Postgress, and even Excel.
Why Clickhouse
There are hundreds and thousands of kmeans implementations. Why not take some popular python library, pull the data through a database driver, and process it in Python memory?
It is also possible if there is little data, and the task is to find some special area in the image in order to apply a more complex algorithm to it. But it happens that there is a lot of data. Or even VERY MUCH. Yes, Clickhouse is about Big Data.
Big Data is when there is so much data that it doesn’t fit anywhere. Not only in RAM – you don’t even have to dream about it. Often the entire data array does not fit even on one server, and you have to put several (shard). Just writing to sklearn.cluster.KMeans is to read 1Tb of data somewhere into the memory of a Python process, and then again and again, and in 1 thread. Not a great idea. Therefore, on large amounts of data, solutions such as MapReduce, Apache AirFlow, Sparc and other expensive and solid tools are used.
I will try to do something comparable, just simple and cheap, using the capabilities of Сlickhouse. And so as not to slow down. I’ll show you how you can efficiently work with big data using one simple algorithm as an example. After all, Clickhouse is a fullfledged environment for executing processing jobs, and not a simple storehouse of indexed data.
The main difficulty in this task is to repeatedly read a huge data set, then calculate the Euclidean distance and group the results at each step. Everything is provided in Clickhouse to make these operations work quickly and transparently for the developer:

data is in the form of columns – we read only what we need

the data is in compressed form (lz4) – we read several times less, reduce the load on the disk

data is read in 8 streams (by default, configurable) from separate files. Without any developer effort, the multicore architecture of modern servers is involved.

Tuple type efficiently stores points of multidimensional space

a special function quickly calculates the Euclidean distance

SQL code is compiled (LLVM) and works at the speed of C ++ code

the group by operation has over 40 different optimizations and is very efficient. If grouping in memory is not possible, then the disc will automatically be used.

if the data is sharded across several servers, then the request will be sent to all shards, partial grouping will be done distributed, and the results will be combined. Without any effort on the part of the developer.
Initial data
Data can be in any table, and in order for the algorithm to have a standard interface to them, it is proposed to prepare a regular SQL View:
create or replace view YH as
select i, (toInt32(x),toInt32(y)) as Y
from sourceData;
Data is selected from the sourceData table, two columns are formed – the primary key and Tuple from the required number of numeric coordinates. In this case, two integer coordinates of the 2D space are used. If the data is sharded, then such a view must be created on all cluster nodes
Storage of centroids
Centroids is the key concept of the kmeans algorithm. These are the centers of the clusters that we are looking for and refining by successive approximations. We will keep it in a plate.
create table WCR (
ts DateTime,
j Int32,
C Tuple(Int32,Int32)
) engine = MergeTree
order by ts;
We store all iterations, but since we do not have an autoincrement, our choice is a simple timestamp. Cluster number, and centroid coordinates in 2D space. Sorted by time.
Initial initialization
In the simplest case, the centroids are initialized with the coordinates of randomly selected points in the original dataset. How many clusters you want – there are so many centroids and you need to initialize. The algorithm is very vulnerable to the initial location of the centroids – it may not converge.
Therefore, in a more complex version of initialization (kmeans ++) centroids are initialized, albeit probabilistically, but taking into account the distance between points in space. It is argued that this is better. So we will do it.
The first centroid is randomly selected (here the number is 40):
insert into WCR select now(), 1, Y
from YH
limit 40,1;
Subsequent centroids:
insert into WCR
select now(), (select j from WCR order by ts desc limit 1)+1 as j, y
from (
select y,
sum(d) over () as total,
sum(d) over (rows between unbounded preceding and current row ) as cum
from (
select argMin(Y, L2Distance(Y,C) as dx2) as y, min(dx2) as d
from YH
cross join (select * from WCR order by ts desc limit 1 by j) as WCR
where Y not in (select C from WCR)
group by Y
)
)
where total * (select rand32()/4294967295) < cum
order by cum
limit 1;
This insert is run several times until the required number of centroids is reached.
Recalculation of centroids

calculate the distance from each point to all centroids

for each point we find the nearest centroid, combine the points into groups

assign the center of this group to the new centroid position

repeat until the centroid movement stops (almost)
INSERT INTO WCR SELECT
now(), j,
tuple(COLUMNS('tupleElement') APPLY avg) AS C
FROM (WITH ( SELECT groupArray(j), groupArray(C)
FROM (select j,C from WCR order by ts desc limit 1 by j) ) AS jC
SELECT untuple(Y), i,
arraySort((j, C) > L2Distance(C, Y), jC.1, jC.2)[1] AS j
FROM YH
)
GROUP BY j

we take the centroids of the previous step:
select j,C from WCR order by ts desc limit 1 by j

make a tuple of them with two arrays inside (number and position)

passing these arrays to the function
arraySort
, which, for each point, sorts the centroids in ascending order of the distance from the point to the centroid. We take only the first element of the array – the number of the centroid closest to the point. 
untuple(Y)
expands the coordinates of a point into separate columns. Their names will be liketupleElement(Y.1)

COLUMNS('tupleElement')
– takes all columns at the specified regex 
group by j
– grouping all points by centroid number 
APPLY avg
– applies an aggregate function to the selected columns – takes the average over each coordinate, calculating the new position of the centroid. 
tuple()
– the resulting list of separate (but already aggregated) columns is folded back into a single tuple. 
insert into
– we write new centroids and timestamp in the table.
Result
All steps of the algorithm are saved in the WCR table. There is a timestamp for each step, there is a centroid position. To get the data necessary for visualization, you can write a query similar to a regular iteration, which will calculate the belonging of each point to the cluster:
with tuple(COLUMNS('tupleElement')) as a
select a.1 as x,
if(j=1,a.2,null) as p1,
if(j=2,a.2,null) as p2,
if(j=3,a.2,null) as p3,
if(j=4,a.2,null) as p4,
if(j=5,a.2,null) as p5
from ( WITH ( SELECT groupArray(j), groupArray(C)
FROM (select j,C from WCR order by ts desc limit 1 by j) ) AS jC
SELECT untuple(Y), i,
arraySort((j, C) > L2Distance(C, Y), jC.1, jC.2)[1] AS j
FROM YH
)
Conclusion
The algorithm itself works quickly, with the usual for kmeans accuracy – it solves difficult situations for a person (as in KDPV) logically, but on clearly separated groups of points it gets confused. There are newer and more interesting algorithms.
The goal of the article was to show how to use some of the Clickhouse tools for relatively complex work with big data.
All the SQL queries described above are collected in a bash script that sequentially initializes the centroids, refines them in a loop, and ends when the wanderings are stopped. If desired, you can rewrite it to any convenient PL, taking into account the specifics of the task and the specific architecture of data storage.
The project is available on Github
The idea of implementing kmeans in SQL was taken from an article by Terradata employees – Programming the Kmeans Clustering Algorithm in SQL However, everything is written there in classic SQL. It turned out very difficult, with additional generators of SQL code. I tried to make it simpler and more concise, using a specific SQL dialect and Clickhouse functions.
Variable names in SQL queries generally correspond to the names in the article. It offers an interesting naming scheme, and to some extent I tried to match it. Useful if you want to understand how an algorithm and its implementations work.