Spring Boot and Quartz in cluster mode


The purpose of this mini-tutorial

Below is a brief overview of how to set up a Kotlin + Spring Boot application that is deployed to a cluster as multiple instances and uses the Quartz library so that cron-scheduled jobs run on only one instance of the service.

Example: several pods of a microservice are running in OpenShift, and one of them should generate a report once a day. If that pod crashes for any reason while the job is running, another pod should pick the job up and execute it. If report generation fails, the job should be retried a few more times within the next couple of hours. After N unsuccessful attempts, the original cron schedule for the job must be restored. The configuration of all pods of the microservice must be identical.

Brief introduction

Before reading this article, I strongly advise you to read this wonderful extensive overview of the Quartz library.

Alternatives

You may not need all the functionality that Quartz provides.
In that case, if your project uses Spring, I advise you to look at the ShedLock library (see its repository on GitHub).
In short, ShedLock is a simple library that ensures a scheduled task is executed at most once at any given time.
The implementation is based on locks stored in the database. It is very convenient that all configuration can be done with annotations, in the same style as Spring's @Scheduled.
However, the main disadvantage of this library is that ShedLock doesn't track the job lifecycle: there is no way to check that a task has completed or to reschedule it if necessary.

If you are considering alternative mechanisms for synchronizing multiple instances of your application, I advise you to read this thread on StackOverflow (here is my post with the content of this article).

Implementation

Task description

Below is an example of setting up a Spring Boot application that runs on several servers simultaneously and uses a single shared database. Each application instance has a bean: a job that runs on a cron schedule. This job must be executed only once (on one of the instances).
If the pod that was running the job crashes, the job must be restarted on any other worker pod. If the pod did not crash but the job failed (an exception was thrown during execution), the job must be restarted 2 more times with a delay of 5 hours * the number of attempts.
If the 2nd restart attempt also fails, the default cron schedule for the job must be restored:
0 0 4 L-1 * ? * (execution at 4 am on the penultimate day of each month).
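The timing rules above can be sketched with plain java.time, without any Quartz types. This is only an illustration of the arithmetic: retryDelay, penultimateDay and nextDefaultFire are hypothetical helper names introduced here, and the constants are read off the task description.

```kotlin
import java.time.Duration
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.YearMonth

// Assumed constants, taken from the task description above.
const val MAX_RETRIES = 2
const val BASE_DELAY_HOURS = 5L

/** Delay before retry number [attempt] (1-based): 5 hours * attempt. */
fun retryDelay(attempt: Int): Duration {
    require(attempt in 1..MAX_RETRIES) { "attempt must be in 1..$MAX_RETRIES" }
    return Duration.ofHours(BASE_DELAY_HOURS * attempt)
}

/** The date that `L-1` in the day-of-month field refers to: one day before the last day of the month. */
fun penultimateDay(month: YearMonth): LocalDate = month.atEndOfMonth().minusDays(1)

/** First moment strictly after [now] at which the default cron `0 0 4 L-1 * ? *` would fire. */
fun nextDefaultFire(now: LocalDateTime): LocalDateTime {
    val thisMonth = penultimateDay(YearMonth.from(now)).atTime(4, 0)
    return if (thisMonth.isAfter(now)) thisMonth
    else penultimateDay(YearMonth.from(now).plusMonths(1)).atTime(4, 0)
}
```

With these rules the first retry comes 5 hours after the failed run and the second one 10 hours after the failed first retry, so "a couple of hours" in practice means most of a day.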

So we have decided to use Quartz, and to use it in cluster mode.

Connect the dependency:

Gradle
implementation("org.springframework.boot:spring-boot-starter-quartz")

Maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

Populating the database:

Before we start writing the configuration, we need to create the tables that Quartz works with in our database. I used Liquibase and the official Quartz scripts.
For Quartz tables, it is better to have a separate schema in the database.

Job to be run by cron:

We simulate a job, or one of the services it uses, failing in 50% of cases.

@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            // 6..10 are 5 of the 10 possible values, so this fails in 50% of runs
            if ((1..10).random() > 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

Configuration setting

(more information here):

@Configuration
@Profile("quartz")
class JobConfig {
    // JobDetail for the job above
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            // Set this to true if the job should be re-run
            // when the pod executing it crashes
            .requestRecovery(true)
            // Keep the job in the database even if no trigger points to it
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()

    }

    // Existing jobs must also be recreated at startup
    // (needed in case you want to change the cron expression of a job
    // that was already created earlier; otherwise the old cron expression stays in the database)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
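Spring Boot also exposes a few of these factory switches as configuration properties. Since the injected SchedulerFactoryBean has usually been initialized before the scheduler bean method above runs, setters called on it at that point may come too late to influence how jobs and triggers are registered. A minimal sketch of the property-based alternative, assuming Spring Boot's Quartz auto-configuration is active (check the property names against your Spring Boot version):

```yaml
spring:
  quartz:
    # Replace job and trigger definitions that are already stored in the database
    overwrite-existing-jobs: true
    # Let running jobs finish before the scheduler shuts down
    wait-for-jobs-to-complete-on-shutdown: true
```

These keys would sit next to job-store-type in the same application-quartz.yaml profile file.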
    

Creating a listener that will monitor the execution of our task:

For the listener to work, you need to register it with the scheduler.

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

The main logic for processing the job life cycle:

We monitor the job execution status with the listener that we registered with the scheduler earlier. We use 2 of the listener's methods:
jobToBeExecuted(context: JobExecutionContext)
and
jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?),
which are called before the job starts and after it finishes, respectively (the latter regardless of whether the job completed successfully or not).

All logic is presented below. I’ll just add a couple of comments:

  • The scheduler knows how many times the trigger has been restarted from the information in the trigger's jobDataMap. This data is stored in the database, so after an instance restart the previous number of unsuccessful launches is read back from it.

  • If the application crashes while the job is being executed, an unexecuted trigger may remain in the database, which, in turn, can be converted into a recovery trigger when the application restarts (its name will begin with recover_ and the trigger will have the group RECOVERING_JOBS).
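The naming conventions that the listener relies on can be sketched as plain string helpers. This is an illustration only: isRecoveryTrigger, retryName and defaultTriggerName are hypothetical names introduced here, and the recover_ prefix and RECOVERING_JOBS group are, to my knowledge, what the Quartz JDBC job store uses for recovery triggers.

```kotlin
// Group that Quartz assigns to recovery triggers (Scheduler.DEFAULT_RECOVERY_GROUP).
const val RECOVERY_GROUP = "RECOVERING_JOBS"

/** True for triggers created by Quartz itself to re-run a job after an instance crash. */
fun isRecoveryTrigger(name: String, group: String): Boolean =
    group == RECOVERY_GROUP || name.startsWith("recover_")

/** Name for the next retry trigger; adds one more suffix if the first candidate is already taken. */
fun retryName(current: String, existing: Set<String>): String {
    val candidate = current + "_retry"
    return if (candidate in existing) candidate + "_retry" else candidate
}

/** Name of the original cron trigger: everything before the first underscore. */
fun defaultTriggerName(name: String): String = name.substringBefore("_")
```

One consequence of deriving the default name this way is that the original trigger name must not contain an underscore itself, otherwise it would be cut at the wrong place.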

@Component
@Profile("quartz")
class JobListener(
    // can be taken from the execution context or injected directly from the application context
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        // Only the triggers declared in our own configuration are injected into this list; recovery triggers
        // (if any exist when the application starts) are not injected
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }


    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }


    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        // alternatively, context.mergedJobDataMap can be used
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        // The following condition can be added to this if block: && !context.trigger.key.name.startsWith("recover_")
        // In that case the scheduler will not restart recovery triggers, which can appear
        // if the application crashes while a job is running.
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} failed during execution. Restart attempts count: $count")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                // in case a trigger with this name already exists (left in the database after an instance crash)
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    // replace our cron trigger with a simple trigger that will fire
                    // in 5 hours * the number of restart attempts
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.HOURS)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("The maximum number of restarts has been reached. Restart attempts: $count")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        // Recovery triggers must not be rescheduled
        if (!triggerCronMap.contains(defaultTriggerName)) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not self-written trigger. It can be recovery trigger or whatever. This trigger must not be recheduled.")
            return
        }
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}
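The branching inside jobWasExecuted can be summarized as a small pure function. This is a sketch that models only the decision, not the Quartz calls: Decision, Retry, ResetToDefaultCron, Done, attemptCount and decide are hypothetical names introduced here for illustration.

```kotlin
// Hypothetical model of the listener's branching; no Quartz types involved.
sealed class Decision
data class Retry(val delayHours: Long, val nextCount: Int) : Decision()
object ResetToDefaultCron : Decision()
object Done : Decision()

const val MAX_ATTEMPTS = 3 // first run + 2 retries, as in the listener above

/** Reads "count" the way the listener does: a missing value means the first attempt. */
fun attemptCount(data: Map<String, Any?>): Int = when (val raw = data["count"]) {
    null -> 1
    is Number -> raw.toInt()
    else -> raw.toString().toInt() // the value may have been stored as a string
}

/** Maps the outcome of one run to what should happen with the trigger. */
fun decide(failed: Boolean, count: Int): Decision = when {
    failed && count < MAX_ATTEMPTS -> Retry(delayHours = 5L * count, nextCount = count + 1)
    failed -> ResetToDefaultCron    // retries exhausted
    count > 1 -> ResetToDefaultCron // a retry succeeded, restore the cron trigger
    else -> Done                    // normal successful cron run
}
```

Keeping the decision separate from the scheduler calls in this way would also make the retry policy easy to unit test without a database.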

It is worth paying attention to a couple of things:
jobException.setRefireImmediately(true), which can be used in conjunction with context.refireCount if you don't need to reschedule the job after a runtime error. The job will be restarted immediately.
In one of the answers on StackOverflow it was recommended to call Thread.sleep(N seconds) inside the job instead of rescheduling it after a failure. This is clearly not the best idea ☺

application-quartz.yaml file

And the last thing left to do is write the YAML configuration file for the quartz profile that we will be using. The comments in the file explain each property:

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #With AUTO the instance ID is generated from the hostname and a timestamp. It can be any string, but it must be unique for every scheduler in the cluster (it is stored in the INSTANCE_NAME column of qrtz_scheduler_state)
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #Database-specific delegate (here for PostgreSQL)
#            useProperties: true #Indicates that JDBC JobStore stores all values in JobDataMaps as strings, so more complex objects can be stored as name-value pairs rather than serialized in BLOB columns.In the long run, this is safer because you avoid serializing non-String classes to BLOB class versions.
            tablePrefix: quartz_schema.QRTZ_  #Database Table Prefix
            misfireThreshold: 60000 #The number of milliseconds the scheduler will "tolerate" a trigger passing its next fire time before it is considered "misfired". The default value (if you do not set this property) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #How often (in milliseconds) this instance checks in with the other instances of the cluster. Affects how quickly failed instances are detected.
            isClustered: true #Turn on Clustering
          threadPool: #Thread pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true
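To make the misfireThreshold setting more concrete, here is a simplified model of the check it controls. It is not Quartz's own code, only a sketch of the rule described in the comment above; isMisfired is a hypothetical helper name.

```kotlin
import java.time.Duration
import java.time.Instant

/**
 * Returns true when a trigger that should have fired at [scheduledFireTime]
 * is late by more than [threshold], i.e. it would be treated as misfired.
 */
fun isMisfired(
    scheduledFireTime: Instant,
    now: Instant,
    threshold: Duration = Duration.ofMillis(60_000) // misfireThreshold from the YAML above
): Boolean = Duration.between(scheduledFireTime, now) > threshold
```

Under this model a trigger that is picked up 30 seconds late simply fires, while one that is more than a minute late is handled according to its misfire instruction.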

Local debugging was carried out as follows: I wrote a couple of Docker Compose files that start the database and several application instances pointed at that database. If there is interest, I can describe this separately.

Additional Information:

Here are some more interesting articles on the topic that I recommend reading:
About quartz
Spring boot using quartz in cluster mode
An interesting article from colleagues from OTUS
Cluster effectively quartz

P.S. I would welcome constructive criticism of the solution described above and will gladly look into alternatives.

Thank you for your time!
