How we transferred a ClickHouse database between geographically distant data centers

Late last year we wrote about the difficult move of our service to a new data center in Detroit. One of the tasks was migrating ClickHouse. As a reminder, this is a high-load service: dozens of servers accepting hundreds of thousands of low-latency requests per second.

In this article we describe how we transferred the data without being able to shut down the service or rely on automatic replication.

The volume of data in ClickHouse is not that large – the process turned out to be labor-intensive rather than voluminous. But there is very little publicly available information on the mechanisms we used, so consider this a guide to the clickhouse-copier utility (https://github.com/ClickHouse/copier), with a concrete example including scripts and commands to run.

The simplest approach

Our service operates 24/7, so we couldn't just turn it off and copy everything over. We planned the move, but due to limitations of ClickHouse itself (or rather of its replication scheme), we were unable to carry it out as planned.

Initially, we assumed that we would be able to attach a replica in the new data center to each shard in the old one, wait for synchronization, and then simply switch off the replicas in the old data center. But because of the geographical distance, replication between the data centers lagged too much, despite a link speed of 2-2.5 Gbit/s. As a result, the volume of data in ZooKeeper, which coordinates the replication, grew by an order of magnitude, and the process had to be stopped before it slowed down production. We had to look for other ways to move.

All requests that come to our service are stored in the database. In ClickHouse we keep two types of data:

  • “Raw data”, kept for two weeks. Over that period about 6 TB accumulates.

  • “Aggregates” – the results of processing the raw data that matter to us. They occupy about 300 GB.

Our first decision was to wait for the raw data to be aggregated and transfer only the aggregates to the new data center, launching new shards and moving services in the meantime. So the task came down to finding a way to transfer the data without losing a single byte of those 300 GB of aggregates.

We found a list of possible approaches in an article from Altinity: https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-data-migration/. In our case there were two viable options: physical transfer or copying with clickhouse-copier.

Next, we'll talk about each of the options.

Physical data transfer

The first possible option is low-level (physical) data transfer.

ClickHouse stores data as parts, which can be physically copied from one server to another as files. To do this, you run a query like the one below on the source server, detaching the parts one by one for all tables:

-- DETACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> DETACH PARTITION|PART <PARTITION_EXPRESSION>

Then copy the files from the ClickHouse_data_dir/<DATABASE_NAME>/<TABLE_NAME>/detached directories to the new server and run the reverse query:

-- ATTACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> ATTACH PARTITION|PART <PARTITION_EXPRESSION>
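
Before detaching anything, it helps to see which partitions a table has and how much they weigh. A minimal sketch against the system.parts system table (the database and table names are placeholders, as above):

-- List the active partitions of a table together with their size on disk
SELECT
    partition,
    count() AS parts,
    formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts
WHERE database = '<DATABASE_NAME>' AND table = '<TABLE_NAME>' AND active
GROUP BY partition
ORDER BY partition;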

We tested this option, but in one of the experiments the row counts in the tables on the source and destination servers did not match – perhaps something had been merged incorrectly. We didn't dig into it, let alone take the risk in production, and decided to use a safer method.

Copying with clickhouse-copier

The second option is copying at a higher level using a utility from ClickHouse: a SELECT is performed on the source, and an INSERT on the receiving side. The utility keeps the queue of all its tasks and their progress in ZooKeeper, which makes an interrupted copy easy to resume.
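
Conceptually, each copying task boils down to something like the query below; the real utility additionally splits the work into per-partition jobs and records their state in ZooKeeper. This is only a sketch – the host name is a placeholder, and the database and table names match the task description shown later:

-- A rough equivalent of one copying task: read from the source shard via the
-- remote() table function and insert into the table on the destination side
INSERT INTO database.table_local1
SELECT *
FROM remote('source-host:9000', database.table_local, 'default', '');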

There was very little information about this approach in open sources, so we had to figure it out almost from scratch.

This method seemed more reliable, although not without its quirks. For example, it turned out that if we ran the utility on the source side, the copied data did not match the original. Perhaps network problems were to blame – it felt as if some chunks of data simply never reached the new data center and the utility did not notice. But when we ran it on the receiving side, the data matched 100%.

Transferring the data of one shard took 3-4 hours; another half hour went on various related manipulations, in particular copying within the server itself, because we initially transferred the data into a “temporary” table. We could not copy straight into the production table: during copying the cluster effectively consists of machines in two data centers, so at that moment the statistics would have been doubled. So we copied the data from Miami into a temporary table in Detroit (covering the geographic distance), and then, inside the Detroit data center, merged it into production with inserts of 500-600 million rows.

Despite the precautions, we still ran into a couple of incidents where clients exceeded the limits set in the admin panel, because the panel showed incomplete statistics during copying. But the total loss was only $20.

Copying in practice

The copying process is as follows.

To get started, we need to create a node in ZooKeeper where the utility's data will live:

zkCli.sh -server localhost:2181 create /clickhouse/copytasks ""

Next, we need to create the copying schema – the schema.xml file:

<clickhouse>
    <!-- Configuration of clusters as in an ordinary server config -->
    <remote_servers>
        <source_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                    <replica>
                        <host>IP</host>
                        <port>9000</port>
                        <user>default</user>
                        <password></password>
                    </replica>
            </shard>
        </source_cluster>

        <destination_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                    <replica>
                        <host>IP</host>
                        <port>9000</port>
                        <user>default</user>
                        <password></password>
                    </replica>
            </shard>
        </destination_cluster>
    </remote_servers>

    <!-- How many simultaneously active workers are allowed. If you run more, the extra workers will sleep. -->
    <max_workers>1</max_workers>

    <!-- Setting used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <!-- Setting used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <!-- Common settings for fetch (pull) and insert (push) operations; the copier process itself also uses them.
         They are overridden by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly, leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <!-- Copying tasks description.
         You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
         sequentially.
    -->
    <tables>
        <!-- A table task, copies one table. -->

        <table_hits>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>database</database_pull>
            <table_pull>table_local</table_pull>
            <cluster_push>destination_cluster</cluster_push>
            <database_push>database</database_push>
            <table_push>table_local1</table_push>
            <engine>
            ENGINE=ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ssp_report_common1', '{replica}')
            partition by toYYYYMMDD(sspRequestDate)
            order by (sspRequestDate, dspId, sspRequestCountry, endpointId)
            </engine>
            <sharding_key>rand()</sharding_key>
        </table_hits> 

    </tables>
</clickhouse>

Here:

  • the clickhouse section describes the servers – which cluster we copy from and which we copy to;

  • the tables section describes which tables we are copying;

  • table_hits contains the description of the copying process itself, including the engine of the destination table (see the sketch after this list).
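
For reference: if the destination table does not exist, the utility creates it from the column definitions of the source table and the engine given in the task. In our case the resulting local table looks roughly like the sketch below (the column types are assumptions, and the real table has more metric columns than the key columns visible in the task description):

-- A rough sketch of the destination local table created from the task above;
-- the column list and types are illustrative
CREATE TABLE database.table_local1
(
    sspRequestDate    DateTime,
    dspId             UInt32,
    sspRequestCountry String,
    endpointId        UInt32
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ssp_report_common1', '{replica}')
PARTITION BY toYYYYMMDD(sspRequestDate)
ORDER BY (sspRequestDate, dspId, sspRequestCountry, endpointId);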

After that, we upload this file to ZooKeeper:

zkCli.sh -server localhost:2181 create /clickhouse/copytasks/description "`cat schema.xml`"

Next, go to the ClickHouse server and create the zookeeper.xml file the utility needs in order to run:

<clickhouse>
    <logger>
        <level>trace</level>
        <size>100M</size>
        <count>3</count>
    </logger>

    <zookeeper>
        <node index="1">
            <host>127.0.0.1</host>
            <port>2181</port>
        </node>
    </zookeeper>
</clickhouse>

Now launch the utility with the command:

clickhouse-copier --config-file=zookeeper.xml --task-path=/clickhouse/copytasks

After the utility has finished, delete the task data from ZooKeeper:

zkCli.sh -server localhost:2181 deleteall /clickhouse/copytasks

After we have checked that all the data matches (one way to do this is sketched after the command), all that remains is to execute the SQL command:

INSERT INTO table_local SELECT * FROM table_local1;
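
The matching check mentioned above can be done, for example, by comparing per-partition row counts in the temporary table in Detroit with the source shard in Miami, read via the remote() table function. A minimal sketch (the host name is a placeholder):

-- Per-day row counts in the temporary table on the destination side...
SELECT toYYYYMMDD(sspRequestDate) AS day, count() AS cnt
FROM database.table_local1
GROUP BY day
ORDER BY day;

-- ...and the same counts on the source shard in the old data center
SELECT toYYYYMMDD(sspRequestDate) AS day, count() AS cnt
FROM remote('source-host:9000', database.table_local, 'default', '')
GROUP BY day
ORDER BY day;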

In total, the move took 10 days (bearing in mind that we were moving other parts of the service at the same time). We hope this article saves you time when looking for an approach to similar problems.

Authors of the article: Igor Ivanov and Denis Palaguta, Maxilect.

