Infrastructure for a Data Engineer: Kafka

In this article I want to show how Kafka can be used in data engineering and how to get hands-on with it.

I don't want to repeat the important points of Kafka architecture here, so I recommend watching this video first.

It does a good job of covering the main concepts that will be used later in the article, such as:

  • What a producer is.

  • What a consumer is.

  • What a topic is.

  • What an offset is.

  • What a commit is.

  • What a partition is.

  • What replication is.

All code that will be used in the article will be available in my repository.

Deploying the service

Let's start by deploying Kafka locally in Docker. To do this, we'll create docker-compose.yaml with the following code:

version: '3.8'  
  
services:  
  zookeeper:  
    image: 'confluentinc/cp-zookeeper:7.7.0'  
    hostname: zookeeper  
    container_name: zookeeper  
    environment:  
      ZOOKEEPER_CLIENT_PORT: 2181  
      ZOOKEEPER_TICK_TIME: 2000  
    ports:  
      - '2181:2181'  
  
  kafka:  
    image: 'confluentinc/cp-kafka:7.7.0'  
    hostname: kafka  
    container_name: kafka  
    depends_on:  
      - zookeeper  
    environment:  
      KAFKA_BROKER_ID: 1  
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:19092  
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:19092  
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT  
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  
    ports:  
      - '9092:9092'  
      - '19092:19092'  
  
  kafka-ui:  
    image: 'provectuslabs/kafka-ui:v0.7.2'  
    container_name: kafka-ui  
    ports:  
      - '8080:8080'  
    environment:  
      KAFKA_CLUSTERS_0_NAME: local  
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092  
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181  
    depends_on:  
      - kafka  
  
networks:  
  default:  
    name: kafka-network

To start all services, run the command:

docker-compose up -d

This will start Kafka, ZooKeeper and UI for Apache Kafka.

UI for Apache Kafka will be available at http://localhost:8080/. Through it you can create topics, delete topics, view messages in topics, and more. A very convenient tool for working with Kafka.

Creating and deleting a topic

In this section we will create and delete a topic.

Creating and deleting a topic via CLI

To create a topic, you need to run the commands below.

Enter the container with Kafka:

docker exec -it kafka /bin/bash

Create the topic test in Kafka:

kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

List all available topics in Kafka:

kafka-topics --list --bootstrap-server kafka:9092

Delete the topic test in Kafka:

kafka-topics --delete --topic test --bootstrap-server kafka:9092

Creating and Deleting Topics via Python

If you are more comfortable interacting with Kafka via Python, that's not a problem.

To work with Kafka we will need the confluent-kafka library. In the examples below I use version 2.5.0. All the code and the full list of dependencies are in my repository.

These same operations can also be performed without entering the Kafka container, directly from Python.

To create a topic via Python:

from confluent_kafka.admin import AdminClient, NewTopic  
  
admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})  
  
  
def example_create_topics(a: AdminClient = None, topics: list[str] = None) -> None:  
    """  
    Function for creating `topic`s in Kafka.  
    :param a: AdminClient with connection parameters. Default `None`.  
    :param topics: List of `topic`s to create. Default `None`.  
    :return: None  
    """  
    new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]  
    fs = a.create_topics(new_topics)  
  
    # Wait for each operation to finish.  
    for topic, f in fs.items():  
        try:  
            f.result()  # The result itself is None  
            print("Topic {} created".format(topic))  
        except Exception as e:  
            print("Failed to create topic {}: {}".format(topic, e))  
  
  
example_create_topics(  
    a=admin_client,  
    topics=['test'],  
)

Important: The IDE may complain that NewTopic does not exist, but it does; it is part of the official package. This applies to version 2.5.0.

To delete a topic:

from confluent_kafka.admin import AdminClient  
  
admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})  
  
  
def example_delete_topics(a: AdminClient = None, topics: list[str] = None) -> None:  
    """  
    Function for deleting `topic`s in Kafka.  
    :param a: AdminClient with connection parameters. Default `None`.  
    :param topics: List of `topic`s to delete. Default `None`.  
    :return: None  
    """  
    fs = a.delete_topics(topics, operation_timeout=30)  
  
    # Wait for operation to finish.  
    for topic, f in fs.items():  
        try:  
            f.result()  # The result itself is None  
            print("Topic {} deleted".format(topic))  
        except Exception as e:  
            print("Failed to delete topic {}: {}".format(topic, e))  
  
  
example_delete_topics(  
    a=admin_client,  
    topics=['test'],  
)

More examples of using the confluent_kafka library can be found in the project's official GitHub repository.
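
The CLI examples above list topics with kafka-topics --list; the same can be done from Python through the AdminClient. A minimal sketch, assuming the same connection string as in the examples above:

from confluent_kafka.admin import AdminClient

admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})

# list_topics() returns cluster metadata; `topics` maps topic name -> TopicMetadata
metadata = admin_client.list_topics(timeout=10)

for name, topic in metadata.topics.items():
    print(f"{name}: {len(topic.partitions)} partition(s)")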

Kafka CLI

The CLI is a popular option for interacting with Kafka. It is not on your device by default, so you need to download it with the following command:

wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz

Then unpack:

tar -xzf kafka_2.13-3.8.0.tgz

After running these commands, we can use the CLI to interact with Kafka.

Important: All executable scripts are located in the bin folder, so keep in mind that all the commands below are run from it.

To go to the bin folder, run the command:

cd kafka_2.13-3.8.0/bin/

Writing to Kafka via CLI

To write to Kafka, run the command:

echo 'Hello, Kafka!' | sh kafka-console-producer.sh --broker-list localhost:19092 --topic test

Or like this:

echo 'Hello, Kafka!' | ./kafka-console-producer.sh --broker-list localhost:19092 --topic test

Important: I'm more used to calling the scripts with sh, but running them via ./ also works.

You can also create a producer in interactive mode with the command:

sh kafka-console-producer.sh --broker-list localhost:19092 --topic test

After creating such a producer, we can write as many messages as we want.

After executing the command, a > prompt will appear, after which we can enter messages for Kafka.

To exit interactive mode, press CTRL + C several times.

Reading from Kafka via CLI

Important: a topic in Kafka can be read both “from the end” and “from the beginning”.

To start reading from the very beginning:

sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test --from-beginning

To read from the end and receive only new messages:

sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test

Kafka Python

As described above, we can interact with Kafka via Python, so let's now also look at write and read operations using Python.

Writing to Kafka via Python

I will give an example of a record that might appear in your Kafka – information about a user.
The record will contain: uuid, first_name, last_name, middle_name.

You can run the code below, and values will start being written to the topic my_topic.

import json  
import time  
from confluent_kafka import Producer  
from faker import Faker  
import uuid_utils as uuid  
  
  
def generate_list_of_dict() -> dict[str, str]:
    fake = Faker(locale="ru_RU")
    return {
        'uuid': str(uuid.uuid7()),
        'first_name': fake.first_name(),
        'last_name': fake.last_name(),
        'middle_name': fake.middle_name(),
    }


# Define the Kafka configuration
conf = {'bootstrap.servers': "localhost:19092"}

# Create a Producer instance with the above configuration
producer = Producer(conf)

while True:
    # Define some data to send to Kafka
    data = generate_list_of_dict()

    # Convert the data to a JSON string
    data_str = json.dumps(data)

    # Produce a message to the "my_topic" topic
    producer.produce(topic="my_topic", value=data_str)

    # Flush the producer to ensure all messages are sent
    producer.flush()

    # Sleep for a few seconds before producing the next message
    time.sleep(3)

Important: If the topic has not been created before, it will be created automatically the first time a message is written to it.
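
Also note that producer.produce() is asynchronous: flush() waits for delivery, but does not report the result of each individual message. If you want per-message confirmation, you can pass a delivery callback. A minimal sketch, assuming the same broker address as above (the callback name here is just an illustration):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:19092'})


def delivery_report(err, msg):
    # Called once per message with the delivery result (from poll() or flush())
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


producer.produce(topic="my_topic", value='{"example": true}', callback=delivery_report)
producer.flush()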

Reading from Kafka via Python

To read values from Kafka, we need to create a consumer. The function below can read a topic either from the very beginning or from a specific offset.

from confluent_kafka import Consumer, KafkaError, TopicPartition  
  
  
def consume_messages(topic: str = None, offset: int = None) -> None:  
    conf = {  
        'bootstrap.servers': 'localhost:19092',  
        'group.id': 'mygroup',  
        'auto.offset.reset': 'earliest'  
    }  
  
    consumer = Consumer(conf)  
  
    if offset is not None:  
        partitions = consumer.list_topics(topic).topics[topic].partitions  
        # Assign every partition of the topic, starting each one from the requested offset  
        consumer.assign([TopicPartition(topic, partition, offset) for partition in partitions])  
    else:  
        consumer.subscribe([topic])  
  
    try:  
        while True:  
            msg = consumer.poll(1.0)  
            if msg is None:  
                continue  
            if msg.error():  
                if msg.error().code() == KafkaError._PARTITION_EOF:  
                    print('Reached end of partition')  
                else:  
                    print(f'Error: {msg.error()}')  
            else:  
                print(f'Received message: {msg.value().decode("utf-8")}')  
    except KeyboardInterrupt:  
        pass  
    finally:  
        consumer.close()  
  
  
# Read from the beginning  
consume_messages('test')  
  
# Read from a specific offset  
# consume_messages('test', offset=5)

Earlier we read the topic in Kafka without specifying a consumer group, so the --from-beginning flag took effect on every call (a new group was created each time).

But when creating a consumer via Python, specifying group.id is mandatory, so we may run into the following problem: once we have read the topic, restarting the code will read only new messages, and even the auto.offset.reset setting won't help.

This all happens because the offset has been committed (recorded) for the group.

To check where the group's offset currently stands, run the following command in Kafka:

sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --describe

We will see that all the messages have been read, so the offset points to the last message in the topic.
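
The same check can be made from Python via Consumer.committed(). A minimal sketch, assuming the group and topic names from the examples above:

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'localhost:19092',
    'group.id': 'mygroup',
})

# Ask the broker which offset the group has committed for partition 0 of the topic
committed = consumer.committed([TopicPartition('test', 0)], timeout=10)
for tp in committed:
    print(f"{tp.topic} [{tp.partition}] committed offset: {tp.offset}")

consumer.close()

If nothing has been committed yet, the returned offset will be a negative sentinel value rather than a real position.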

Actually, this is not a problem, because this offset can be “reset”; to do so, run the command:

sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --to-earliest --reset-offsets --execute --topic test

You can also re-read the topic by changing the group.id again, but this is not recommended.
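
If you want explicit control over when offsets are committed in the first place, you can disable auto-commit and commit only after a message has actually been processed. A minimal sketch of this approach, using the same group and topic as above:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:19092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # commit manually instead of in the background
})
consumer.subscribe(['test'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print(msg.value().decode('utf-8'))
        # Commit synchronously only after the message has been processed
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()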

Using Kafka in Data Engineering

Kafka is a frequent guest in data engineering because it allows you to quickly and cheaply cover many business tasks, such as:

CDC

When implementing CDC, you will most likely encounter Kafka, because it is the “standard” for working with this type of event.

If you want to understand what CDC is and what role Kafka plays there, you can read my article: CDC on Primitives.

Event-driven

Kafka allows us to receive changes “instantly” (there are certain nuances in this definition, but that is a topic for another conversation).

Returning to the thought above: by receiving all the events “instantly”, we can react to them right away.

For example: a customer visits our online store, and when they enter a category or section, we can make them an offer or rebuild the page for them, depending on their preferences or previously set conditions.

Real-time Analytics

Kafka is also quite often used for real-time analytics. If we receive event messages constantly and “instantly“, then we can respond to them and monitor our metrics.

For example: marketing campaigns. We launch a campaign and immediately look at the indicators that are important to us. Depending on the values we receive, we can change the terms of the campaign, the placement conditions, etc.

Summary

Kafka is a popular tool, so finding literature, videos and usage examples is not a problem. In this article I have shown only the tip of the iceberg; there is plenty more to study.

When it comes to interacting with Kafka, the CLI and Python are not the only tools; you can also add PySpark, ClickHouse, Java, etc.

By the way, how to read from Kafka using ClickHouse was described in my article: CDC on primitives.

For a more in-depth study of the tool, I recommend reading the book “Apache Kafka: Stream Processing and Data Analysis” (the Russian edition of “Kafka: The Definitive Guide”; authors – Neha Narkhede, Gwen Shapira, Todd Palino; published in 2019). It describes many subtleties and pitfalls of working with Kafka. A second edition has already been published; I have not read it, but judging by the contents it covers newer material, so I would recommend studying the second edition.

And most importantly: theory without practice is dead, and practice without theory is blind. So give Kafka a try, even if only on pet projects or by following along with this article.


Also, if you need a consultation, mentoring, a mock interview, or have other questions on data engineering, you can contact me. All contacts are listed at the link.
