Building an ETL pipeline for machine learning using Kafka, Clickhouse and Go

In this article, I will share my experience with Go, Kafka, and Clickhouse by walking through a simple ETL pipeline that transfers JSON data to a database in parallel and then predicts temperature using machine learning.

As a continuation of my two previous articles about Kafka and Clickhouse, I decided to write a small application demonstrating how real-time data can be fed through Kafka, validated and transformed along the way, and loaded into Clickhouse, where it can then be turned into time series for machine learning – in particular, how meteorological data from past years can be used to predict temperature. I'll be honest: I'm not well versed in the mathematics behind machine learning and can only share observations from trial and error, but the final model turned out far more accurate than I expected.

A few words about the technologies involved:

Kafka is a distributed messaging system that supports multiple producers and consumers. The broker splits the “topics” you subscribe and publish to into partitions that are replicated across the cluster. Although it lacks the built-in ETL tooling of Apache Spark, Kafka offers surprisingly low latency and can stream data from many sources. I use Zookeeper to manage the cluster. In this project the producer reads data from a file, simply because that is easier to control for a demo; ideally, we would feed Kafka directly from the National Weather Service and other sources.

Clickhouse is a columnar relational database. In addition to SQL-like commands, it includes a simple stochastic linear regression method, which is what we'll use here. I'm not claiming, and in fact didn't expect, that the weather data would be linear, but I wanted the machine learning part to stay fairly simple and to be done entirely within Clickhouse.

Both of these tools are open source and free to use. I run them from their Docker images: https://hub.docker.com/r/wurstmeister/kafka and https://hub.docker.com/r/clickhouse/clickhouse-server/. On the Go side I use the segmentio/kafka-go and clickhouse/clickhouse-go client libraries.

I decided to use Go because of its simple concurrency model, which is the part of this project I most want to highlight. Channels in Go synchronize goroutines and let you prevent race conditions without explicit locks and mutexes (which Go also has), while keeping the flexibility and efficiency of parallel execution. If you are new to this, a short practical example of using channels follows.
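The sketch below is mine rather than code from the project: two goroutines each compute a partial sum and hand their result back over a channel, and the receives in main are what synchronize everything, so no shared variable or mutex is needed.

package main

import "fmt"

// sum sends the total of its slice into the channel instead of
// writing to a shared variable.
func sum(nums []int, out chan<- int) {
    total := 0
    for _, n := range nums {
        total += n
    }
    out <- total
}

func main() {
    data := []int{1, 2, 3, 4, 5, 6, 7, 8}
    out := make(chan int)

    go sum(data[:4], out) // first half
    go sum(data[4:], out) // second half

    // Each receive blocks until one of the goroutines finishes,
    // so main never sees a half-computed result.
    fmt.Println(<-out + <-out) // 36
}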

Architecture:

Essentially, we split extraction, transformation, and loading across multiple goroutines that pass data from stage to stage via channels. JSON strings received from Kafka are sent over one channel to be validated and unmarshaled into a struct, and the resulting structs are sent over another channel to be written to Clickhouse. The struct acts as a lightweight schema, checking the data type of each field.
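In skeleton form (a simplified sketch of mine with the Kafka and Clickhouse calls stubbed out, not the real program), the flow through the channels looks roughly like this:

package main

import (
    "encoding/json"
    "fmt"
)

type Entry struct {
    Date_time string
    Air_temp  float32
}

func main() {
    msgs := make(chan []byte)   // raw JSON strings (extract -> transform)
    entries := make(chan Entry) // validated structs (transform -> load)

    // Extract: a stand-in for the Kafka consumer goroutines.
    go func() {
        msgs <- []byte(`{"Date_time":"08/08/24-03:53PM","Air_temp":21.7}`)
        close(msgs)
    }()

    // Transform: unmarshal into the struct and drop anything invalid.
    go func() {
        for m := range msgs {
            var e Entry
            if err := json.Unmarshal(m, &e); err == nil && e.Air_temp != 0 {
                entries <- e
            }
        }
        close(entries)
    }()

    // Load: a stand-in for the Clickhouse batch insert.
    for e := range entries {
        fmt.Printf("insert %+v\n", e)
    }
}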

We read from Kafka through consumer groups (rather than partition readers) to have more flexibility in the number of goroutines. The Kafka broker keeps track of the offsets for each partition, ensuring that each message is read only once even across parallel readers.

The data is then transformed inside Clickhouse, because we need to prepare it as a time series for machine learning. We predict the temperature for a given hour from the temperatures of the preceding hours, so we arrange the data so it can be fed straight into Clickhouse's machine learning method, which fits a weight for each parameter from the corresponding column. In addition to the month, we use a window of the previous 30 temperatures, together with each reading's time offset from the start of the window, to predict the next temperature for any given date. I took inspiration for this example from the article https://towardsdatascience.com/ml-approaches-for-time-series-4d44722e48fe.

Implementation:

set-up.sh (ETL)

#!/usr/bin/bash

topic="temperature_data"
echo "kafka-topics.sh --delete  --topic ${topic} --bootstrap-server localhost:9092" |docker exec -i kafka /bin/bash
echo "kafka-topics.sh --create --topic ${topic} --partitions 10 --bootstrap-server localhost:9092" |docker exec -i kafka /bin/bash


table="temperatures"
# echo "show tables;" | docker exec -i clickhouse clickhouse-client

echo "truncate table if exists ${table};" |docker exec -i clickhouse clickhouse-client
ch_createtable="create table if not exists ${table} (
 Date_time DateTime('America/Los_Angeles'),
 Air_temp Float32
 ) engine=MergeTree() ORDER BY Date_time;"

 # "create table if not exists ${table} (
 # Date_time DateTime('America/Los_Angeles'),
 # Sea_level_pressure Float32,
 # Altimeter Float32,
 # Air_temp Float32,
 # Relative_humidity Float32,
 # Dew_point_temperature Float32,
 # Wind_direction UInt16,
 # Wind_speed Float32,) engine=MergeTree() ORDER BY Date_time;"

echo ${ch_createtable} | docker exec -i clickhouse clickhouse-client

Create a Kafka topic with as many partitions as the number of goroutines you plan to have reading from Kafka. Next, create a table in Clickhouse to store the temperature data we will read out of Kafka. Including the time zone isn't strictly necessary; I kept it just to remind myself how it's done. The date serves as the primary key (ORDER BY), which organizes the data.

json-to-kafka.go (ETL)

The meteorological data I downloaded was in csv format, so I ran it through an online converter and got a json file (https://csvjson.com/csv2json).

// https://www.weather.gov/lox/observations_historical
// https://csvjson.com/csv2json
// https://pkg.go.dev/github.com/segmentio/kafka-go#section-documentation
// https://pkg.go.dev/encoding/json#Marshal

package main

import (
   "github.com/segmentio/kafka-go"
   "context"
   "encoding/json"
   "os"
   "fmt"
   "time"
)

type Entry struct {
   Date_time               string
   Sea_level_pressure      float32
   Altimeter               float32
   Air_temp                float32
   Relative_humidity       float32
   Dew_point_temperature   float32
   Wind_direction          uint16
   Wind_speed              float32
}

func main() {
   const topic = "temperature_data"
   const fileName = "KLAX-data_temp-only.json"
   var read_data []byte
   var entries []Entry
   var limit int = 10000
   done_ch:=make(chan bool)
   var threads int= 10

   read_data = readData(fileName)
   if err := json.Unmarshal(read_data, &entries); err != nil { // build the slice of Entry structs from the json
       panic(err)
   }

   fmt.Printf("How many entries to use? (/%d)", len(entries))
   fmt.Scan(&limit)
   entries = entries[:limit]

   for i:=0; i<threads; i++ {
       go writeMessages(entries, threads, i, topic, done_ch) // write our share of the entries to the topic

   }

   for i:=0; i<threads; i++{ // wait until every goroutine has finished

       <-done_ch
   }
} // end of main

// read the given file and return its contents as a []byte

func readData(fileName string) ([]byte){
   data, err := os.ReadFile(fileName)
   if err != nil {
       panic(err)
   }
   return data
}
// continued below...

As mentioned earlier, we use the struct to validate the data types of each record; invalid fields are discarded. Capitalizing the field names is mandatory – it happens not to break anything in this program, but in earlier code I ran into problems because the fields were not properly exported.
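A tiny standalone example (mine, not from the project) of why this matters: encoding/json can only see exported (capitalized) fields, so a lowercase field is silently left at its zero value.

package main

import (
    "encoding/json"
    "fmt"
)

type exported struct{ Air_temp float32 }
type unexported struct{ air_temp float32 } // lowercase: invisible to encoding/json

func main() {
    data := []byte(`{"Air_temp": 21.7}`)

    var e exported
    var u unexported
    _ = json.Unmarshal(data, &e)
    _ = json.Unmarshal(data, &u)

    fmt.Println(e.Air_temp) // 21.7
    fmt.Println(u.air_temp) // 0 -- the value was silently dropped
}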

With that in place, the Unmarshal function from the json package converts the json file into a slice of our “Entry” structs. After trimming the slice to the requested length, we use goroutines to spread the work of writing to the Kafka topic evenly. Each goroutine opens a Writer and writes its share of the records, marshaling the structs back into json strings.

// write our share of the Entry slice to the topic and signal when done

func writeMessages(entries []Entry, threads int, thread_num int, topic string, done_ch chan bool){
   // modeled on the kafka-go documentation
   writer := &kafka.Writer{
       Addr:   kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
       Topic:  topic,
       Balancer: &kafka.LeastBytes{}, // spread messages evenly across the partitions

       BatchTimeout: 10 * time.Millisecond, // the default is 1 second

   }
   var batch_size int = 0
   msgs := []kafka.Message{}

   var count int = 0
   for i:=thread_num; i < len(entries); i+=threads {
       count++
       // convert the struct back into json
       entry_bytes, _ := json.Marshal(entries[i]) // the json, as a []byte

       batch_size++
       if batch_size <1000 { // sending each record as its own message is far too slow
           msgs = append(msgs, kafka.Message{ Value:   entry_bytes,})
       }else{
           batch_size = 0
           //https://stackoverflow.com/questions/37570665/what-does-mean-when-coming-directly-after-a-slice
           err := writer.WriteMessages(context.Background(), msgs...)
           if err!= nil {
               panic(err)
           }
           msgs = []kafka.Message{}
           msgs = append(msgs, kafka.Message{ Value:   entry_bytes,})
       }
   }
   err := writer.WriteMessages(context.Background(), msgs...) // write whatever is left of the last batch
   if err != nil {
       panic(err)
   }

   fmt.Printf("WROTE(%d): %v\n", count, thread_num)

   if err = writer.Close(); err != nil {
       panic(err)
   }
   done_ch <- true // signal that this writer is done
}

There are two key points to note:

  • Do not use batches that are too small.

  • By default, a batch is flushed only once per second (the BatchTimeout option).

If you ignore these points, writing will be very slow. Compare the results for 10 goroutines writing 1000 messages each:

go run json-to-kafka.go 0.60s user 0.58s system 10% cpu 11.726 total

with a default timeout of 1 second and 100 messages in a batch, and:

go run json-to-kafka.go 0.52s user 0.53s system 63% cpu 1.659 total

with a timeout of 10 milliseconds and 1000 messages per batch. Now imagine waiting for 100,000 messages to be written with the first set of parameters (that's about 15 minutes 🙁 ).
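As a side note (a sketch under my assumptions, not code from json-to-kafka.go), the same two knobs can also be set directly on the Writer itself; its defaults of 100 messages per batch and a 1-second flush are exactly what makes the first run above so slow.

// newFastWriter is a hypothetical helper; it assumes the same imports
// as json-to-kafka.go (segmentio/kafka-go and time).
func newFastWriter(topic string) *kafka.Writer {
    return &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
        Topic:        topic,
        Balancer:     &kafka.LeastBytes{},
        BatchSize:    1000,                  // flush after 1000 buffered messages (default 100)
        BatchTimeout: 10 * time.Millisecond, // or after 10 ms (default 1 s), whichever comes first
    }
}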

Each goroutine signals main over the channel when it has finished writing.

kafka-to-ch.go (ETL)

This is the main part of our ETL pipeline.

While the weather data is being written to Kafka, we can already start reading it back out and writing it to the database. To keep the example simple, I parse only the date and temperature fields into the struct: the source meteorological data has quite a few rows missing one or more of the other fields, and the program would otherwise discard those records during validation.

// https://pkg.go.dev/github.com/ClickHouse/clickhouse-go/v2
// https://pkg.go.dev/os/signal

package main

import (
   "fmt"
   "github.com/ClickHouse/clickhouse-go/v2"
   "github.com/segmentio/kafka-go"
   "time"
   "context"
   "encoding/json"
   "errors"
   "os/signal"
   "os"
)

// json-to-kafka.go
type Entry struct {
   Date_time               string
   // Sea_level_pressure       float32
   // Altimeter                float32

   Air_temp                float32
   // Relative_humidity        float32
   // Dew_point_temperature    float32
   // Wind_direction           uint16
   // Wind_speed               float32

}

func main() {
   conn := connect_ch() // open a connection to Clickhouse

   const number_routines_kafka = 10
   const number_routines_ch = 10
   const number_routines_tr = 10
   const topic = "temperature_data"
   const table = "temperatures"
   var readers []*kafka.Reader = []*kafka.Reader{}

   signal_ch := make(chan os.Signal, 1) // buffered, as signal.Notify expects
   msgs := make(chan []byte)
   entries := make(chan Entry)

   for i:=0; i<number_routines_kafka; i++ {
       reader := kafka.NewReader(kafka.ReaderConfig{
           Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
           GroupID:   "go-consumer-group",
           Topic:     topic,
            MaxBytes:  10e6, // per batch
            QueueCapacity: 1000,
            ReadBatchTimeout: 10 * time.Millisecond,
            // these are not all of the available options
           })
       readers = append(readers, reader)
   }

   go func (signal_ch chan os.Signal){
        signal.Notify(signal_ch, os.Interrupt) // catch a Ctrl-C (SIGINT)

   } (signal_ch)



   go func (readers []*kafka.Reader, msgs chan []byte){
       for i:=0; i<number_routines_kafka; i++ {
           go readWithReader(readers[i], msgs, i) // read from Kafka
       }
   } (readers, msgs)

   for i:=0; i<number_routines_tr; i++ {
       go transform(msgs, i, entries) // validate and transform the data
   }

   for i:=0; i<number_routines_ch; i++ {
       go batchInsert(conn, entries, table, i) // write to Clickhouse

   }

   <-signal_ch // block here until it is time to clean up the connections

   for i,reader := range readers{
       fmt.Printf("Closing reader: %d\n",i)
       if err := reader.Close(); err != nil {
           fmt.Println("failed to close reader:", err)
       }
   }
   if err := conn.Close(); err != nil {
       fmt.Println("failed to close clickhouse connection:", err)
   }

   fmt.Println("yay")
} // end of main

// Connect to Clickhouse
func connect_ch() (clickhouse.Conn) {
   conn, err := clickhouse.Open(&clickhouse.Options{
       Addr: []string{fmt.Sprintf("%s:%d", "127.0.0.1", 19000)},
       Auth: clickhouse.Auth{
           Database: "default",
           Username: "default",
           Password: "",
       },
       Debug: false,
   })
   if err != nil {
       panic(err)
   }
   fmt.Println(conn.Ping(context.Background()))
   v, err := conn.ServerVersion()
   if err != nil {
       panic(err)
   }
   fmt.Println(v)
   return conn
}

// Receive messages as part of a consumer group and send them down the []byte channel

func readWithReader(reader *kafka.Reader, ch chan []byte, routine_num int){
   var count int = 0
   for {
       entry, err := reader.ReadMessage(context.Background())
       if err != nil {
           break
       }
       ch <- entry.Value
       count++
       fmt.Printf("READ(%d): %v\n", count, routine_num)
   }
   fmt.Printf("DONE READ(%d): %v\n", count, routine_num)
}
// continued below...

This program is a little more complicated than the previous ones. Here's a quick overview of what's going on here:

  • The readWithReader() goroutines use multiple Kafka readers in the same consumer group to pass raw messages over the msgs channel as byte slices. In parallel, the transform() goroutines read each message from that channel, validate it, and transform it into an Entry struct, which is passed through the entries channel to the final set of goroutines, batchInsert(), where it is inserted into Clickhouse.

  • The program also registers a handler for the Ctrl-C (SIGINT) signal so that the Kafka readers and the Clickhouse connection can be closed for a graceful shutdown. After all the goroutines have been started, main blocks on the signal_ch channel, waiting for this signal before it begins cleaning up.

//(copied from main)
go func (signal_ch chan os.Signal){
        signal.Notify(signal_ch, os.Interrupt) // catch a Ctrl-C (SIGINT)

   } (signal_ch)

//...

<-signal_ch // blocks until the connections are ready to be cleaned up

   for i,reader := range readers{
       fmt.Printf("Closing reader: %d\n",i)
       if err := reader.Close(); err != nil {
           fmt.Println("failed to close reader:", err)
       }
   }
// ...the rest of the cleanup

A few points to note:

  • When you run this program, the consumer group for the readers is created automatically if it does not already exist. You can run

kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --all-groups

in the shell of the running kafka container to see the offsets of messages consumed from the topic.

  • Validation at the transformation stage consists solely of checking that the fields we need are non-zero (fields missing from the JSON are left at their zero values when unmarshaled). Depending on the data, this can discard records incorrectly – for example, a genuine reading of 0 degrees. One way around that is sketched right below.
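The snippet below is my own illustration and is not used in the project (EntryStrict and validifyStrict are made-up names); declaring the field as a pointer lets a missing field stay nil while a real 0-degree reading survives validation. It assumes the same imports as kafka-to-ch.go.

// EntryStrict is a hypothetical variant of Entry that can tell
// "field absent" apart from "field equals zero".
type EntryStrict struct {
    Date_time string
    Air_temp  *float32 // nil => missing from the JSON, 0 => real reading
}

func validifyStrict(e *EntryStrict) error {
    if e.Air_temp == nil {
        return errors.New("missing Air_temp")
    }
    return nil
}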

// Unmarshal messages from the []byte channel into Entries and validate them
// before sending them down the Entry channel
func transform(ch chan []byte, routine_num int, entries chan Entry){
   var count int = 0
   for val := range ch{
       var temp Entry
       err := json.Unmarshal(val, &temp) // into the struct
       if err != nil{
            // note it, but keep going
           fmt.Println("Error unmarshaling: " + string(val))
           continue
       }
       if validify(&temp) != nil{
           fmt.Printf("Error transforming data: %v\n", temp)
           continue
       }
       entries <- temp
       count++
       fmt.Printf("TRANSFORMED(%d): %v\n", count, routine_num)
   }
}

// Check that the json string unmarshaled into the struct contains valid (non-zero) data
func validify(entry *Entry) (error){
   // convert the date/time string into something Clickhouse can cast to DateTime
   res, err := time.Parse("01/02/06-03:04PM", entry.Date_time)
   if err != nil{
       return err
   }
   // if a required field is 0 (missing data), discard the entry

   // if(entry.Sea_level_pressure==0||entry.Altimeter==0||
   //  entry.Air_temp==0||entry.Relative_humidity==0||entry.Dew_point_temperature==0||
   //  entry.Wind_direction==0||entry.Wind_speed==0){
   if(entry.Air_temp==0){
       return errors.New("invalid entry")
   }
   entry.Date_time = res.Format(time.DateTime)
   return nil
}
// continued below...
  • To parse non-standard date formats from strings, we need Parse() from the time package. The layout argument is the reference time – 3:04 PM on January 2, 2006 – written in the format you want to parse (here, “01/02/06-03:04PM”) (https://pkg.go.dev/time#Parse).

//(copied from validify)
res, err := time.Parse("01/02/06-03:04PM", entry.Date_time)
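Here is a self-contained example of mine (not from the project) of the same conversion: parse the source format, then re-format into the "2006-01-02 15:04:05" layout (time.DateTime) that the Clickhouse DateTime column accepts.

package main

import (
    "fmt"
    "time"
)

func main() {
    raw := "08/08/24-03:53PM" // format used in the source data
    t, err := time.Parse("01/02/06-03:04PM", raw)
    if err != nil {
        panic(err)
    }
    fmt.Println(t.Format(time.DateTime)) // 2024-08-08 15:53:00
}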
  • The AppendStruct() function from the clickhouse-go API maps the fields of our Entry struct onto the table's columns and appends it to the batch as a row (an alternative to calling Append() and passing each column explicitly).

// Insert entries from the Entry channel into the table in batches
func batchInsert(conn clickhouse.Conn, ch chan Entry, table string, routine_num int){
  var count int = 0
  for val := range ch{
        batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO " + table)
        if err != nil {
            panic(err)
        }
        batch.AppendStruct(&val) // THE STRUCT FIELDS MUST BE CAPITALIZED (exported), otherwise this will not work
        count++
        batch.Send()
        fmt.Printf("INSERTED(%d): %d\n", count, routine_num)
  }
}
// end of the program

Machine learning in Clickhouse

predict-set-up.sh (ML)

In addition to the table we filled in the previous steps, we need to generate a second table from that data for training the model.

This is what the table with our data looks like (this is just part of the table):

And here is part of what we want (and will) feed to our model (model_data table):

For each date, the temp array stores a window of the 30 previous temperatures followed by that date's own temperature, and the delta_t array stores each reading's time offset from the beginning of the window. For example, the date 8/8/24 3:53 (the first row in the table) has in its temp array the temperatures for 8/7/24 2:53, 8/7/24 3:53, 8/7/24 4:53, and so on. Since that range starts at 8/7/24 2:53, the offsets start at 0 and then grow by 60 minutes (3600 seconds) from 2:53 -> 3:53, and so on. It is important to keep the time offsets because, as the original data shows, the temperature was not always measured at equal intervals.

To create this table, we first need a helper table that looks up the values for each window. Here's what we need:

min_bound represents the start of each window. This format makes it easy to group the dates and temperatures into their windows.

Window functions come to our aid.

-- (copied)


MIN(Date_time) OVER (
 ORDER BY Date_time ASC ROWS BETWEEN ${window_size} PRECEDING AND CURRENT ROW
) AS min_time,

For each row, this returns the earliest Date_time among that row and the window_size rows before it. Then, for each date and its min_time, the WHERE clause attaches every date and temperature that falls within that range. The result contains multiple copies of each date, but each copy belongs to a different window, identified by min_bound.

#!/usr/bin/bash


window_size=30 # DO NOT CHANGE without also updating the model command
table="temperatures"


min_bound_table="CREATE TABLE IF NOT EXISTS helper
ENGINE = Memory
AS SELECT
   B.min_time AS min_bound,
   A.Date_time,
   A.Air_temp
FROM
(
   SELECT
       Date_time,
       Air_temp
   FROM ${table}
) AS A,
(
   SELECT
       min_time,
       Date_time,
       Air_temp,
       delta,
       a.Date_time
   FROM
   (
       SELECT
           Date_time,
           Air_temp,
           MIN(Date_time) OVER (ORDER BY Date_time ASC ROWS BETWEEN ${window_size} PRECEDING AND CURRENT ROW) AS min_time,
           Date_time - min_time AS delta
       FROM ${table}
   ) AS a
) AS B
WHERE (A.Date_time >= B.min_time) AND (A.Date_time <= B.Date_time)
ORDER BY Date_time ASC"


echo ${min_bound_table} | docker exec -i clickhouse clickhouse-client
#cont ...

After creating the helper table, we can use GROUP BY min_bound to split the records into their windows and then collapse the rows into arrays with groupArray(). The result is an array of temperatures and an array of time offsets for each date.

CREATE TABLE IF NOT EXISTS model_data
ENGINE = Memory
AS SELECT
   MAX(Date_time) AS time,
   groupArray(Air_temp) AS temp,
   groupArray(Date_time - min_bound) AS delta_t
FROM helper
GROUP BY min_bound
ORDER BY time ASC

However, we have to be careful with the earliest 30 records in our final table: there weren't enough earlier records to fill their windows, so the oldest record was simply duplicated instead. Such records can be recognized by zero time offsets in positions past the start of the window.

ALTER TABLE model_data (DELETE WHERE (delta_t[2]) = 0)

Our data is now in a suitable format to feed into our model.

Clickhouse's stochastic linear regression function takes a step size (learning rate), an L2 regularization coefficient, a batch size, and the gradient descent method, followed by the target column and then each parameter column. Any method other than “Adam” produced NaN for me, and the regularization coefficient doesn't seem to have much of an impact. I chose a step of 0.00004, a coefficient of 0.15, and a batch size of 3.

step=0.00004 # 0.00050
norm=0.15
batch=3 # 20
method="Adam"
train=75000 # number of records used for training
make_model="CREATE TABLE IF NOT EXISTS temp_model
ENGINE = Memory
AS SELECT stochasticLinearRegressionState(${step}, ${norm}, ${batch}, '${method}')
(temp[31], temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[7], temp[8], temp[9], temp[10],
temp[11], temp[12], temp[13], temp[14], temp[15], temp[16], temp[17], temp[18], temp[19], temp[20],
temp[21], temp[22], temp[23], temp[24], temp[25], temp[26], temp[27], temp[28], temp[29], temp[30],
delta_t[1], delta_t[2], delta_t[3], delta_t[4], delta_t[5], delta_t[6], delta_t[7], delta_t[8], delta_t[9], delta_t[10],
delta_t[11], delta_t[12], delta_t[13], delta_t[14], delta_t[15], delta_t[16], delta_t[17], delta_t[18], delta_t[19], delta_t[20],
delta_t[21], delta_t[22], delta_t[23], delta_t[24], delta_t[25], delta_t[26], delta_t[27], delta_t[28], delta_t[29], delta_t[30],
delta_t[31], toMonth(time)) AS state
FROM (SELECT temp, delta_t, time FROM model_data ORDER BY time ASC LIMIT ${train})"


echo ${make_model} | docker exec -i clickhouse clickhouse-client

I also decided to include month as a parameter due to the likely correlation between month and temperature. In the example code, I only train the model on the first 75,000 of the 100,000 records.

Now let's test the model on the full data set. The last records were not part of the training set.

run_model="WITH (
       SELECT state
       FROM temp_model
   ) AS model
SELECT
time,
evalMLMethod(model, temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[7], temp[8], temp[9], temp[10],
temp[11], temp[12], temp[13], temp[14], temp[15], temp[16], temp[17], temp[18], temp[19], temp[20],
temp[21], temp[22], temp[23], temp[24], temp[25], temp[26], temp[27], temp[28], temp[29], temp[30],
delta_t[1], delta_t[2], delta_t[3], delta_t[4], delta_t[5], delta_t[6], delta_t[7], delta_t[8], delta_t[9], delta_t[10],
delta_t[11], delta_t[12], delta_t[13], delta_t[14], delta_t[15], delta_t[16], delta_t[17], delta_t[18], delta_t[19], delta_t[20],
delta_t[21], delta_t[22], delta_t[23], delta_t[24], delta_t[25], delta_t[26], delta_t[27], delta_t[28], delta_t[29], delta_t[30],
delta_t[31], toMonth(time)) AS predicted,
   temp[31] AS actual
FROM model_data ORDER BY time DESC LIMIT 200"

echo ${run_model} | docker exec -i clickhouse clickhouse-client

The predicted values are in the middle and the actual values are on the right.

Pretty good for this interval, although there was a 10-degree deviation for 8/7 18:53. I saved all the results into a .csv file so I could plot them later, using the following command:

echo "sql command..." | docker exec -i clickhouse clickhouse-client --format CSV> result.csv

Results:

Above are the predictions of my best model. Its maximum and minimum predictions were very accurate, and the average deviation between each predicted temperature and the actual temperature was almost zero.

Before showing its graph in Excel, here are some of my earlier results for context:

step=0.00055, norm=0.1, batch=10

First try

Wow, those are some outliers. If you ignore the obvious sharp spikes, the upper edge of the predicted values looks almost like a flat line.

step=0.00010, norm=0.15, batch=5, method=”Adam”
Second try

This version looks better: the outliers are less extreme and the predicted values follow the general trend.

And now my final model:

step=0.000040, norm=0.15, batch=3, method=”Adam”
Last try

The outliers are reduced and the forecasts follow the sinusoidal trend more closely. If you plot the series from the last attempt separately in Excel, you will notice that the spikes really are present in the original data – the model simply exaggerates them, as with the sudden dip in June 2016. Ignore the y-axis values in this case:

Across these three graphs I reduced both the step size and the batch size. Large step sizes led to wild run-to-run fluctuations and extreme outliers. Increasing the batch size tended to reduce the magnitude of those outliers, but it also averaged the forecasts out into a flat line that barely varied over the year. (Interestingly, only the upper bound was smoothed out; the minimum values continued to follow the correct trend in the colder months.) Reducing the flattening by shrinking the batch size, and reducing the outliers by shrinking the step size, produced the most accurate results.

Adding more relevant factors such as humidity or barometric pressure would probably improve the accuracy of the model, but my main focus was on the ETL pipeline anyway, so it's good enough.

Thanks for your time! I hope you learned something useful.
