k-means in Clickhouse

The k-means algorithm is well known and is used when you need to quickly divide a data array into groups, or so-called "clusters". It is assumed that each data element has a set of numerical metrics, so we can talk both about the position of a point in some multidimensional space and about the mutual proximity of such points.

k-means belongs to the category of EM algorithms (Expectation-Maximization), in which we alternately evaluate how good the current division of points into clusters is, and then slightly improve it.

This rather simple algorithm was formulated back in the 1950s and has since been implemented in a variety of programming languages. There are implementations for MySQL and Postgres, and even for Excel.

Why Clickhouse

There are hundreds and thousands of k-means implementations. Why not take some popular Python library, pull the data through a database driver, and process it in Python memory?

That works if there is little data, for example when the task is to find a particular region in an image in order to apply a more complex algorithm to it. But it happens that there is a lot of data. Or even VERY MUCH. Yes, Clickhouse is about Big Data.

Big Data is when there is so much data that it does not fit anywhere. Not only in RAM – you cannot even dream about that. Often the entire data set does not fit even on one server, and you have to use several (sharding). Simply feeding 1 TB of data to sklearn.cluster.KMeans means reading it all into the memory of a Python process, then again and again, and in a single thread. Not a great idea. Therefore, on large amounts of data, solutions such as MapReduce, Apache Airflow, Spark and other expensive and solid tools are used.

I will try to do something comparable, just simple and cheap, using the capabilities of Clickhouse – and without it being slow. I will show how you can work efficiently with big data using one simple algorithm as an example. After all, Clickhouse is a full-fledged environment for executing processing jobs, not just a store of indexed data.

The main difficulty in this task is that at each step we have to re-read a huge data set, calculate Euclidean distances and group the results. Clickhouse provides everything needed to make these operations fast and transparent for the developer:

  • data is stored in columns – we read only what we need

  • the data is stored compressed (lz4) – we read several times less and reduce the load on the disk

  • data is read in 8 threads (by default, configurable) from separate files, so the multi-core architecture of modern servers is used without any effort from the developer

  • the Tuple type efficiently stores points of a multidimensional space

  • a special function quickly calculates the Euclidean distance (see the small example after this list)

  • SQL code is compiled (LLVM) and runs at the speed of C++ code

  • the group by operation has over 40 different optimizations and is very efficient; if grouping in memory is not possible, the disk is used automatically

  • if the data is sharded across several servers, the query is sent to all shards, partial aggregation is done in a distributed way, and the results are combined – without any effort on the part of the developer
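
As a tiny illustration of the Tuple type and the distance function (a toy example, not part of the algorithm): a point is just a tuple of coordinates, and L2Distance computes the Euclidean distance between two such tuples.

-- Euclidean distance between two points given as tuples; returns 5
select L2Distance((0, 0), (3, 4)) as d;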

Initial data

The data can be in any table, and in order for the algorithm to have a standard interface to it, it is proposed to prepare a regular SQL view:

create or replace view YH as
select i, (toInt32(x),toInt32(y)) as Y
from sourceData;

Data is selected from the sourceData table, and two columns are formed: the primary key i and a Tuple Y built from the required number of numeric coordinates. In this case, two integer coordinates of a 2D space are used. If the data is sharded, such a view must be created on all cluster nodes.
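
If you just want to try the algorithm, the source table can be anything with a key and numeric coordinates. A minimal sketch of such a table filled with random points (the column types and the engine here are my assumptions; only the names sourceData, i, x, y come from the view above):

create table sourceData (
   i UInt64,
   x Float64,
   y Float64
) engine = MergeTree
order by i;

-- a million random points in a 1000 x 1000 square
insert into sourceData
select number as i,
       rand32() % 1000 as x,
       rand32() % 1000 as y
from numbers(1000000);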

Storage of centroids

The centroid is the key concept of the k-means algorithm. Centroids are the centers of the clusters that we are looking for and refine by successive approximations. We will keep them in a small table.

create table WCR (
   ts DateTime,
   j Int32,
   C Tuple(Int32,Int32)
) engine = MergeTree
order by ts;

We store all iterations, and since we do not have an auto-increment, a simple timestamp is used instead, followed by the cluster number and the centroid coordinates in 2D space. The table is sorted by time.
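
Because every iteration gets its own timestamp, the current set of centroids can always be fetched with the LIMIT 1 BY clause; this snippet reappears in all the queries below:

-- the latest position of each centroid: one row per cluster number j
select j, C
from WCR
order by ts desc
limit 1 by j;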

Initialization

In the simplest case, the centroids are initialized with the coordinates of randomly selected points from the original dataset: as many clusters as you want, that many centroids need to be initialized. The algorithm is quite sensitive to the initial placement of the centroids – it may not converge.

Therefore, a more elaborate initialization (k-means++) places the centroids probabilistically, but taking into account the distances between points in space. It is argued that this works better, so that is what we will do.

The first centroid is a randomly selected point (here, simply the point at offset 40):

insert into WCR select now(), 1, Y
from YH
limit 40,1; 

Subsequent centroids:

insert into WCR
select now(), (select j from WCR order by ts desc limit 1)+1 as j, y
from (
	select y,
	-- total weight of all points and the running (cumulative) weight
	sum(d) over () as total,
	sum(d) over (rows between unbounded preceding and current row ) as cum
	from (
		-- for every point not yet chosen as a centroid:
		-- the distance to the nearest of the current centroids
		select argMin(Y, L2Distance(Y,C) as dx2) as y, min(dx2) as d
		from YH
		cross join (select * from WCR order by ts desc limit 1 by j) as WCR
		where Y not in (select C from WCR)
		group by Y
	)
)
-- pick a point with probability proportional to its distance d
where total * (select rand32()/4294967295) < cum
order by cum
limit 1;

This insert is run several times until the required number of centroids is reached.
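
The number of centroids created so far can be checked, for example, like this (just a possible stop condition for the initialization loop, not part of the original script):

select uniqExact(j) as centroids from WCR;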

Recalculation of centroids

  • calculate the distance from each point to all centroids

  • for each point, find the nearest centroid and combine the points into groups

  • take the center of each group as the new centroid position

  • repeat until the centroids (almost) stop moving

INSERT INTO WCR SELECT
now(),  j,
tuple(COLUMNS('tupleElement') APPLY avg) AS C
FROM (WITH ( SELECT  groupArray(j), groupArray(C)
			FROM (select j,C from WCR order by ts desc limit 1 by j) ) AS jC
			SELECT  untuple(Y), i,
			arraySort((j, C) -> L2Distance(C, Y), jC.1, jC.2)[1] AS j
			FROM YH
		 )
GROUP BY j;

  • we take the centroids of the previous step: select j,C from WCR order by ts desc limit 1 by j

  • make a tuple of them with two arrays inside (number and position)

  • we pass these arrays to the arraySort function, which, for each point, sorts the centroid numbers in ascending order of the distance from the point to the centroid. We take only the first element of the array – the number of the centroid closest to the point.

  • untuple(Y) expands the coordinates of a point into separate columns. Their names will look like tupleElement(Y, 1)

  • COLUMNS('tupleElement') – takes all columns whose names match the specified regex

  • group by j – grouping all points by centroid number

  • APPLY avg – applies an aggregate function to the selected columns – takes the average over each coordinate, calculating the new position of the centroid.

  • tuple() – the resulting list of separate (but already aggregated) columns is folded back into a single tuple.

  • insert into – we write the new centroids, together with a timestamp, into the table.
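
How "the centroid movement stops" is detected is up to the driver script; one possible convergence check (a sketch, the project's bash script may do this differently) is to compare the last two iterations stored in WCR:

-- total shift of the centroids between the two most recent iterations;
-- iterate again while this value is above some small threshold
select sum(L2Distance(C_new, C_old)) as shift
from (select j, C as C_new from WCR order by ts desc limit 1 by j) as cur
inner join
(
   select j, C as C_old from WCR
   where ts < (select max(ts) from WCR)
   order by ts desc
   limit 1 by j
) as prev using j;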

Result

All steps of the algorithm are saved in the WCR table: for each step there is a timestamp and the centroid positions. To get the data needed for visualization, you can write a query similar to a regular iteration that calculates which cluster each point belongs to:

with tuple(COLUMNS('tupleElement')) as a
select a.1 as x,
		   if(j=1,a.2,null) as p1,
			 if(j=2,a.2,null) as p2,
			 if(j=3,a.2,null) as p3,
			 if(j=4,a.2,null) as p4,
			 if(j=5,a.2,null) as p5
from ( WITH ( SELECT  groupArray(j), groupArray(C)
			 FROM (select j,C from WCR order by ts desc limit 1 by j) ) AS jC
			 SELECT  untuple(Y), i,
			 arraySort((j, C) -> L2Distance(C, Y), jC.1, jC.2)[1] AS j
			 FROM YH
		 )
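
For a quick sanity check of the result (a sketch along the same lines), the same subquery can be aggregated to count how many points fall into each cluster:

select j, count() as points
from ( WITH ( SELECT  groupArray(j), groupArray(C)
			 FROM (select j,C from WCR order by ts desc limit 1 by j) ) AS jC
			 SELECT arraySort((j, C) -> L2Distance(C, Y), jC.1, jC.2)[1] AS j
			 FROM YH
		 )
group by j
order by j;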

Conclusion

The algorithm itself works quickly, with the accuracy typical of k-means: it resolves situations that are difficult for a human (as in the lead image) quite logically, but it can get confused on clearly separated groups of points. There are newer and more interesting algorithms.

The goal of the article was to show how to use some of the Clickhouse tools for relatively complex work with big data.

All the SQL queries described above are collected in a bash script that sequentially initializes the centroids, refines them in a loop, and terminates when the centroids stop wandering. If desired, you can rewrite it in any convenient programming language, taking into account the specifics of the task and the particular data storage architecture.

The project is available on GitHub.

The idea of implementing k-means in SQL was taken from an article by Teradata employees, "Programming the K-means Clustering Algorithm in SQL". However, everything there is written in classic SQL, and it turned out rather complicated, with additional SQL code generators. I tried to make it simpler and more concise by using a specific SQL dialect and Clickhouse functions.

Variable names in the SQL queries generally correspond to the names in that article. It proposes an interesting naming scheme, and to some extent I tried to follow it. This is useful if you want to understand how the algorithm and its implementations relate to each other.
