Kafka Infrastructure for the Data Engineer
In this article I want to show how Kafka can be used in data engineering and how to get hands-on with it.
I don't want to repeat the important points of Kafka's architecture, so I recommend watching this video.
It does a good job of covering the main concepts that will be used later in the article, such as:
- what a producer is;
- what a consumer is;
- what a topic is;
- what an offset is;
- what a commit is;
- what a partition is;
- what replication is.
All the code used in this article is available in my repository.
Deploying the service
Let's start by deploying Kafka locally in Docker. To do this, we'll create a docker-compose.yaml with the following code:
version: '3.8'
services:
  zookeeper:
    image: 'confluentinc/cp-zookeeper:7.7.0'
    hostname: zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - '2181:2181'

  kafka:
    image: 'confluentinc/cp-kafka:7.7.0'
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # PLAINTEXT serves clients inside the Docker network;
      # PLAINTEXT_HOST must advertise localhost:19092 so that clients
      # on the host machine (used throughout this article) can connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:19092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - '9092:9092'
      - '19092:19092'

  kafka-ui:
    image: 'provectuslabs/kafka-ui:v0.7.2'
    container_name: kafka-ui
    ports:
      - '8080:8080'
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    depends_on:
      - kafka

networks:
  default:
    name: kafka-network
To start all services, run the command:
docker-compose up -d
After that, Kafka, ZooKeeper, and UI for Apache Kafka will be up and running.
UI for Apache Kafka will be available at http://localhost:8080/, and through it you can:
- create topics;
- delete topics;
- view messages in a topic;
and more. A very convenient tool for working with Kafka.
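If you prefer to check the broker from code rather than the UI, here is a minimal sketch using the confluent-kafka library (installed later in this article) that requests cluster metadata via the host listener:
from confluent_kafka.admin import AdminClient

# Connect via the host listener published by docker-compose
admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})

# list_topics requests cluster metadata and raises on timeout if the broker is unreachable
metadata = admin_client.list_topics(timeout=10)
print('Brokers:', metadata.brokers)
print('Topics:', list(metadata.topics.keys()))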
Creating and deleting a topic
In this section we will try to create and delete a topic.
Creating and deleting a topic via CLI
To create a topic, you need to run the commands below.
Enter the container with Kafka:
docker exec -it kafka /bin/bash
Create a topic named test in Kafka:
kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
List all available topics in Kafka:
kafka-topics --list --bootstrap-server kafka:9092
Delete the topic test in Kafka:
kafka-topics --delete --topic test --bootstrap-server kafka:9092
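To inspect a topic's partitions and replication settings, the standard --describe flag of the same tool can be used:
kafka-topics --describe --topic test --bootstrap-server kafka:9092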
Creating and Deleting Topics via Python
If you are more comfortable interacting with Kafka via Python, that's not a problem.
To work with Kafka we will need the confluent-kafka library. In the examples below I use version 2.5.0. All the code and the full list of dependencies are in my repository.
These operations can likewise be performed without entering the Kafka container, through Python instead.
To create a topic:
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})


def example_create_topics(a: AdminClient = None, topics: list[str] = None) -> None:
    """
    Function for creating `topic` in Kafka.

    :param a: AdminClient with initialization parameters. Default `None`.
    :param topics: List of `topic` to create. Default `None`.
    :return: Returns nothing.
    """
    new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
    # create_topics is asynchronous and returns a dict of {topic: future}
    fs = a.create_topics(new_topics)
    # Wait for each operation to finish
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            print("Failed to create topic {}: {}".format(topic, e))


example_create_topics(
    a=admin_client,
    topics=['test'],
)
Important: The IDE may complain that the module NewTopic does not exist, but it does; this is the official package. This applies to version 2.5.0.
To delete a topic:
from confluent_kafka.admin import AdminClient

admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})


def example_delete_topics(a: AdminClient = None, topics: list[str] = None) -> None:
    """
    Function for deleting `topic` in Kafka.

    :param a: AdminClient with initialization parameters. Default `None`.
    :param topics: List of `topic` to delete. Default `None`.
    :return: Returns nothing.
    """
    fs = a.delete_topics(topics, operation_timeout=30)
    # Wait for operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} deleted".format(topic))
        except Exception as e:
            print("Failed to delete topic {}: {}".format(topic, e))


example_delete_topics(
    a=admin_client,
    topics=['test'],
)
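To verify the result of either operation, you can list the existing topics with the same AdminClient; a small check using its list_topics method:
# List current topics to verify that creation/deletion took effect
metadata = admin_client.list_topics(timeout=10)
print(list(metadata.topics.keys()))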
More examples of using the confluent_kafka library can be found in the project's official GitHub repository.
Kafka CLI
The CLI is a popular option for interacting with Kafka. It is not on your device by default, so you need to download it with the following command:
wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz
Then unpack:
tar -xzf kafka_2.13-3.8.0.tgz
After running these commands, we can use the CLI to interact with Kafka.
Important: All executable scripts are located in the bin folder, so keep in mind that all commands below are run from it.
To go to the bin folder, run the command:
cd kafka_2.13-3.8.0/bin/
Writing to Kafka via CLI
To write to Kafka, run the command:
echo 'Hello, Kafka!' | sh kafka-console-producer.sh --broker-list localhost:19092 --topic test
Or like this:
echo 'Hello, Kafka!' | ./kafka-console-producer.sh --broker-list localhost:19092 --topic test
Important: I'm more used to calling scripts via sh, but calling them via ./ also works.
You can also create a producer in interactive mode with the command:
sh kafka-console-producer.sh --broker-list localhost:19092 --topic test
After creating such a producer, we can write as many messages as we want.
After executing the command, the > prompt will appear, after which we can enter messages for Kafka.
To exit interactive mode, press CTRL + C several times.
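The console producer can also send keyed messages using its standard parse.key and key.separator properties; for example (the key user1 here is arbitrary):
echo 'user1:Hello, Kafka!' | sh kafka-console-producer.sh --broker-list localhost:19092 --topic test --property parse.key=true --property key.separator=: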
Reading from Kafka via CLI
Important: a topic in Kafka can be read both "from the beginning" and "from the end".
To start reading from the very beginning:
sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test --from-beginning
To read from the end and receive only new messages:
sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test
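The console consumer accepts standard properties as well; for example, to print message keys along with values:
sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test --from-beginning --property print.key=true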
Kafka Python
As described above, we can interact with Kafka via Python, so let's now look at write and read operations using Python as well.
Writing to Kafka via Python
I will give an example of the kind of record that might appear in your Kafka: information about a user.
The record will contain: uuid, first_name, last_name, middle_name.
You can run the code below, and values will start being written to the topic my_topic.
import json
import time
from confluent_kafka import Producer
from faker import Faker
import uuid_utils as uuid
def generate_list_of_dict() -> dict[str, str]:
    fake = Faker(locale="ru_RU")
    return {
        'uuid': str(uuid.uuid7()),
        'first_name': fake.first_name(),
        'last_name': fake.last_name(),
        'middle_name': fake.middle_name(),
    }


# Define the Kafka configuration
conf = {'bootstrap.servers': "localhost:19092"}

# Create a Producer instance with the above configuration
producer = Producer(conf)

while True:
    # Define some data to send to Kafka
    data = generate_list_of_dict()

    # Convert the data to a JSON string
    data_str = json.dumps(data)

    # Produce a message to the "my_topic" topic
    producer.produce(topic="my_topic", value=data_str)

    # Flush the producer to ensure all messages are sent
    producer.flush()

    # Sleep for three seconds before producing the next message
    time.sleep(3)
Important: If the topic has not been created beforehand, it will be created automatically on the first write.
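Calling producer.flush() after every message is simple but blocks the loop on each send. confluent-kafka also supports a per-message delivery callback served by poll(); a minimal sketch (the delivery_report name is my own):
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:19092'})


def delivery_report(err, msg):
    # Called once per message to report delivery success or failure
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')


producer.produce(topic='my_topic', value='{"hello": "kafka"}', callback=delivery_report)
# Serve delivery callbacks without blocking the produce loop
producer.poll(0)
# Flush once at shutdown to wait for all outstanding messages
producer.flush()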
Reading from Kafka via Python
In order to read values from Kafka, we need to create a consumer. The function below can read a topic both from the very beginning and from a specific offset.
from confluent_kafka import Consumer, KafkaError, TopicPartition


def consume_messages(topic: str = None, offset: int = None) -> None:
    conf = {
        'bootstrap.servers': 'localhost:19092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    }

    consumer = Consumer(conf)

    if offset is not None:
        # Assign all partitions of the topic at the given offset with a single
        # assign call (repeated calls would overwrite the previous assignment)
        partitions = consumer.list_topics(topic).topics[topic].partitions
        consumer.assign([TopicPartition(topic, p, offset) for p in partitions])
    else:
        consumer.subscribe([topic])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print('Reached end of partition')
                else:
                    print(f'Error: {msg.error()}')
            else:
                print(f'Received message: {msg.value().decode("utf-8")}')
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()


# Read from the beginning
consume_messages('test')

# Read from a specific offset
# consume_messages('test', offset=5)
Earlier we read a topic in Kafka without using consumer groups, so the --from-beginning flag worked on every call (a new group was created each time).
But when creating a consumer via Python, specifying group.id is mandatory, so we may face the following problem: once we have read the topic, restarting the code will read only new messages, and even the auto.offset.reset setting won't help.
This happens because we have committed (fixed) the offset for the group.
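If you want full control over when a commit happens, you can disable auto-commit and commit offsets manually after processing each message; a minimal sketch under the same connection settings:
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:19092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # no automatic offset commits
}

consumer = Consumer(conf)
consumer.subscribe(['test'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Error: {msg.error()}')
            continue
        print(f'Received message: {msg.value().decode("utf-8")}')
        # Commit the offset only after the message has been processed
        consumer.commit(message=msg, asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()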
To check where the group's offset currently stands, run the following command in Kafka:
sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --describe
And we will see that we have read all the messages: the offset stands on the last message in the topic.
Actually, this is not a problem, because this offset can be "reset"; to do this, run the command:
sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --to-earliest --reset-offsets --execute --topic test
You can also re-read a topic by changing the group.id, but doing so is not recommended.
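Before actually resetting, you can preview the new offsets: the kafka-consumer-groups tool supports a --dry-run flag that shows the result without applying it:
sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --to-earliest --reset-offsets --dry-run --topic test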
Using Kafka in Data Engineering
Kafka is a frequent guest in data engineering because it allows you to quickly and cheaply cover many business tasks, such as:
CDC
When implementing CDC, you will likely encounter Kafka, because it is the de facto "standard" for working with this type of event.
If you want to understand what CDC is and what role Kafka plays there, you can read my article: CDC on Primitives.
Event-driven
Kafka allows us to receive changes "instantly" (there are certain nuances to this definition, but that is a topic for another conversation).
Returning to the thought above: receiving all events "instantly", we can react to them right away.
For example: a customer visits our online store website, and when he enters a category or section, we can make him an offer or rebuild the page for him, depending on his preferences or previously set conditions.
Real-time Analytics
Kafka is also quite often used for real-time analytics. If we receive event messages constantly and "instantly", then we can respond to them and monitor our metrics.
For example: marketing campaigns. We launch some campaign and immediately look at the indicators that are important to us. Depending on the values we receive, we can change the terms of the campaign, the placement conditions, etc.
Summary
Kafka is a popular tool, so finding literature, videos, and usage examples is not a problem. In this article, I have shown only the tip of the iceberg; there is much more to study.
When it comes to interacting with Kafka, the CLI and Python are not the only tools: you can add PySpark, ClickHouse, Java, and so on.
By the way, how to read from Kafka using ClickHouse was described in my article: CDC on primitives.
For a more in-depth study of the tool, I recommend the book "Apache Kafka: Stream Processing and Data Analysis" (authors: N. Narkhede, G. Shapira, T. Palino; published in 2019). It describes many subtleties and pitfalls of working with Kafka. A second edition has already been published; I have not read it, but judging by the table of contents it covers new points, so I would recommend studying the second edition.
And most importantly: theory without practice is dead, practice without theory is blind. So try Kafka out, even on pet projects or within the framework of this article.
Also, if you need a consultation, mentoring, a mock interview, or have other questions about data engineering, you can contact me. All contacts are listed at the link.