Apache Kafka – Producer and Consumer. Simple example of Nodejs application

Hello! Continuing the topic of studying microservices, I decided to understand the interaction of these very “services” and write a simple example of interaction between two services.

Before reading this article, I strongly recommend that you read this article on the topic of kafka (Kafka in 20 minutes. Mental model and how to work with it)

An example of implementation can be found find here…

So, the example is this:

In the user service there is a method for registering these very users, where after registration, it is necessary to create a profile for the user.

Of course, in the example presented, there is no logic of “answering questions” like:

The example is aimed solely at demonstrating the operation of communication between two services using Apache Kafka.

Implementation

So, in this example user-service acts as producer — that is, the sender of events, and profile-service – acts as consumer – that is, listening to incoming events.

Let's create 2 absolutely identical services with the following files

  1. Let's create a directory microservices — where we will place our services

  2. Let's create files for the profile service:
    They should be placed in a separate directory. profile

    profile.service.js
    import Fastify from 'fastify'
    
    const fastify = Fastify({
        logger: true,
    })
    
    fastify.listen({ port: 3001, host: "0.0.0.0" }, (err, address) => {
        if (err) throw err
        console.log("Profile service: Start Up!")
    })
    Dockerfile
    FROM node:22
    
    WORKDIR /profile-microservice
    
    COPY package.json .
    COPY yarn.lock .
    
    RUN yarn install
    
    COPY . .
    
    EXPOSE 3001
    
    CMD ["node", "profile.service.js"]
    package.json
    {
      "name": "microservice-kafka-learn_profile",
      "version": "1.0.0",
      "license": "MIT",
      "type": "module",
      "dependencies": {
        "@fastify/kafka": "^3.0.0",
        "fastify": "^5.0.0"
      }
    }
  3. Then follow the same principle as for the service profilelet's create a directory and file structure for the user service

Next, at the root of our project we will create docker-compose in which we will raise our previously created services, and also, we will immediately place ours there Apache Kafka

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka_broker:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  user_microservice:
    build: "./microservices/user"
    ports:
      - "3000:3000"
    depends_on:
      - kafka_broker

  profile_microservice:
    build: "./microservices/profile"
    ports:
      - "3001:3001"
    depends_on:
      - kafka_broker

At this stage, the launch docker-compose should lead you to the fact that your services user, profileand also kafka And zookeeper should work correctly.

Scenario: “User registered successfully”

Next step, let's add to our user.service.js file connection to Kafka

// ...

const fastify = Fastify({
    logger: true,
})

fastify.register(kafka, {
    producer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'client.id': 'user-service',
        'dr_cb': true
    },
});

// ...

We have successfully connected to our kafkanow let's add the “registration” method for users

fastify.post('/user/register', async (request, reply) => {
    // .....логика регистрации пользователя.....
    // Считаем что тут у нас логика создания пользователя, которая прошла успешно
    // ...

    /*
    * Отправьте событие для создания профиля для успешно зарегистрированного пользователя.
    * */    
    fastify.kafka.push({
        topic: "user_register",
        payload: JSON.stringify({
            id: Date.now(),
            email: "user@example.com",
            username: "imbatman"
        }),
        key: 'user_register_key'
    })

    reply.send({
        message: "User successfully created!"
    })
})

What's going on here?

  1. In this code, we consider that we have successfully registered the user and proceed to sending the message to kafka

  2. Create a name topic'A, it is advisable to put these topic names into constants for their further use, for example, in other services, as we will see in the example below

  3. We are forming payload which we want to convey to the potential recipient of our event

  4. We form the key of our message

  5. We're shipping!

  6. Great, your data has reached the broker!

Scenario: “Creating a user profile”

Let's add a listener to our event to create a profile in the service profile.service.js

We are the same as in the service useryou need to connect to our Kafka in service profile

import crypto from "node:crypto"

const groupId = crypto.randomBytes(20).toString('hex')

fastify.register(kafka, {
    consumer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'topic.metadata.refresh.interval.ms': 1000,
        'group.id': groupId,
    },
})

Before connecting, we need to generate a group identifier, it serves to identify a group of consumers (consumer). All consumers with the same group.id form one group and jointly consume messages from topics, sharing partitions among themselves. (If you read the article I recommended above, you understand what I'm talking about).

Next, we need to subscribe to our user registration event, which is – “user_register”

Let's add the following code:

fastify.register(kafka, {
    consumer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'topic.metadata.refresh.interval.ms': 1000,
        'group.id': groupId,
    },
}).after((err) => {
    if (err) throw err

    fastify.kafka
        .subscribe(["user_register"])
        .on("user_register", (msg, commit) => {
            const registeredUser = JSON.parse(msg.value.toString());

            console.log(registeredUser) // Тут наш зарегистрированный юзверь
            commit()
        })

    fastify.kafka.consume()
})

Let's figure out what's going on here?

  1. To begin with, we subscribe to all the events required for this service in our service. .subscribe(["user_register"]). In this case, this event is “user_register”

    Again, I repeat that topic keys must be stored outside of all services for their general access. In this case, it is done for the sake of simplicity of the example.

  2. Next, we set up the event handler
    .on("user_register", (msg, commit) => {

  3. In the body of this event we receive the result of the event we sent to the service userthat is:

    {
      id: "1726957487909",
      email: "user@example.com",
      username: "imbatman"
    }
  1. We just have to process it in our profile service this event, and create a user profile based on the provided data

  2. Next, by calling the commit function provided in the argument, we confirm the receipt and processing of the message. Kafkasignaling to the broker that this message can be marked as “read” in the current consumer group, preventing it from being received again.

fastify.kafka.consume() in turn, activates the process of consuming messages from subscribed topics, in this case “user_register”. After calling this method, Fastify will start processing incoming messages and passing them to the appropriate event handlers, such as on("user_register", ...)

Conclusion

I hope that with such a simple example, I, first of all, as a person who has just become acquainted with this technology, and you, as readers of this article, managed to grasp the first simple steps and understand the interaction of entities in the form of services. (as in this example) among themselves!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *