8
votes

In Spring Batch 3.0 I'm trying to use the new Job Scope functionality for beans in both partitioned and multi-threaded steps (configured with a task:executor bean), and in both cases I'm getting the exception:

  Caused by: java.lang.IllegalStateException: No context holder available for job scope
        at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:153)
        at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:338)

But if I make the beans step-scoped, it works OK.

I noticed the comment on JobSynchronizationManager which says

N.B. it is the responsibility of every {@link Job} implementation to ensure that a {@link JobContext} is available on every thread that might be involved in a job execution, including worker threads from a pool.

So I'm wondering if I need to do something to set this up, or if it's a bug in the job scope implementation that it doesn't set up the worker threads correctly?

StepSynchronizationManager has a similar comment - but in that case something is obviously setting up the threads correctly within the step.

Sample code to reproduce issue:

TestItemReader

package test;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.InitializingBean;

/**
 * Sample reader used to reproduce the job-scope problem.
 *
 * <p>Once {@link #afterPropertiesSet()} has run, the reader holds the integers
 * 0 to 99 and hands them out one per {@link #read()} call, in ascending order.
 */
public class TestItemReader implements ItemReader<Integer>, InitializingBean {

    /** Remaining items; populated by {@link #afterPropertiesSet()}. */
    private List<Integer> items;

    /**
     * Removes and returns the first remaining item.
     *
     * <p>The method is {@code synchronized}, so concurrent callers are
     * serialised and each call removes a distinct element.
     *
     * @return the next integer, or {@code null} once no items remain
     */
    @Override
    public synchronized Integer read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {

        if (items.isEmpty()) {
            return null;
        }

        return items.remove(0);
    }

    /**
     * Logs that the reader is being initialised and fills it with the
     * integers 0 to 99.
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        System.out.println("Initialising reader");

        items = new ArrayList<Integer>();
        int next = 0;
        while (next < 100) {
            items.add(next);
            next++;
        }
    }
}

TestItemWriter

package test;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

/**
 * Sample writer that prints every item it receives to standard output,
 * prefixed with the name of the thread doing the writing.
 */
public class TestItemWriter implements ItemWriter<Integer> {

    /**
     * Prints one line per item in the form {@code "<thread name> Writing <item>"}.
     *
     * @param items the chunk of items to print
     */
    @Override
    public void write(List<? extends Integer> items) throws Exception {

        // The whole chunk is written by the calling thread, so the prefix is constant.
        final String prefix = Thread.currentThread().getName() + " Writing ";

        for (final int value : items) {
            System.out.println(prefix + value);
        }
    }
}

test-job-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Test application context for reproducing the job-scope problem.
    Defines a single-step job, its reader and writer, the batch infrastructure
    (job repository, launcher, transaction manager, embedded database) and the
    task executor that the step's tasklet runs on.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                         http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- One chunk-oriented step (commit interval 5) whose tasklet uses the "executor" task executor. -->
    <job id="job" restartable="true" xmlns="http://www.springframework.org/schema/batch">
        <step id="index">   
            <tasklet task-executor="executor">
                <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
            </tasklet>
        </step>
    </job>

    <!-- The reader is the bean declared with scope="job"; this is the bean the reported exception concerns. -->
    <bean id="itemReader" class="test.TestItemReader" scope="job"/>

    <!-- The writer is declared without a scope attribute. -->
    <bean id="itemWriter" class="test.TestItemWriter"/>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!-- Test helper that the JUnit test autowires to launch the job. -->
    <bean class="org.springframework.batch.test.JobLauncherTestUtils">
        <property name="job" ref="job"/>
    </bean>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

    <batch:job-repository id="jobRepository"/>

    <!-- Embedded HSQL database, set up with the schema-hsqldb.sql script from the classpath. -->
    <jdbc:embedded-database id="dataSource" type="HSQL">
        <jdbc:script location="classpath:/org/springframework/batch/core/schema-hsqldb.sql"/>
    </jdbc:embedded-database>

    <!-- Task executor referenced by the step's tasklet: pool size 5, queue capacity 0. -->
    <task:executor id="executor" queue-capacity="0" pool-size="5"/>

</beans>

JobTest

package test;

import java.util.Collection;

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.ContextConfiguration;

/**
 * JUnit test that launches the sample job defined in
 * {@code test-job-context.xml} and prints the resulting job execution and
 * step executions.
 */
@ContextConfiguration(locations={"test-job-context.xml"})
public class JobTest extends AbstractJUnit4SpringContextTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    /**
     * Configures log4j once for the class: root logger at WARN, with the
     * job-scope and step-scope loggers raised to DEBUG.
     */
    @BeforeClass
    public static void beforeClassSetup() {

        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.WARN);

        final String[] scopeLoggerNames = {
            "org.springframework.batch.core.scope.JobScope",
            "org.springframework.batch.core.scope.StepScope"
        };
        for (String loggerName : scopeLoggerNames) {
            Logger.getLogger(loggerName).setLevel(Level.DEBUG);
        }
    }

    /**
     * Launches the job and prints the job execution followed by each of its
     * step executions.
     */
    @Test
    public void testJobLaunch() throws Exception {

        final JobExecution jobExecution = jobLauncherTestUtils.launchJob();

        System.out.println("After execution "  + jobExecution);

        for (StepExecution step : jobExecution.getStepExecutions()) {
            System.out.println("StepExecution " + step);
        }
    }
}

Running the above JUnit test will reproduce the issue. If you change the scope on the reader to step, or remove the scope, the test completes normally.

1
I think I know what's going on, but can you post the configuration for me to confirm (I'm thinking that you're attempting to use the job scope in the slaves?). – Michael Minella
Our actual job configuration is quite complex, with a lot of steps etc., so I've added some sample code above that reproduces the problem in a simple multi-threaded step. We are using both multi-threaded steps and partitioning to see what gives the best performance, but we get the same issue with both. I've made the reader bean the one with the scope, but we are getting the same issue with processors or any bean that they depend on. – David Geary

1 Answer

3
votes

This is due to the fact that the current execution (JobExecution in this case) is stored in a ThreadLocal (see org.springframework.batch.core.scope.context.SynchronizationManagerSupport). That being said, adding multithreaded support for this doesn't seem unreasonable. Feel free to create a Jira issue for it (and a Pull Request if you are so inclined).