Creating a telegram bot (Spring Boot, Kafka, PostgreSQL), part one

Ivanov Maxim
Junior Java programmer
Recipe for making your own “Telegram-Frankenstein”

Hello everyone, this article is a kind of my first, but still I will try to tell you as simply as possible about how to create a bot by screwing all the whistles promised above.
The articles will be divided into 2 parts, the first part is the creation of the main bot with sending logs (Kafka Producer) and writing them to the database, the second part is the processing of all logs (Kafka Consumer).
Ingredients:
Creating a Spring Boot project, the easiest way to do this is through the built-in configurator in IntelliJ IDEAor using Spring Initializr. (as the build system will be used gradle)
Kafka (to keep track of topics I use conductor)
PostgreSQL (for comfortable work I use DBeaver)
If you have difficulty recreating the tutorial
Please write in the comments if you have any problems, just in case – here’s mine git
Let’s start with cutting:
First you need to set up build.grable with all dependencies
build grable
buildscript {
repositories {
mavenCentral()
}
}
plugins {
id 'org.springframework.boot' version '2.4.2'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
apply from: 'build-test.gradle'
group 'com.sercetary.bot'
sourceCompatibility = '14'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
configurations.all {
exclude module: 'slf4j-log4j12'
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
implementation 'org.springframework.data:spring-data-commons:2.6.0'
implementation 'org.springframework.kafka:spring-kafka:2.7.6'
implementation 'org.postgresql:postgresql:42.3.1'
implementation 'com.h2database:h2:1.4.200'
implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'
implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'
compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'
compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'
}
Next, for Kafka to work, we will describe application.yml, which contains the settings of our kafka producer
application.yml
server:
port: 9000
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Now application.properties settings
application.properties
# HTTP port for incoming requests
server.port=8081
app.http.bot=change-me
telegram-bot.name=change-me
telegram-bot.token=change-me
# Bot db
app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me
app.db.bot-db.driver=org.postgresql.Driver
app.db.bot-db.user=change-me
app.db.bot-db.password=change-me
app.db.bot-db.pool-size=10
# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me
Okay, after setting up our project, let’s talk about its structure:

Packages:
config – description of beans and project configuration
controller – handles the user’s request
dto – stores data, and also describes the database table model
exceptions – custom error handler package
repository – logic for working with the database
service – the main business logic of the project
Now we collect the ingredients and marinate:
Bin settings:
– First of all, we write the configuration of the beans of our application in the config package, here are the TelegramBotsApi and ObjectMapper initialization settings
AppConfig
@Configuration
public class AppConfig {
@Bean
ObjectMapper customObjectMapper() {
return new ObjectMapper();
}
@Bean
TelegramBotsApi telegramBotsApi() throws TelegramApiException{
return new TelegramBotsApi(DefaultBotSession.class);
}
}
– Inside our DbConfig class, there is a SpringDataJdbcProperties class that describes the SpringDataJdbc settings
DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {
@Bean
@Qualifier("bot-db")
@ConfigurationProperties(prefix = "app.db.bot-db")
SpringDataJdbcProperties gitlabJdbcProperties() {
return new SpringDataJdbcProperties();
}
@Bean
@Qualifier("bot-db")
public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {
return hikariDataSource("db", properties);
}
@Bean
@Qualifier("bot-db")
JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Data
@NoArgsConstructor
public static class SpringDataJdbcProperties {
// constants
private static final String H2_DATABASE_DRIVER = "org.h2.Driver";
/**
* JDBC URL property
*/
String url;
/**
* JDBC driver class name property
*/
String driver;
/**
* JDBC username property
*/
String user;
/**
* JDBC password property
*/
String password;
/**
* Hikari / Vertica maxPoolSize property
*/
String poolSize;
/**
* Minimum pool size
*/
int minPoolSize = 4;
/**
* Maximum pool size
*/
int maxPoolSize = 10;
/**
* This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
* sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
*/
long idleTimeout;
/**
* This property controls the maximum lifetime of a connection in the pool. When a connection
* reaches this timeout, even if recently used, it will be retired from the pool.
* An in-use connection will never be retired, only when it is idle will it be removed
*/
long maxLifetime;
/**
* Bulk insert size
*/
Integer bulkSize;
/**
* All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
*
* @param url JDBC driver class name property
* @param driver JDBC driver class name property
* @param user JDBC username property
* @param password JDBC password property
* @param poolSize Hikari / Vertica maxPoolSize property
* @param bulkSize bulk insert size
*/
public SpringDataJdbcProperties(
String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
this.url = url;
this.driver = driver;
this.user = user;
this.password = password;
this.poolSize = poolSize;
this.bulkSize = bulkSize;
}
/**
* Возвращает истину, если экземпляр описывает in-memory H2 database
*
* @return истина, если экземпляр описывает in-memory H2 database
*/
public boolean isH2Database() {
return driver.equals(H2_DATABASE_DRIVER);
}
/**
* Возвращает строковое представление экземпляра объекта в формате JSON
*
* @return строковое представление экземпляра объекта в формате JSON
*/
@Override
public String toString() {
var props = new SpringDataJdbcProperties(
url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
return Json.encode(props);
}
}
}
– Create a base class to reduce duplication of bean initialization code
DefaultDbConfig
@Slf4j
class DefaultDbConfig {
protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
log.info("[{}] настройки БД: [{}]", tag, properties.toString());
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(properties.getUrl());
ds.setDriverClassName(properties.getDriver());
ds.setUsername(properties.getUser());
ds.setPassword(properties.getPassword());
ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
return ds;
}
}
– After we write a utility class for logging
json
public class Json {
static final ObjectMapper mapper = new ObjectMapper();
/**
* Encode instance as JSON
*
* @param obj instance
* @return JSON
*/
public static String encode(Object obj) {
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return obj.toString();
}
}
public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {
return mapper.readValue(json, clazz);
}
Next, we will write a controller to access the service from outside
– Create a simple controller to get a list of records from the database
UsersController
@Slf4j
@RestController
@RequestMapping("${app.http.bot")
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class UsersController {
private final UserService userService;
/**
* Возвращает список пользователей и связанных с ними планами
*/
@RequestMapping(path = "/users_idea", method = RequestMethod.GET)
public List<User> getIdeaList() {
log.debug("Method - getIdeaList was called");
return userService.getUserList();
}
}
Then we move on to creating the model.
– We create a user model User, as well as its UserMapper, which is needed to work with the database and map fields in the table
user
@Data
@RequiredArgsConstructor
public class User {
/**
* user's id
*/
@JsonProperty("id")
private final int id;
/**
* user's name
*/
@JsonProperty("name")
private final String name;
/**
* description
*/
@JsonProperty("description")
private final String description;
private String startWord = "";
@Override
public String toString() { return startWord + description; }
}
UserMapper
@Slf4j
public class UserMapper implements RowMapper<User> {
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
var entity = new User(
rs.getInt("id"),
rs.getString("user_name"),
rs.getString("description")
);
log.trace("mapRow(): entity = [{}]", entity);
return entity;
}
}
Let’s move on to creating custom exceptions
What are they needed for
We use them to handle errors that may occur during the operation of the application so that the bot does not break down and continues its work.
– BaseException – a class that inherits from RuntimeException, in the constructor takes 2 parameters – the message and the body of the error
BaseException
@Slf4j
public class BaseException extends RuntimeException{
public BaseException(String msg, Throwable t) {
super(msg, t);
log.error(msg, t);
}
public BaseException(String msg) {
super(msg);
log.error(msg);
}
}
– NotFoundException – the class that is called when the answer is not found, inherited from BaseException
NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends BaseException {
private final static String MESSAGE = "Not Found";
public NotFoundException(Throwable t) {
super(MESSAGE, t);
}
public NotFoundException() {
super(MESSAGE);
}
}
– DbException – a class that handles errors related to the database, inherited from RuntimeException
DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class DbException extends RuntimeException {
private static final String MESSAGE = "Ошибка БД";
public DbException(String message) {
super(message);
}
public DbException(Throwable cause) {
super(MESSAGE, cause);
}
}
Now to work with the database, create a repository
– Let’s create an interface that describes methods for working with records in the database
IUserRepository
public interface IUserRepository {
/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
User getById(int id);
/**
* Возвращает список записей
*
* @return список всех записей
* @throws DbException в случае ошибки БД
*/
List<User> getUserList();
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
void insert(User entity);
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
void delete(User entity);
}
– Now let’s write a class that implements interface methods
UserRepository
@Slf4j
@Repository
public class UserRepository implements IUserRepository {
// constants
private static final String SQL_SELECT_BY_NAME = "" +
"SELECT id, user_name, description FROM user_table WHERE id=?";
private static final String SQL_SELECT_LIST = "" +
"SELECT id, user_name, description FROM user_table";
private static final String SQL_INSERT = "" +
"INSERT INTO user_table (user_name, description) VALUES (?, ?)";
private static final String SQL_DELETE = "" +
"DELETE FROM user_table WHERE id = ?";
protected final static UserMapper USER_MAPPER = new UserMapper();
// beans
protected final JdbcTemplate template;
/**
* Req-args constructor for Spring DI
*/
public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {
this.template = template;
}
/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public User getById(int id) throws DbException {
try {
return DataAccessUtils.singleResult(
template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id));
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Возвращает список записей
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public List<User> getUserList() throws DbException {
try {
return template.query(SQL_SELECT_LIST, USER_MAPPER);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void insert(User entity) throws DbException {
try {
// В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом
var result = template.update(SQL_INSERT,
entity.getName(),
entity.getDescription());
if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity);
log.info("insert({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void delete(User entity) throws DbException {
try {
var result = template.update(SQL_DELETE, entity.getId());
if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity);
log.info("delete({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
}
– Next, we have the logic of the bot, everything is trivial here, in the inherited onUpdateReceived method from the parent class TelegramLongPollingBot we write the behavior that happens when the chat with the user is updated, more about this herealso in the message processing method there is a call to our producer and writing data to the database
TelegramBot
@Slf4j
@Getter
@Component
public class TelegramBot extends TelegramLongPollingBot {
private Message requestMessage = new Message();
private final SendMessage response = new SendMessage();
private final Producer producerService;
private final UserService userService;
private final String botUsername;
private final String botToken;
public TelegramBot(
TelegramBotsApi telegramBotsApi,
@Value("${telegram-bot.name}") String botUsername,
@Value("${telegram-bot.token}") String botToken,
Producer producerService, UserService userService) throws TelegramApiException {
this.botUsername = botUsername;
this.botToken = botToken;
this.producerService = producerService;
this.userService = userService;
telegramBotsApi.registerBot(this);
}
/**
* Этот метод вызывается при получении обновлений через метод GetUpdates.
*
* @param request Получено обновление
*/
@SneakyThrows
@Override
public void onUpdateReceived(Update request) {
requestMessage = request.getMessage();
response.setChatId(requestMessage.getChatId().toString());
var entity = new User(
0, requestMessage.getChat().getUserName(),
requestMessage.getText());
if (request.hasMessage() && requestMessage.hasText())
log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText());
if (requestMessage.getText().equals("/start"))
defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");
else if (requestMessage.getText().equals("/idea"))
onIdea(response);
else
defaultMsg(response, "Я записал вашу мысль :) \n ");
log.info("Working, text[{}]", requestMessage.getText());
if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}
}
/**
* Метод отправки сообщения со списком мыслей - по команде "/idea"
*
* @param response - метод обработки сообщения
*/
private void onIdea(SendMessage response) throws TelegramApiException {
if (userService.getUserList().isEmpty()) {
defaultMsg(response, "В списке нет мыслей. \n");
} else {
defaultMsg(response, "Вот список ваших мыслей: \n");
for (User txt : userService.getUserList()) {
response.setText(txt.toString());
execute(response);
}
}
}
/**
* Шабонный метод отправки сообщения пользователю
*
* @param response - метод обработки сообщения
* @param msg - сообщение
*/
private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {
response.setText(msg);
execute(response);
}
}
Code snippet with sending to Kafka and writing to the database
if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}
Let’s move on to creating the business logic of the application
– BaseService – implements the basic methods of project services
Base Service
public class BaseService {
/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null
*/
public <T> T wrapResult(T result) {
if(result == null)
throw new NotFoundException();
return result;
}
/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null или пустой
*/
public <T> List<T> wrapResults(List<T> result) {
if(result == null || result.size() == 0)
throw new NotFoundException();
return result;
}
}
– The UserService class works with our IUserRepository repository and contains the business logic for working with event records in the database
User Service
@Service
@Slf4j
@RequiredArgsConstructor
public class UserService extends BaseService {
//beans
protected final IUserRepository repo;
/**
* Возвращает список записей
*
* @return список записей
* @throws DbException в случае ошибки БД
*/
public List<User> getUserList() {
log.trace("#### getUserList() - working");
return wrapResults(repo.getUserList());
}
/**
* Возвращает список записей по id
*
* @throws DbException в случае ошибки БД
*/
public User getById(int id) {
log.trace("#### getById() [id={}]", id);
return wrapResult(repo.getById(id));
}
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
public void insert(User entity) {
log.trace("#### insert() [entity={}]", entity);
repo.insert(entity);
}
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
public void delete(User entity) {
log.trace("#### delete() [entity={}]", entity);
repo.delete(entity);
}
}
– The Producer class, just the class that sends messages to the users topic, and here we can change the format of the message itself and the data that it sends
Producer
@Service
@Slf4j
public class Producer {
private static final String TOPIC = "users";
protected final IUserRepository repo;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public Producer(IUserRepository repo) {
this.repo = repo;
}
public void sendMessage(User user) {
if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message");
log.info("#### Producing message [user={}]", user);
kafkaTemplate.send(TOPIC, "Writing in log -> " + user);
}
}
At the end of the class, which actually launches our entire application
webhookapp
@Slf4j
@SpringBootApplication
public class WebHookApp {
public static void main(String[] args) {
SpringApplication.run(WebHookApp.class, args);
}
}
Now we have marinated all the ingredients and prepared the dish for baking:
– First check if Kafka is running

– After that, we launch Conductor and see that we have a message broker running, after launching our application, a users topic will appear here, to which messages sent by our producer will fly

– Next, run DBeaver and create 2 tables (log and user_table), here is the scheme for creating tables:
CREATE TABLE public.log (
id serial4 NOT NULL,
message varchar(500) NOT NULL,
date_time date NOT NULL,
topic varchar(100) NOT NULL,
CONSTRAINT log_pkey PRIMARY KEY (id)
);
CREATE TABLE public.user_table (
id serial4 NOT NULL,
user_name varchar(100) NOT NULL,
description varchar(500) NULL,
CONSTRAINT user_table_pkey PRIMARY KEY (id)
);



Great, the dish is baked and ready to serve:
– We start the project, check that everything is configured and works correctly

– Open telegrams and taste our “Frankenstein”

– Let’s see what Spring wrote to us in the logs and whether the data was written to Kafka and the database?
Logs of our bot, no errors observed
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.2)
2022-01-15 16:46:19.248 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : The following profiles are active: bot
2022-01-15 16:46:19.291 WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath
2022-01-15 16:46:19.882 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http)
2022-01-15 16:46:19.887 INFO 412498 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-01-15 16:46:19.887 INFO 412498 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2022-01-15 16:46:19.956 INFO 412498 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-01-15 16:46:19.957 INFO 412498 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms
2022-01-15 16:46:20.013 INFO 412498 --- [ main] c.secretary.bot.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-15 16:46:20.565 INFO 412498 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-01-15 16:46:20.574 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-15 16:46:20.598 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-15 16:46:20.619 DEBUG 412498 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/**, /**] in 'resourceHandlerMapping'
2022-01-15 16:46:20.627 DEBUG 412498 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-15 16:46:20.702 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
2022-01-15 16:46:20.709 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : Started WebHookApp in 1.65 seconds (JVM running for 1.962)
SSS2022-01-15 16:52:33.916 INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-01-15 16:52:33.947 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642254753947
2022-01-15 16:52:34.056 INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw
2022-01-15 16:54:01.115 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-01-15 16:54:01.188 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
– As we can see, the messages sent to the Bot appeared in the database

– Having opened the conductor, go to the topics tab, then click on our users topic

– Next, in the tab of our topic, click on the CONSUME DATA button

– In the window that opens, set the same settings (the most important of them is Start From – indicates from what moment to show messages in Kafka, our setting – shows all messages, including those sent earlier)

– That’s all, now we have made sure that the messages arrived safely in Kafka, registered in the database and did not cause errors in the application

Well, thank you all so much for taking the time to read this article, I look forward to seeing you in second part See this tutorial where we use Consumer Kafka to handle incoming messages.