I'm very new to the Spring Framework and Spring Batch.
I'm trying to set up a sample Spring Batch remote partitioning example.
I'm using this stack: Spring Boot + Spring Batch + Spring Integration + AWS SQS.
I have done the following things successfully:
1. Created all the configuration, including channels, jobs, queues and other stuff.
2. Ran the master process; I'm able to partition the table and push the partition metadata to AWS SQS.
But while running the slave process I'm getting an error. In the slave process I'm able to pull the messages from the queue, but the error occurs when the handle() method of StepExecutionRequestHandler is invoked:
org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1004E: Method call: Method handle(java.lang.String) cannot be found on org.springframework.batch.integration.partition.StepExecutionRequestHandler type, failedMessage=GenericMessage [payload=StepExecutionRequest: [jobExecutionId=2, stepExecutionId=3, stepName=slaveStep], headers={sequenceNumber=2, aws_messageId="", SentTimestamp=1523215624042, sequenceSize=4, SenderId="", aws_receiptHandle="", ApproximateReceiveCount=2, correlationId=2:slaveStep, id="", lookupDestination=master, aws_queue=master, ApproximateFirstReceiveTimestamp=1523215634470, timestamp=1523215864910}]
/**
 * Spring Batch configuration for the remote-partitioning sample.
 *
 * <p>Declares the partitioned step ({@code step1}), the worker step
 * ({@code slaveStep}) with its reader and writer, the partition handler that
 * sends work through a {@link MessagingTemplate}, and the
 * {@link StepExecutionRequestHandler} that is only registered under the
 * {@code slave} profile. The {@code job} bean is only registered under the
 * {@code master} profile.
 */
@Configuration
public class JobConfiguration implements ApplicationContextAware
{
    /** Grid size handed to the partition handler. */
    private static final int GRID_SIZE = 4;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Autowired
    public JobExplorer jobExplorer;

    @Autowired
    public JobRepository jobRepository;

    /** Captured in {@link #setApplicationContext} and used as the bean factory for step lookup. */
    private ApplicationContext applicationContext;

    /**
     * Builds the handler that dispatches partitions of {@code slaveStep} over messaging.
     *
     * @param messagingTemplate template used by the handler to send the step execution requests
     * @return the fully initialised partition handler
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean
    public PartitionHandler partitionHandler(MessagingTemplate messagingTemplate) throws Exception
    {
        MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
        partitionHandler.setStepName("slaveStep");
        partitionHandler.setGridSize(GRID_SIZE);
        partitionHandler.setMessagingOperations(messagingTemplate);
        // Uppercase 'L' suffix: the lowercase form is easily misread as the digit 1.
        // NOTE(review): presumably milliseconds - confirm against MessageChannelPartitionHandler.
        partitionHandler.setPollInterval(5000L);
        partitionHandler.setJobExplorer(this.jobExplorer);
        partitionHandler.afterPropertiesSet();
        return partitionHandler;
    }

    /**
     * Builds the partitioner, configured with column {@code id} of table {@code customer}.
     *
     * @return the configured partitioner
     */
    @Bean
    public ColumnRangePartitioner partitioner()
    {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(this.dataSource);
        columnRangePartitioner.setTable("customer");
        return columnRangePartitioner;
    }

    /**
     * Worker-side handler wired as a service activator between the
     * {@code inboundRequests} and {@code outboundStaging} channels
     * ({@code slave} profile only).
     *
     * <p>NOTE(review): the handler is invoked with whatever payload arrives on
     * {@code inboundRequests}. If the transport delivers the request as text rather
     * than as a {@code StepExecutionRequest} object, a conversion step is presumably
     * needed in front of this activator - confirm how the queue adapter
     * (de)serialises payloads.
     *
     * @return the configured request handler
     */
    @Bean
    @Profile("slave")
    @ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
    public StepExecutionRequestHandler stepExecutionRequestHandler()
    {
        StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
        // Steps are looked up by name among the beans of this application context.
        BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
        stepLocator.setBeanFactory(this.applicationContext);
        stepExecutionRequestHandler.setStepLocator(stepLocator);
        stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
        return stepExecutionRequestHandler;
    }

