Golang TSV data generator for import into ClickHouse

While writing an article about a Data Vault for rabbit breeding, I needed to generate a lot of data for ClickHouse. None of the generators I looked at could produce 50GB of data quickly and efficiently, at least not in a way I could figure out. So I decided to have some fun and spend a Saturday afternoon with an old friend. I'll say right away that I don't develop in Golang professionally; it's more of a hobby, so please don't judge too harshly.

So, the task:

  • generate data for the Data Vault from some starting point in the past up to the current day

  • do this in TSV format, since ClickHouse simply loads this format with lightning speed

  • generation must be fast

  • there should be a lot of data

  • relationships between tables must be supported

I will also attach a diagram of the Vault that all of this is generated for; its description can be found via the link at the beginning of the article.

In general, it all sounds nasty: relationships mean keeping things in memory, but what would a database be without them?

After much thought over a cup of coffee, I formulated for myself the concept of a future utility:

  • data must be written to disk in a stream as it is generated and then released from memory

  • only a small window of data should be kept in memory at any time

Next, I decided to implement pipe-like generators on top of goroutines and channels, which would produce the primary keys and the time series.

First of all, I sketched out the structure that will be used for generation:

// Context carries the state of the row currently being generated
type Context struct {
    pk         string
    timestamp_ time.Time
    hash       string
    source_    string
    RandomHash []string
}

// TableGenerator holds the output files, the pipes and the current context
type TableGenerator struct {
    files    map[string]*os.File
    numPipe  <-chan int
    datePipe <-chan time.Time
    context  Context
}

The main TableGenerator struct holds handles to the files being written, the pipes (channels), and the generation context.

First, let’s create two methods that will allow us to initialize the structure and also add files for writing.

// Initialization
func (t *TableGenerator) Init() {
    t.files = make(map[string]*os.File)
}

// Open a file for the next table
func (t TableGenerator) AddFile(tableName string) *os.File {
    f, err := os.Create(fmt.Sprintf("tables/%s.tsv", tableName))
    if err != nil {
        panic(fmt.Sprintf("TableGenerator.AddFile: %s", err.Error()))
    }
    t.files[tableName] = f
    return t.files[tableName]
}

Initializing the structure and opening the files in the application's main function looks like this:

var generator TableGenerator
generator.Init()
for _, tbl := range tableList {
    f := generator.AddFile(tbl)
    defer f.Close()
}

After the files are opened, we need a recursive method to populate the tables. At the top level we run through the parent table – h_cities – and at each level descend further down, to h_animals at the bottom. For each top-level row a set of child rows is produced, so we can write data sequentially, descending and then returning to the parent. At the lowest level of the recursion it is therefore enough to know the context of just one parent, which is exactly what we want – less memory consumption. The hierarchy is:

  • cities (level 1)

  • farms for cities (level 2)

  • rabbits for farms (level 3)

The method signature is simple:

func WriteTable(table Table, generator TableGenerator, pContext Context) {
  ...
}

table holds the table metadata. I decided to describe it in JSON using the following structures:

type Field struct {
    Name string `json:"name"`
    Type string `json:"type"`
    Values []string `json:"values"`
    Const string `json:"const"`
    Delta int `json:"delta"`
    ConcatWithParent int `json:"concatWithParent"`
    LinkField string `json:"linkField"`
}

type Table struct {
    Name string `json:"name"`
    LinkName string `json:"linkName"`
    Rownum int `json:"rownum"`
    MinRownum int `json:"minRownum"`
    MaxRownum int `json:"maxRownum"`
    Fields []Field `json:"fields"`
    Childs []Table `json:"childs"`
}

type DataBase struct {
    Tables []Table `json:"tables"`
}
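The loading step itself isn't shown in the article; a minimal sketch, assuming the config lives in a file such as tables.json (the file name is my choice) and that encoding/json and os are imported, could look like this:

// Sketch: load the table metadata from JSON into the DataBase structure.
// "tables.json" is an assumed file name used only for illustration.
func LoadDataBase(path string) (DataBase, error) {
    var db DataBase
    raw, err := os.ReadFile(path)
    if err != nil {
        return db, err
    }
    if err := json.Unmarshal(raw, &db); err != nil {
        return db, err
    }
    return db, nil
}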
For those interested, here is an example of my JSON:
{
	"tables": [
		{
			"name": "h_cities",
			"rownum": 10,
			"fields": [
				{"name": "city_code", "type": "pk"},
				{"name": "city_hsh", "type": "hash"}, 
				{"name": "timestamp_", "type": "timestamp", "delta": 3000}, 
				{"name": "source_", "type": "string", "const":"R"}
			],
			"childs": [
				{
					"name": "h_farms",
					"linkName": "l_city_farms",
					"minRownum": 1,
					"maxRownum": 10,
					"fields": [
						{"name": "farm_num", "type": "pk", "concatWithParent": 1},
						{"name": "farm_hsh", "type": "hash"}, 
						{"name": "timestamp_", "type": "timestamp"}, 
						{"name": "source_", "type": "string", "const":"R"}
					],
					"childs": [
						{
							"name": "h_animals",
							"linkName": "l_animal_farms",
							"minRownum": 1,
							"maxRownum": 100,
							"fields": [
								{"name": "animal_num", "type": "pk", "concatWithParent": 1},
								{"name": "animal_hsh", "type": "hash"}, 
								{"name": "timestamp_", "type": "timestamp", "start": "parent"}, 
								{"name": "source_", "type": "string", "const":"R"}
							],
							"childs": [
								{
									"name": "s_animal_attrs",
									"type": "satellite",
									"rownum": 1,
									"fields": [
										{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"},
										{"name": "sex", "type": "list", "values": ["М", "Ж"]},
										{"name": "color", "type": "list", "values": ["Черный", "Белый", "Красный"]},
										{"name": "birthdate", "type": "copy", "linkField": "timestamp_"}
									]
								},
								{
									"name": "s_animal_lifecycle",
									"type": "satellite",
									"rownum": 1,
									"fields": [
										{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"},
										{"name": "status", "type": "list", "values": ["Жив", "Мертв", "Продан", "Продан живым"]}
									]
								},
								{
									"name": "l_animal_tree",
									"type": "recursion",
									"rownum": 1,
									"fields": [
										{"name": "animal_hsh", "type": "copy", "linkField": "parent.hash"},
										{"name": "animal_mother_hsh", "type": "parentRandom"},
										{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
										{"name": "source_", "type": "copy", "linkField": "parent.source_"}
									]
								}
							]
						},
						{
							"name": "l_farm_referals",
							"rownum": 1,
							"fields": [
								{"name": "ref_farm_hsh", "type": "copy", "linkField": "parent.hash"},
								{"name": "attract_farm_hsh", "type": "parentRandom"},
								{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
								{"name": "source_", "type": "copy", "linkField": "parent.source_"}
							]
						}
					]
				},
				{
					"name": "s_city_attrs",
					"type": "satellite",
					"rownum": 1,
					"fields": [
						{"name": "city_hsh", "type": "copy", "linkField": "parent.hash"},
						{"name": "timestamp_", "type": "copy", "linkField": "parent.timestamp_"},
						{"name": "source_", "type": "copy", "linkField": "parent.source_"},
						{"name": "name_", "type": "string"}
					]
				}
			]
		}
	]
}

So, the files are open; let's start walking the JSON. The first table to fill is the top-level one – h_cities – it is the smallest.

Before a table is written, its pipes must be initialized according to the field types, so we walk over the table's structure:

// open pipes for fields of the different types
for _, fld := range table.Fields {
    switch fld.Type {
    case "pk":
        generator.numPipe = GenKey(1, rownum)
    case "timestamp":
        sdate := generator.context.timestamp_
        // if a delta is set, subtract it from the current date and use that as the timeline start
        if fld.Delta > 0 {
            sdate = time.Now().AddDate(0, 0, -fld.Delta)
        }
        generator.datePipe = GenTimeline(1, rownum, sdate)
    }
}

That is, if a field has the type pk (primary key), we start a pipe that generates numbers from 1 to the number of rows in the table. If the type is timestamp, we start a time series.
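In the full WriteTable shown later this loop is hidden behind a call to table.OpenPipes(). The article doesn't show that method, so here is only a rough sketch of how it could look; passing the start date and row count as explicit parameters is my assumption for illustration (the real method apparently takes them from the generator's context):

// Sketch of an OpenPipes-style helper; the explicit sdate/rownum parameters
// are an illustrative assumption, not the article's actual signature.
func (t Table) openPipesSketch(sdate time.Time, rownum int) (<-chan int, <-chan time.Time) {
    var numPipe <-chan int
    var datePipe <-chan time.Time
    for _, fld := range t.Fields {
        switch fld.Type {
        case "pk":
            numPipe = GenKey(1, rownum)
        case "timestamp":
            start := sdate
            if fld.Delta > 0 {
                // a delta means: start the timeline that many days before today
                start = time.Now().AddDate(0, 0, -fld.Delta)
            }
            datePipe = GenTimeline(1, rownum, start)
        }
    }
    return numPipe, datePipe
}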

The rules chosen for date generation:

  • for the top-level table (h_cities) a delta of N days back from the current date is specified; city creation dates are generated starting from that point

  • for the next level down – h_farms – the interval runs from the city's creation date to the current day; that is, farms can open at any time while the city exists

  • and so on down the hierarchy; the generator.context.timestamp_ variable stores the parent row's date

Well, the actual implementation of the pipes itself:

// Generate a sequence of numbers from snum to enum
func GenKey(snum, enum int) <-chan int {
    numPipe := make(chan int)
    // goroutine that acts as the pipe producing the number series
    go func() {
        for i := snum; i <= enum; i++ {
            numPipe <- i
        }
        close(numPipe)
    }()
    return numPipe
}

// This pipeline implements generation over a time series
func GenTimeline(snum, enum int, sdate time.Time) <-chan time.Time {
    timePipe := make(chan time.Time)
    // goroutine that acts as the pipe producing the time series
    go func() {
        // first pass: an even series (unevenness can be layered on later);
        // just compute the per-row step in days and keep adding it to the start date
        step := (time.Now().Sub(sdate).Hours() / 24) / (float64)(enum - snum + 1)
        for i := snum; i <= enum; i++ {
            timePipe <- sdate.AddDate(0, 0, (int)(step * (float64)(i)))
        }
        close(timePipe)
    }()
    return timePipe
}

The first method is simple – it hands out the next number from 1 to N whenever the main code asks for one. This keeps counters and other clutter out of the main code.

The second is more interesting: it generates dates from T to the current day, spaced evenly. Working as a pair, these two methods feed data to the main code for sequential writing, while the main algorithm stays free of any intermediate state about where generation currently stands for a particular table.
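As a quick sanity check of how the two pipes behave together (this is just a demo of my own, not code from the generator): with a start date 3000 days back and 10 rows, consecutive dates come out roughly 300 days apart.

// Demo: drain both pipes in lockstep, the way the row-generation loop does.
func demoPipes() {
    rownum := 10
    sdate := time.Now().AddDate(0, 0, -3000) // 3000-day delta => ~300-day step per row
    numPipe := GenKey(1, rownum)
    datePipe := GenTimeline(1, rownum, sdate)
    for i := 0; i < rownum; i++ {
        pk := <-numPipe
        ts := <-datePipe
        fmt.Printf("%d\t%s\n", pk, ts.Format("2006-01-02"))
    }
}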

Once the pipes are open, we move on to forming the row:

for _, fld := range table.Fields {
  ...
  switch fld.Type {
      case "pk":
          generator.context.pk = strconv.Itoa(<-generator.numPipe)
          str = generator.context.pk
      case "timestamp":
          generator.context.timestamp_ = <-generator.datePipe
          str = generator.context.timestamp_.Format("2006-01-02")
      case "string":
          str = GenString()
      case "list":
          str = GenStringByList(fld.Values)
      case "copy":
          str = fieldsMap[fld.LinkField]
      case "parentRandom":
          str = pContext.RandomHash[rand.Intn(len(pContext.RandomHash))]
      default:
          str = "UNKNOWN_COLUMN_TYPE"
  }
  ...
  fields = append(fields, str)
}

All generated fields are saved in the fields slice.
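The fieldsMap used by the copy type is not shown in the article. A minimal sketch of how it could be built from the parent context follows; the exact population logic is my assumption:

// Sketch: the lookup behind fields of type "copy". Keys mirror the linkField
// values in the JSON config; how the real code maintains this map is assumed.
func buildFieldsMap(pContext Context) map[string]string {
    return map[string]string{
        "parent.hash":       pContext.hash,
        "parent.timestamp_": pContext.timestamp_.Format("2006-01-02"),
        "parent.source_":    pContext.source_,
    }
}

Presumably each generated value is also put back into the map under its own field name (something like fieldsMap[fld.Name] = str), so that, for example, birthdate can copy timestamp_ within the same row.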

Some generators are just functions, not goroutines. They don't have to carry any context across the whole table fill, so there is no need to complicate them. The primary key and the time series do carry context, and it is more convenient to keep that in a separate concurrently running function.
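GenStringByList is one of those plain functions and isn't shown in the article; a minimal sketch (assuming math/rand is imported) would be:

// Sketch: pick a random value from the list declared in the config.
func GenStringByList(values []string) string {
    return values[rand.Intn(len(values))]
}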

Particular attention should be paid to the parentRandom field type. It is the mechanism for linking a row to a random parent: the generator.context.RandomHash field stores N random hashes of parent rows, and when needed we simply read one of them at random. Recursion plus a small window of hashes lets us randomize the relationships in the database, with a nice side effect: since the data is written in order, parent hashes are always created earlier than their children, which keeps the data logically consistent.
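The article doesn't show how RandomHash is maintained. One hedged way to keep a small rolling sample of recent parent hashes (the window size of 10 is my assumption, not a value from the article):

// Sketch: a bounded rolling window of recent hashes for the parentRandom type.
const randomHashWindow = 10

func (c *Context) rememberHash(hsh string) {
    c.RandomHash = append(c.RandomHash, hsh)
    if len(c.RandomHash) > randomHashWindow {
        c.RandomHash = c.RandomHash[1:] // drop the oldest hash
    }
}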

After the field slice is filled, we spread it into the write method – it simply writes the data to the file system in TSV format; nothing interesting inside:

// write the main table row
TSVWriteLine(generator.files[table.Name], fields...)
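For completeness, a minimal sketch of what TSVWriteLine could look like (assuming strings is imported): ClickHouse's TabSeparated format is just tab-delimited columns terminated by a newline.

// Sketch: write one TSV row to the table's file.
func TSVWriteLine(f *os.File, fields ...string) {
    if _, err := f.WriteString(strings.Join(fields, "\t") + "\n"); err != nil {
        panic(fmt.Sprintf("TSVWriteLine: %s", err.Error()))
    }
}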

The final fun part is putting it all into recursion to create complex structures in one universal method:

// recurse into the child tables
for _, tbl := range table.Childs {
    WriteTable(tbl, generator, generator.context)
}

Taken together, the entire generation method looks like this:

func WriteTable(table Table, generator TableGenerator, pContext Context) {
  ...
  generator.numPipe, generator.datePipe = table.OpenPipes()
  ...
  for i := 1; i <= rownum; i++ {
    fields := table.CreateFields(&generator.context, pContext)
    ...
    TSVWriteLine(generator.files[table.Name], fields...)
    ...
    for _, tbl := range table.Childs {
        WriteTable(tbl, generator, generator.context)
    }
  }
}
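To show how the pieces could be wired together end to end, here is a rough sketch of a main function; the LoadDataBase helper sketched earlier, the recursive file-opening walk and the empty parent Context are my assumptions, not the article's exact code:

// Sketch of the top-level driver: load the config, open the files, run the recursion.
func main() {
    db, err := LoadDataBase("tables.json")
    if err != nil {
        panic(err)
    }

    var generator TableGenerator
    generator.Init()

    // open one TSV file per table, walking the whole hierarchy
    var openFiles func(tables []Table)
    openFiles = func(tables []Table) {
        for _, tbl := range tables {
            generator.AddFile(tbl.Name)
            openFiles(tbl.Childs)
        }
    }
    openFiles(db.Tables)

    // generate each root table; children are handled by the recursion inside WriteTable
    for _, tbl := range db.Tables {
        WriteTable(tbl, generator, Context{})
    }

    // close everything that AddFile opened
    for _, f := range generator.files {
        f.Close()
    }
}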

The result: about 200 lines of code capable of generating 1GB of linked data per minute. ClickHouse loads this data at roughly the same speed; to me it even seemed faster.

Resource consumption comes down mostly to writing to disk: while 50GB of data was being generated, nothing else changed noticeably.

The data itself looks something like this:

h_cities (screenshot of sample rows)

Let’s take city 69 and look at its farms:

h_farms (screenshot of sample rows)

As you can see, even though the code keeps no huge lookup tables for building relationships, the relationships were created successfully, and the farm creation dates start from the date the city was created.

So, in roughly 200 lines of code we achieved consistent generation of large volumes of related data, thanks to:

  • pipes, which carry the generation context so there is no need to create a pile of variables for each table

  • sequential generation from time series

  • recursion – each child table is created by the same code as the parent

  • placing the config in hellish JSON

In the main article I promised to explain where such strange city names came from – Passionate Goose and Passionate Slipper. It's simple: to get things done quickly and have a good time, I used this little method:

// Generate a random string
func GenString() string {
    // TODO move this into a config file
    rndTable := [][]string {{"Милый", "Красный", "Малый", "Большой", "Страстный", "Кривой", "Высокий", "Томный", "Хромой","Отличный",
                             "Ужасный", "Великий", "Нижний", "Верхний", "Суровый", "Крошечный", "Мытарский",
                             "Упоротый", "Пьяный", "Шебутной", "Воскресший", "Наивный", "Хвостатый", "Няшный"},
                            {"Рог", "Нос", "Хряк", "Жук", "Зуб", "Рот", "Утес", "Яр", "Мост","Журавль", "Слон", "Конь", "Тапок", "Танк",
                             "Люк", "Мух", "Хряк", "Гусь", "Жбан", "Клоп", "Сон", "Портвейн"}}

    n0 := rand.Intn(len(rndTable[0]))
    n1 := rand.Intn(len(rndTable[1]))

    return fmt.Sprintf("%s %s", rndTable[0][n0], rndTable[1][n1])
}

So when I tell my non-IT friends that writing code can be fun too, they don't believe me. In vain: Dear Jug and Terrible Bug would beg to differ 🙂
