Creating a Telegram bot (Spring Boot, Kafka, PostgreSQL), part one

Ivanov Maxim

Junior Java programmer

Recipe for making your own “Telegram-Frankenstein”

Even a person of average ability, stubbornly studying one subject, will certainly achieve deep knowledge in it. – “Frankenstein” by Mary Shelley

Hello everyone! This is my first article, so I will try to explain as simply as possible how to build a bot with all the bells and whistles promised in the title.

The tutorial is split into 2 parts. The first part builds the main bot, which sends logs (Kafka Producer) and writes data to the database; the second part processes all the logs (Kafka Consumer).

Ingredients:

  1. Bot registration

  2. Creating a Spring Boot project. The easiest way to do this is through the built-in project wizard in IntelliJ IDEA or with Spring Initializr (Gradle is used as the build system).

  3. Kafka (to keep track of topics I use Conduktor)

  4. PostgreSQL (for comfortable work I use DBeaver)

If you have difficulty recreating the tutorial, please write in the comments. Just in case, here is my Git repository.

Let’s start with cutting:

First you need to set up build.gradle with all dependencies

build.gradle
buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id 'org.springframework.boot' version '2.4.2'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

apply from: 'build-test.gradle'

group 'com.sercetary.bot'
sourceCompatibility = '14'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

configurations.all {
    exclude module: 'slf4j-log4j12'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
    implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
    implementation 'org.springframework.data:spring-data-commons:2.6.0'
    implementation 'org.springframework.kafka:spring-kafka:2.7.6'
    implementation 'org.postgresql:postgresql:42.3.1'
    implementation 'com.h2database:h2:1.4.200'

    implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'
    implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'

    // 'compile' is deprecated and was removed in Gradle 7; 'implementation' replaces it
    implementation group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'
    compileOnly 'org.projectlombok:lombok:1.18.22'
    annotationProcessor 'org.projectlombok:lombok:1.18.22'
}

Next, for Kafka to work, we describe application.yml, which holds the settings of our Kafka producer

application.yml
server:
  port: 9000
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
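
For orientation, the three YAML keys above correspond to the raw Kafka client property names that show up later in the ProducerConfig dump (bootstrap.servers, key.serializer, value.serializer). Below is a minimal sketch of that correspondence using only java.util.Properties. The class name ProducerProps is hypothetical, and Spring Boot builds the real producer configuration itself, so this is only meant to show which setting ends up where.

```java
import java.util.Properties;

// Hypothetical helper: mirrors the spring.kafka.producer.* values from application.yml
// under the raw Kafka client property names. Spring Boot does this wiring on its own.
final class ProducerProps {

    static Properties fromYamlValues() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Prints each property as "name = value"
        fromYamlValues().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```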

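Now the application.properties settings. Note that server.port is set in both files; when application.properties and application.yml sit in the same location, Spring Boot gives the .properties file precedence, so the application listens on port 8081, as the startup log further down confirms.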

application.properties
# HTTP port for incoming requests
server.port=8081

app.http.bot=change-me
telegram-bot.name=change-me
telegram-bot.token=change-me

# Bot db
app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me
app.db.bot-db.driver=org.postgresql.Driver
app.db.bot-db.user=change-me
app.db.bot-db.password=change-me
app.db.bot-db.pool-size=10

# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.com.secretary.bot=TRACE
logging.file.name=change-me

Okay, after setting up our project, let’s talk about its structure:

Project structure

Packages:

  • config – description of beans and project configuration

  • controller – handles the user’s request

  • dto – stores data, and also describes the database table model

  • exceptions – custom error handler package

  • repository – logic for working with the database

  • service – the main business logic of the project

Now we collect the ingredients and marinate:

Bean settings:

– First of all, in the config package we configure the beans of our application; AppConfig holds the initialization of TelegramBotsApi and ObjectMapper

AppConfig
@Configuration
public class AppConfig {

    @Bean
    ObjectMapper customObjectMapper() {
        return new ObjectMapper();
    }

    @Bean
    TelegramBotsApi telegramBotsApi() throws TelegramApiException{
        return new TelegramBotsApi(DefaultBotSession.class);
    }
}

– Inside our DbConfig class, there is a SpringDataJdbcProperties class that describes the SpringDataJdbc settings

DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {

    @Bean
    @Qualifier("bot-db")
    @ConfigurationProperties(prefix = "app.db.bot-db")
    SpringDataJdbcProperties gitlabJdbcProperties() {
        return new SpringDataJdbcProperties();
    }

    @Bean
    @Qualifier("bot-db")
    public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {
        return hikariDataSource("db", properties);
    }

    @Bean
    @Qualifier("bot-db")
    JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Data
    @NoArgsConstructor
    public static class SpringDataJdbcProperties {

        // constants
        private static final String H2_DATABASE_DRIVER = "org.h2.Driver";

        /**
         * JDBC URL property
         */
        String url;
        /**
         * JDBC driver class name property
         */
        String driver;
        /**
         * JDBC username property
         */
        String user;
        /**
         * JDBC password property
         */
        String password;
        /**
         * Hikari / Vertica maxPoolSize property
         */
        String poolSize;
        /**
         * Minimum pool size
         */
        int minPoolSize = 4;
        /**
         * Maximum pool size
         */
        int maxPoolSize = 10;
        /**
         * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
         * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
         */
        long idleTimeout;
        /**
         * This property controls the maximum lifetime of a connection in the pool. When a connection
         * reaches this timeout, even if recently used, it will be retired from the pool.
         * An in-use connection will never be retired, only when it is idle will it be removed
         */
        long maxLifetime;
        /**
         * Bulk insert size
         */
        Integer bulkSize;


        /**
         * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
         *
         * @param url JDBC URL property
         * @param driver JDBC driver class name property
         * @param user JDBC username property
         * @param password JDBC password property
         * @param poolSize Hikari / Vertica maxPoolSize property
         * @param bulkSize bulk insert size
         */
        public SpringDataJdbcProperties(
                String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
            this.url = url;
            this.driver = driver;
            this.user = user;
            this.password = password;
            this.poolSize = poolSize;
            this.bulkSize = bulkSize;
        }


        /**
         * Returns true if this instance describes an in-memory H2 database
         *
         * @return true if this instance describes an in-memory H2 database
         */
        public boolean isH2Database() {
            // Constant-first comparison avoids a NullPointerException when driver is not set
            return H2_DATABASE_DRIVER.equals(driver);
        }

        /**
         * Returns the string representation of this instance in JSON format
         *
         * @return the string representation of this instance in JSON format
         */
        @Override
        public String toString() {
            var props = new SpringDataJdbcProperties(
                    url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
            return Json.encode(props);
        }

    }

}

– Create a base class to reduce duplication of bean initialization code

DefaultDbConfig
@Slf4j
class DefaultDbConfig {

    protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
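        // "настройки БД" means "database settings"; the literal is kept because it shows up in the startup log
        log.info("[{}] настройки БД: [{}]", tag, properties.toString());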

        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(properties.getUrl());
        ds.setDriverClassName(properties.getDriver());
        ds.setUsername(properties.getUser());
        ds.setPassword(properties.getPassword());
        ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
        return ds;
    }
}
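
One thing to watch in hikariDataSource: Integer.parseInt(properties.getPoolSize()) throws NumberFormatException when pool-size is missing or not a number, and that fails bean creation at startup. If a fallback is preferable to a hard failure, a small helper along these lines could be used. PoolSizes and its default of 10 (borrowed from the maxPoolSize field above) are assumptions made for illustration, not part of the original project.

```java
// Hypothetical helper for reading the pool size from a String property.
final class PoolSizes {

    static final int DEFAULT_POOL_SIZE = 10;

    /**
     * Parses the configured pool size and falls back to the default
     * for null, blank, non-numeric or non-positive values.
     */
    static int parseOrDefault(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_POOL_SIZE;
        }
        try {
            int value = Integer.parseInt(raw.trim());
            return value > 0 ? value : DEFAULT_POOL_SIZE;
        } catch (NumberFormatException e) {
            return DEFAULT_POOL_SIZE;
        }
    }
}
```

In DefaultDbConfig this would be called as ds.setMaximumPoolSize(PoolSizes.parseOrDefault(properties.getPoolSize())). Failing fast on a bad value is an equally valid choice; the point is only that the behaviour should be deliberate.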

– Next, we write a utility class that serializes objects to JSON for logging

Json
public class Json {
    static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Encode instance as JSON
     *
     * @param obj instance
     * @return JSON
     */
    public static String encode(Object obj) {
        try {
            return mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            return obj.toString();
        }
    }

    public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {
        return mapper.readValue(json, clazz);
    }
}

Next, we will write a controller to access the service from outside

– Create a simple controller to get a list of records from the database

UsersController
@Slf4j
@RestController
@RequestMapping("${app.http.bot}")
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class UsersController {

    private final UserService userService;

    /**
     * Returns the list of users and the plans associated with them
     */
    @RequestMapping(path = "/users_idea", method = RequestMethod.GET)
    public List<User> getIdeaList() {
        log.debug("Method - getIdeaList was called");
        return userService.getUserList();
    }
}

Then we move on to creating the model.

– We create a user model User, as well as its UserMapper, which is needed to work with the database and map fields in the table

User
@Data
@RequiredArgsConstructor
public class User {
    /**
     * user's id
     */
    @JsonProperty("id")
    private final int id;
    /**
     * user's name
     */
    @JsonProperty("name")
    private final String name;
    /**
     * description
     */
    @JsonProperty("description")
    private final String description;

    private String startWord = "";

    @Override
    public String toString() { return startWord + description; }
}

UserMapper
@Slf4j
public class UserMapper implements RowMapper<User> {

    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        var entity = new User(
                rs.getInt("id"),
                rs.getString("user_name"),
                rs.getString("description")
                );
        log.trace("mapRow(): entity = [{}]", entity);
        return entity;
    }
}

Let’s move on to creating custom exceptions

What are they needed for?

We use them to handle errors that may occur during the operation of the application so that the bot does not break down and continues its work.
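
To make the idea concrete, here is a plain-Java sketch (no Spring or Telegram classes) of a processing loop that survives a failing message, because the failure is raised as an unchecked exception and caught in one place. UpdateLoop, HandlingException and handle are hypothetical names chosen for this illustration; the project's own exception classes follow below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the bot's message handling, reduced to strings.
final class UpdateLoop {

    /** Stand-in for the project's BaseException: unchecked and carries a message. */
    static class HandlingException extends RuntimeException {
        HandlingException(String msg) {
            super(msg);
        }
    }

    /** Hypothetical handler: rejects blank input, otherwise acknowledges it. */
    static String handle(String text) {
        if (text == null || text.trim().isEmpty()) {
            throw new HandlingException("Empty message");
        }
        return "ok: " + text;
    }

    /** Processes every message; a failure is recorded and the loop moves on. */
    static List<String> processAll(List<String> messages) {
        List<String> results = new ArrayList<>();
        for (String message : messages) {
            try {
                results.add(handle(message));
            } catch (HandlingException e) {
                results.add("error: " + e.getMessage());
            }
        }
        return results;
    }
}
```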

– BaseException – a class that inherits from RuntimeException; its constructor takes 2 parameters: the message and the cause of the error

BaseException
@Slf4j
public class BaseException extends RuntimeException{

    public BaseException(String msg, Throwable t) {
        super(msg, t);
        log.error(msg, t);
    }

    public BaseException(String msg) {
        super(msg);
        log.error(msg);
    }

}

– NotFoundException – the exception thrown when a result is not found, inherited from BaseException

NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends BaseException {

    private final static String MESSAGE = "Not Found";

    public NotFoundException(Throwable t) {
        super(MESSAGE, t);
    }

    public NotFoundException() {
        super(MESSAGE);
    }
}

– DbException – a class that handles errors related to the database, inherited from RuntimeException

DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class DbException extends RuntimeException {

    private static final String MESSAGE = "Ошибка БД"; // "Database error"

    public DbException(String message) {
        super(message);
    }

    public DbException(Throwable cause) {
        super(MESSAGE, cause);
    }
}

Now to work with the database, create a repository

– Let’s create an interface that describes methods for working with records in the database

IUserRepository
public interface IUserRepository {

    /**
     * Returns the record with the given id
     *
     * @return the requested record
     * @throws DbException on a database error
     */
    User getById(int id);

    /**
     * Returns the list of records
     *
     * @return the list of all records
     * @throws DbException on a database error
     */
    List<User> getUserList();

    /**
     * Inserts a new record
     *
     * @param entity the new record
     * @throws DbException on a database error
     */
    void insert(User entity);

    /**
     * Deletes a record
     *
     * @param entity the record to delete
     * @throws DbException on a database error
     */
    void delete(User entity);
}

– Now let’s write a class that implements interface methods

UserRepository
@Slf4j
@Repository
public class UserRepository implements IUserRepository {

    // constants
    private static final String SQL_SELECT_BY_ID = "" +
            "SELECT id, user_name, description FROM user_table WHERE id=?";
    private static final String SQL_SELECT_LIST = "" +
            "SELECT id, user_name, description FROM user_table";
    private static final String SQL_INSERT = "" +
            "INSERT INTO user_table (user_name, description) VALUES (?, ?)";
    private static final String SQL_DELETE = "" +
            "DELETE FROM user_table WHERE id = ?";

    protected final static UserMapper USER_MAPPER = new UserMapper();

    // beans
    protected final JdbcTemplate template;


    /**
     * Req-args constructor for Spring DI
     */
    public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {
        this.template = template;
    }

    /**
     * Returns the record with the given id
     *
     * @return the requested record, or null if there is none
     * @throws DbException on a database error
     */
    @Override
    public User getById(int id) throws DbException {
        try {
            return DataAccessUtils.singleResult(
                    template.query(SQL_SELECT_BY_ID, USER_MAPPER, id));
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Returns the list of records
     *
     * @return the list of all records
     * @throws DbException on a database error
     */
    @Override
    public List<User> getUserList() throws DbException {
        try {
            return template.query(SQL_SELECT_LIST, USER_MAPPER);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Inserts a new record
     *
     * @param entity the new record
     * @throws DbException on a database error
     */
    @Override
    public void insert(User entity) throws DbException {
        try {
            // All entity fields except the id go into the query, because id is serial and generated automatically
            var result = template.update(SQL_INSERT,
                    entity.getName(),
                    entity.getDescription());
            if (result != 1) log.trace("UserRepository.insert() of {} affected {} rows", entity, result);
            log.info("insert({}) result={}", entity, result);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Deletes a record
     *
     * @param entity the record to delete
     * @throws DbException on a database error
     */
    @Override
    public void delete(User entity) throws DbException {
        try {
            var result = template.update(SQL_DELETE, entity.getId());
            if (result != 1) log.trace("UserRepository.delete() of {} affected {} rows", entity, result);
            log.info("delete({}) result={}", entity, result);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }
}
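
A note on getById: it relies on DataAccessUtils.singleResult, which returns null for an empty result, the single element for exactly one row, and throws for more than one row (IncorrectResultSizeDataAccessException in Spring). That null is what BaseService.wrapResult later turns into a NotFoundException. The following is a plain-Java imitation of that contract, not Spring's code; SingleResult is a hypothetical name and IllegalStateException stands in for Spring's exception.

```java
import java.util.Collection;

// Hypothetical re-creation of the "zero or one row" contract used by getById.
final class SingleResult {

    /** Returns null for an empty collection, the only element for size 1, and throws otherwise. */
    static <T> T of(Collection<T> results) {
        if (results == null || results.isEmpty()) {
            return null;
        }
        if (results.size() > 1) {
            throw new IllegalStateException("Expected 1 row, got " + results.size());
        }
        return results.iterator().next();
    }
}
```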

– Next comes the logic of the bot, and everything here is straightforward. In onUpdateReceived, the method inherited from the parent class TelegramLongPollingBot, we describe what happens when an update arrives in the chat with the user (more about this here). The message-processing method also calls our producer and writes the data to the database

TelegramBot
@Slf4j
@Getter
@Component
public class TelegramBot extends TelegramLongPollingBot {

    private Message requestMessage = new Message();
    private final SendMessage response = new SendMessage();
    private final Producer producerService;
    private final UserService userService;

    private final String botUsername;
    private final String botToken;

    public TelegramBot(
            TelegramBotsApi telegramBotsApi,
            @Value("${telegram-bot.name}") String botUsername,
            @Value("${telegram-bot.token}") String botToken,
            Producer producerService, UserService userService) throws TelegramApiException {
        this.botUsername = botUsername;
        this.botToken = botToken;
        this.producerService = producerService;
        this.userService = userService;

        telegramBotsApi.registerBot(this);
    }

    /**
     * Called when updates are received through the getUpdates method.
     *
     * @param request the received update
     */
    @SneakyThrows
    @Override
    public void onUpdateReceived(Update request) {
        // Skip updates without a text message (photos, edited messages, callbacks);
        // otherwise getMessage() or getText() returns null and the calls below fail
        if (!request.hasMessage() || !request.getMessage().hasText()) {
            return;
        }

        requestMessage = request.getMessage();
        response.setChatId(requestMessage.getChatId().toString());

        var entity = new User(
                0, requestMessage.getChat().getUserName(),
                requestMessage.getText());

        log.info("Working onUpdateReceived, request text[{}]", requestMessage.getText());

        if (requestMessage.getText().equals("/start"))
            // "Type a command to show the list of ideas: /idea - show ideas"
            defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");
        else if (requestMessage.getText().equals("/idea"))
            onIdea(response);
        else
            // "I have written down your idea :)"
            defaultMsg(response, "Я записал вашу мысль :) \n ");

        log.info("Working, text[{}]", requestMessage.getText());

        if (requestMessage.getText().startsWith("/")) {
            entity.setStartWord("команда: ");   // "command: "
            producerService.sendMessage( entity);
        } else {
            entity.setStartWord("мысль: ");     // "idea: "
            producerService.sendMessage( entity);
            userService.insert(entity);
        }
    }

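    /**
     * Sends the list of ideas in response to the "/idea" command
     *
     * @param response the message object used for the reply
     */
    private void onIdea(SendMessage response) throws TelegramApiException {
        List<User> ideas;
        try {
            // Query once; UserService.getUserList() throws NotFoundException for an empty table
            ideas = userService.getUserList();
        } catch (NotFoundException e) {
            ideas = List.of();
        }

        if (ideas.isEmpty()) {
            defaultMsg(response, "В списке нет мыслей. \n");       // "There are no ideas in the list."
        } else {
            defaultMsg(response, "Вот список ваших мыслей: \n");   // "Here is the list of your ideas:"
            for (User txt : ideas) {
                response.setText(txt.toString());
                execute(response);
            }
        }
    }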

    /**
     * Template method for sending a message to the user
     *
     * @param response the message object used for the reply
     * @param msg the message text
     */
    private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {
        response.setText(msg);
        execute(response);
    }
}
Code snippet with sending to Kafka and writing to the database
        if (requestMessage.getText().startsWith("/")) {
            entity.setStartWord("команда: ");
            producerService.sendMessage( entity);
        } else {
            entity.setStartWord("мысль: ");
            producerService.sendMessage( entity);
            userService.insert(entity);
        }
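
The same decision can be read as a small pure function: anything starting with "/" is a command and is only sent to Kafka, while everything else is an idea and is both sent to Kafka and stored in the database. Below is a minimal sketch in plain Java. MessageRouting, Route and its fields are hypothetical, and the English prefixes stand in for the Russian literals ("команда: " means "command: ", "мысль: " means "idea: ").

```java
// Hypothetical, dependency-free version of the command/idea decision in onUpdateReceived.
final class MessageRouting {

    /** Result of classifying one incoming text. */
    static final class Route {
        final String prefix;       // the value that would be passed to setStartWord(...)
        final boolean storeInDb;   // whether userService.insert(...) would be called

        Route(String prefix, boolean storeInDb) {
            this.prefix = prefix;
            this.storeInDb = storeInDb;
        }
    }

    /** Commands start with "/" and are not stored; everything else is stored as an idea. */
    static Route classify(String text) {
        if (text.startsWith("/")) {
            return new Route("command: ", false);
        }
        return new Route("idea: ", true);
    }
}
```

Keeping the rule in one place like this also makes it easy to unit-test without a running bot, Kafka broker or database.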

Let’s move on to creating the business logic of the application

– BaseService – implements the basic methods of project services

BaseService
public class BaseService {

    /**
     * Wraps a result
     *
     * @param result the result
     * @return the result
     * @throws NotFoundException if the result is null
     */
    public <T> T wrapResult(T result) {
        if(result == null)
            throw new NotFoundException();
        return result;
    }

    /**
     * Wraps a list result
     *
     * @param result the result
     * @return the result
     * @throws NotFoundException if the result is null or empty
     */
    public <T> List<T> wrapResults(List<T> result) {
        if(result == null || result.size() == 0)
            throw new NotFoundException();
        return result;
    }

}
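
Note a consequence of wrapResults: an empty list is treated the same as a missing one, so a caller that wants to handle "no rows yet" has to catch the exception rather than test isEmpty(). Here is a plain-Java sketch of that behaviour. Wrapping and its nested NotFound class are stand-ins for BaseService and NotFoundException, and orEmpty is a hypothetical caller-side helper.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, dependency-free imitation of BaseService.wrapResults and one way to call it.
final class Wrapping {

    /** Stand-in for the project's NotFoundException. */
    static class NotFound extends RuntimeException {
        NotFound() {
            super("Not Found");
        }
    }

    /** Same rule as BaseService.wrapResults: null and empty lists both count as "not found". */
    static <T> List<T> wrapResults(List<T> result) {
        if (result == null || result.isEmpty()) {
            throw new NotFound();
        }
        return result;
    }

    /** Caller-side pattern: translate "not found" back into an empty list. */
    static <T> List<T> orEmpty(List<T> result) {
        try {
            return wrapResults(result);
        } catch (NotFound e) {
            return Collections.emptyList();
        }
    }
}
```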

– The UserService class works with our IUserRepository repository and contains the business logic for working with user records in the database

UserService
@Service
@Slf4j
@RequiredArgsConstructor
public class UserService extends BaseService {

    //beans
    protected final IUserRepository repo;

    /**
     * Returns the list of records
     *
     * @return the list of records
     * @throws DbException on a database error
     * @throws NotFoundException if the list is empty
     */
    public List<User> getUserList() {
        log.trace("#### getUserList() - working");
        return wrapResults(repo.getUserList());
    }

    /**
     * Returns the record with the given id
     *
     * @throws DbException on a database error
     */
    public User getById(int id) {
        log.trace("#### getById() [id={}]", id);
        return wrapResult(repo.getById(id));
    }

    /**
     * Inserts a new record
     *
     * @param entity the new record
     * @throws DbException on a database error
     */
    public void insert(User entity) {
        log.trace("#### insert() [entity={}]", entity);
        repo.insert(entity);
    }

    /**
     * Deletes a record
     *
     * @param entity the record to delete
     * @throws DbException on a database error
     */
    public void delete(User entity) {
        log.trace("#### delete() [entity={}]", entity);
        repo.delete(entity);
    }

}

– The Producer class is the class that sends messages to the users topic; here we can change the format of the message itself and the data it carries

Producer
@Service
@Slf4j
public class Producer {

    private static final String TOPIC = "users";
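    protected final IUserRepository repo;
    private final KafkaTemplate<String, String> kafkaTemplate;

    // Constructor injection for both dependencies instead of mixing it with field injection
    public Producer(IUserRepository repo, KafkaTemplate<String, String> kafkaTemplate) {
        this.repo = repo;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User user) {
        // Null-safe check: the description may be null as well as empty
        if (user.getName() == null || user.getDescription() == null || user.getDescription().isEmpty()) {
            log.info("#### Empty name/description message");
        }
        log.info("#### Producing message [user={}]", user);
        kafkaTemplate.send(TOPIC, "Writing in log -> " + user);
    }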
}
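
Because User.toString() returns startWord + description, the string that reaches the topic is "Writing in log -> " followed by the prefix and the message text; the user name is not part of it. Here is a sketch of that formatting in plain Java. LogLine is a hypothetical helper, and the English prefix in the usage note stands in for the Russian one.

```java
// Hypothetical helper that reproduces the payload format built in Producer.sendMessage.
final class LogLine {

    /** Fixed prefix + User.toString(), where toString() is startWord + description. */
    static String format(String startWord, String description) {
        String userToString = startWord + description;   // mirrors User.toString()
        return "Writing in log -> " + userToString;
    }
}
```

For example, LogLine.format("idea: ", "buy milk") yields "Writing in log -> idea: buy milk". If the consumer in part two needs the user name, this is the place where the format would have to change.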

Finally, the class that actually launches our entire application

WebHookApp
@Slf4j
@SpringBootApplication
public class WebHookApp {
    public static void main(String[] args) {
        SpringApplication.run(WebHookApp.class, args);
    }
}

Now we have marinated all the ingredients and prepared the dish for baking:

– First check if Kafka is running

Start it with the command: sudo systemctl start kafka

– After that, we launch Conduktor and see that our message broker is running. After our application is launched, a users topic will appear here, which receives the messages sent by our producer

Launched Broker

– Next, run DBeaver and create 2 tables (log and user_table); here are the statements for creating the tables:

CREATE TABLE public.log (
	id serial4 NOT NULL,
	message varchar(500) NOT NULL,
	date_time date NOT NULL,
	topic varchar(100) NOT NULL,
	CONSTRAINT log_pkey PRIMARY KEY (id)
);
CREATE TABLE public.user_table (
	id serial4 NOT NULL,
	user_name varchar(100) NOT NULL,
	description varchar(500) NULL,
	CONSTRAINT user_table_pkey PRIMARY KEY (id)
);
DB schema public
This is what the log table looks like
This is what the user_table looks like
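
Keep the column sizes in mind: description is varchar(500), while a Telegram text message can be up to 4096 characters, so a long message would make the INSERT fail with a "value too long" error. One possible guard, sketched in plain Java, is to cut the text before inserting. Columns and its constant are assumptions for illustration, and truncation is only one option (widening the column is another).

```java
// Hypothetical guard that keeps a value within a varchar(n) column.
final class Columns {

    static final int DESCRIPTION_MAX = 500;   // matches description varchar(500)

    /**
     * Returns the text unchanged if it fits, otherwise its first max characters; null stays null.
     * Length is counted in UTF-16 units, so a cut at the boundary may split an emoji.
     */
    static String fit(String text, int max) {
        if (text == null || text.length() <= max) {
            return text;
        }
        return text.substring(0, max);
    }
}
```

In the repository this could be applied as Columns.fit(entity.getDescription(), Columns.DESCRIPTION_MAX) when passing the parameter to the INSERT.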

Great, the dish is baked and ready to serve:

– We start the project, check that everything is configured and works correctly

Spring logs

– Open Telegram and taste our “Frankenstein”

Chatting with a Telegram bot

– Let’s see what Spring wrote in the logs and whether the data reached Kafka and the database

Logs of our bot, no errors observed
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.2)
2022-01-15 16:46:19.248  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : The following profiles are active: bot
2022-01-15 16:46:19.291  WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder    : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath
2022-01-15 16:46:19.882  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
2022-01-15 16:46:19.887  INFO 412498 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-01-15 16:46:19.887  INFO 412498 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
2022-01-15 16:46:19.956  INFO 412498 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-01-15 16:46:19.957  INFO 412498 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms
2022-01-15 16:46:20.013  INFO 412498 --- [           main] c.secretary.bot.config.DefaultDbConfig   : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-15 16:46:20.565  INFO 412498 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2022-01-15 16:46:20.574 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-15 16:46:20.598 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-15 16:46:20.619 DEBUG 412498 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/**, /**] in 'resourceHandlerMapping'
2022-01-15 16:46:20.627 DEBUG 412498 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-15 16:46:20.702  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2022-01-15 16:46:20.709  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : Started WebHookApp in 1.65 seconds (JVM running for 1.962)
2022-01-15 16:52:33.916  INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = true
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-01-15 16:52:33.947  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642254753947
2022-01-15 16:52:34.056  INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw
2022-01-15 16:54:01.115  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-01-15 16:54:01.188  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.

– As we can see, the messages sent to the bot appeared in the database

Records in the database

– Open Conduktor, go to the Topics tab, then click on our users topic

topics tab

– Next, in the tab of our topic, click on the CONSUME DATA button

Topic information users

– In the window that opens, set the same settings as in the picture. The most important one is Start From, which determines from what point the messages in Kafka are shown; our setting shows all messages, including those sent earlier

Message view settings

– That’s all, now we have made sure that the messages arrived safely in Kafka, registered in the database and did not cause errors in the application

Messages arriving in Kafka

Thank you all so much for taking the time to read this article. I look forward to seeing you in the second part of this tutorial, where we use a Kafka Consumer to handle incoming messages.