    /**
     * Default poller, registered under {@link PollerMetadata#DEFAULT_POLLER}.
     *
     * @return poller metadata with a periodic trigger of period 10
     *         (NOTE(review): presumably milliseconds - confirm)
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller()
    {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

    /**
     * Step-scoped paging reader that selects the {@code customer} rows whose
     * {@code id} lies in {@code [minValue, maxValue]}, ordered by {@code id} ascending.
     *
     * <p>The bounds are bound as named query parameters instead of being
     * concatenated into the SQL text.
     *
     * @param minValue lower bound, taken from the step execution context key {@code minValue}
     * @param maxValue upper bound, taken from the step execution context key {@code maxValue}
     * @return the configured reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue)
    {
        System.out.println("reading " + minValue + " to " + maxValue);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setWhereClause("where id >= :minValue and id <= :maxValue");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        // Values for the named parameters used in the where clause above.
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("minValue", minValue);
        parameterValues.put("maxValue", maxValue);

        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameterValues);
        return reader;
    }

    /**
     * Step-scoped batch writer that inserts each item into {@code new_customer},
     * taking the named SQL parameters from the item's bean properties.
     *
     * @return the initialised writer
     */
    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> customerItemWriter()
    {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(this.dataSource);
        itemWriter.setSql("INSERT INTO new_customer VALUES (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }

    /**
     * Partitioned step: splits {@code slaveStep} with {@link #partitioner()} and
     * delegates the partitions to the partition handler.
     *
     * @return the partitioned step
     * @throws Exception propagated from {@link #partitionHandler(MessagingTemplate)}
     */
    @Bean
    public Step step1() throws Exception
    {
        // NOTE(review): null is only a placeholder argument; presumably the
        // @Configuration proxy hands back the container-managed bean - confirm.
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .partitionHandler(partitionHandler(null))
                .build();
    }

    /**
     * Worker step: reads and writes {@code Customer} items in chunks of 1000.
     *
     * @return the worker step
     */
    @Bean
    public Step slaveStep()
    {
        // NOTE(review): the null arguments are placeholders; the reader is step-scoped
        // and presumably receives its real bounds from the step execution context - confirm.
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(1000)
                .reader(pagingItemReader(null, null))
                .writer(customerItemWriter())
                .build();
    }

    /**
     * The job, consisting of {@link #step1()} only ({@code master} profile only).
     *
     * @return the job
     * @throws Exception propagated from {@link #step1()}
     */
    @Bean
    @Profile("master")
    public Job job() throws Exception
    {
        return jobBuilderFactory.get("job").start(step1()).build();
    }

    /**
     * Stores the application context for later use by the step locator.
     *
     * @param applicationContext the context this configuration runs in
     * @throws BeansException declared by the {@link ApplicationContextAware} contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    {
        this.applicationContext = applicationContext;
    }
}
/**
 * Spring Integration wiring for the sample: the channels, the messaging
 * template, and the SQS endpoints that both use the queue named {@code master}.
 */
@Configuration
public class IntegrationConfiguration
{
    @Autowired
    private AmazonSQSAsync amazonSqs;

    /**
     * Messaging template whose default destination is {@link #outboundRequests()}.
     *
     * @return the template, with its receive timeout set to 60000000
     *         (NOTE(review): presumably milliseconds - confirm)
     */
    @Bean
    public MessagingTemplate messageTemplate()
    {
        MessagingTemplate template = new MessagingTemplate(outboundRequests());
        template.setReceiveTimeout(60000000L);
        return template;
    }

    /**
     * Channel consumed by {@link #sqsMessageHandler()}.
     *
     * @return a new direct channel
     */
    @Bean
    public DirectChannel outboundRequests()
    {
        return new DirectChannel();
    }

    /**
     * Message-driven adapter on the {@code master} queue that forwards what it
     * receives to {@link #inboundRequests()} ({@code slave} profile only).
     *
     * <p>NOTE(review): how the adapter converts the queue message into the payload
     * it publishes is not visible here - verify that consumers of
     * {@code inboundRequests} receive the payload type they expect.
     *
     * @return the initialised adapter
     */
    @Bean
    @Profile("slave")
    public MessageProducer sqsMessageDrivenChannelAdapter()
    {
        SqsMessageDrivenChannelAdapter sqsAdapter =
                new SqsMessageDrivenChannelAdapter(this.amazonSqs, "master");
        sqsAdapter.setOutputChannel(inboundRequests());
        sqsAdapter.afterPropertiesSet();
        return sqsAdapter;
    }

    /**
     * Handler subscribed to {@code outboundRequests} and configured with the
     * {@code master} queue.
     *
     * @return the configured SQS message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "outboundRequests")
    public MessageHandler sqsMessageHandler()
    {
        SqsMessageHandler handler = new SqsMessageHandler(this.amazonSqs);
        handler.setQueue("master");
        return handler;
    }

    /**
     * Channel bean named {@code outboundStaging}, backed by a {@link NullChannel}.
     *
     * @return a new null channel
     */
    @Bean
    public PollableChannel outboundStaging()
    {
        return new NullChannel();
    }

    /**
     * Queue channel that the SQS adapter publishes to.
     *
     * @return a new queue channel
     */
    @Bean
    public QueueChannel inboundRequests()
    {
        return new QueueChannel();
    }
}
Thanks