The Spring Batch docs say of the Map-backed job repository:

Note that the in-memory repository is volatile and so does not allow restart between JVM instances. It also cannot guarantee that two job instances with the same parameters are launched simultaneously, and is not suitable for use in a multi-threaded Job, or a locally partitioned Step. So use the database version of the repository wherever you need those features.

I would like to use a Map job repository. I do not care about restarting or about preventing concurrent job executions, but I do care about being able to use multi-threading and local partitioning.

My batch application has some partitioned steps, and at first glance it seems to run just fine with a Map-backed job repository.

Why is this said not to be possible with MapJobRepositoryFactoryBean? Looking at the implementation of the Map DAOs, they use ConcurrentHashMap. Is that not thread-safe?

1 Answer

I would advise you to follow the documentation rather than rely on implementation details. Even if each map is individually thread-safe, there may still be race conditions in changes that involve more than one of these maps.
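To illustrate the kind of race meant here, below is a minimal Java sketch. The class `TwoMapRace` and its two maps are hypothetical stand-ins, not Spring Batch's actual DAO fields. Latches force the interleaving so the result is reproducible. Each map is a thread-safe ConcurrentHashMap, yet a second thread can still see the first map updated while the second is not.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for two in-memory DAOs whose contents must stay in sync.
class TwoMapRace {
    static final Map<Long, String> executions = new ConcurrentHashMap<>();
    static final Map<Long, String> contexts = new ConcurrentHashMap<>();

    // Returns true if a reader saw the execution without its matching context.
    static boolean observeInconsistentState() {
        executions.clear();
        contexts.clear();
        CountDownLatch firstWriteDone = new CountDownLatch(1);
        CountDownLatch readerDone = new CountDownLatch(1);

        Thread writer = new Thread(() -> {
            // Step 1 of a logical "save": each put is atomic on its own...
            executions.put(1L, "STARTED");
            firstWriteDone.countDown();
            try {
                // ...but nothing stops another thread from running right here.
                readerDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Step 2 of the same logical "save".
            contexts.put(1L, "context for execution 1");
        });

        try {
            writer.start();
            firstWriteDone.await();
            // The reader runs between the two puts and sees a half-finished update.
            boolean inconsistent = executions.containsKey(1L) && !contexts.containsKey(1L);
            readerDone.countDown();
            writer.join();
            return inconsistent;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("inconsistent state observed: " + observeInconsistentState());
    }
}
```

In real code the interleaving would not be forced, so the problem would show up only occasionally. That may be why a partitioned job seems to run fine at first glance. A transactional, database-backed repository can make such a multi-step update appear atomic to other threads.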

You can use an in-memory database very easily instead. For example, with H2:

@Grapes([
        @Grab('org.springframework:spring-jdbc:4.0.5.RELEASE'),
        @Grab('com.h2database:h2:1.3.175'),
        @Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE'),
        // must be passed with -cp, for whatever reason the GroovyClassLoader
        // is not used for com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver
        //@Grab('org.codehaus.jettison:jettison:1.2'),
])
import org.h2.jdbcx.JdbcDataSource
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.Step
import org.springframework.batch.core.StepContribution
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.io.ResourceLoader
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator

import javax.annotation.PostConstruct
import javax.sql.DataSource

@Configuration
@EnableBatchProcessing
class AppConfig {

    @Autowired
    private JobBuilderFactory jobs

    @Autowired
    private StepBuilderFactory steps

    @Bean
    public Job job() {
        return jobs.get("myJob").start(step1()).build()
    }

    @Bean
    Step step1() {
        this.steps.get('step1')
            .tasklet(new MyTasklet())
            .build()
    }

    @Bean
    DataSource dataSource() {
        new JdbcDataSource().with {
            // DB_CLOSE_DELAY=-1 keeps the in-memory database alive until the JVM exits
            url = 'jdbc:h2:mem:temp_db;DB_CLOSE_DELAY=-1'
            user = 'sa'
            password = 'sa'
            it
        }
    }

    @Bean
    BatchSchemaPopulator batchSchemaPopulator() {
        new BatchSchemaPopulator()
    }
}

class BatchSchemaPopulator {
    @Autowired
    ResourceLoader resourceLoader

    @Autowired
    DataSource dataSource

    @PostConstruct
    void init() {
        def populator = new ResourceDatabasePopulator()
        populator.addScript(
                resourceLoader.getResource(
                        'classpath:/org/springframework/batch/core/schema-h2.sql'))
        DatabasePopulatorUtils.execute populator, dataSource
    }
}

class MyTasklet implements Tasklet {

    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        println 'TEST!'
        // Return the status explicitly instead of relying on an implicit null
        RepeatStatus.FINISHED
    }
}

def ctx = new AnnotationConfigApplicationContext(AppConfig)
def launcher = ctx.getBean(JobLauncher)
def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
println "Status is: ${jobExecution.status}"