0
votes

I'm very new to Spring framework & Spring Batch

I'm trying to set up a sample Spring Batch remote partitioning example.

I'm using this stack Spring Boot + Spring Batch + Spring Integration + AWS SQS

I have done the following things successfully:

1. Created all the configuration, including channels, jobs, queues and other stuff.

2. Ran the master process; it partitions the table and pushes the partition metadata to AWS SQS.

But while running the slave process I get an error. The slave process is able to pull the messages from the queue, but the error occurs when the handle() method of StepExecutionRequestHandler is invoked:

org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1004E: Method call: Method handle(java.lang.String) cannot be found on org.springframework.batch.integration.partition.StepExecutionRequestHandler type, failedMessage=GenericMessage [payload=StepExecutionRequest: [jobExecutionId=2, stepExecutionId=3, stepName=slaveStep], headers={sequenceNumber=2, aws_messageId="", SentTimestamp=1523215624042, sequenceSize=4, SenderId="", aws_receiptHandle="", ApproximateReceiveCount=2, correlationId=2:slaveStep, id="", lookupDestination=master, aws_queue=master, ApproximateFirstReceiveTimestamp=1523215634470, timestamp=1523215864910}]

@Configuration
public class JobConfiguration implements ApplicationContextAware
{

  @Autowired
  public JobBuilderFactory jobBuilderFactory;

  @Autowired
  public StepBuilderFactory stepBuilderFactory;

  @Autowired
  public DataSource dataSource;

  @Autowired
  public JobExplorer jobExplorer;

  @Autowired
  public JobRepository jobRepository;

  private ApplicationContext applicationContext;

  private static final int GRID_SIZE = 4;

  @Bean
  public PartitionHandler partitionHandler(MessagingTemplate messagingTemplate) throws Exception
  {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("slaveStep");
    partitionHandler.setGridSize(GRID_SIZE);
    partitionHandler.setMessagingOperations(messagingTemplate);
    partitionHandler.setPollInterval(5000l);
    partitionHandler.setJobExplorer(this.jobExplorer);

    partitionHandler.afterPropertiesSet();

    return partitionHandler;
  }

  @Bean
  public ColumnRangePartitioner partitioner()
  {
    ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();

    columnRangePartitioner.setColumn("id");
    columnRangePartitioner.setDataSource(this.dataSource);
    columnRangePartitioner.setTable("customer");
    return columnRangePartitioner;
  }

  @Bean
  @Profile("slave")
  @ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
  public StepExecutionRequestHandler stepExecutionRequestHandler()
  {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();

    BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
    stepLocator.setBeanFactory(this.applicationContext);
    stepExecutionRequestHandler.setStepLocator(stepLocator);
    stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);

    return stepExecutionRequestHandler;
  }

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata defaultPoller()
  {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
  }

  @Bean
  @StepScope
  public JdbcPagingItemReader<Customer> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minValue,
      @Value("#{stepExecutionContext['maxValue']}") Long maxValue)
  {
    System.out.println("reading " + minValue + " to " + maxValue);
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

    reader.setDataSource(this.dataSource);
    reader.setFetchSize(100);
    reader.setRowMapper(new CustomerRowMapper());

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);

    Map<String, Order> sortKeys = new HashMap<>(1);

    sortKeys.put("id", Order.ASCENDING);

    queryProvider.setSortKeys(sortKeys);

    reader.setQueryProvider(queryProvider);

    return reader;
  }

  @Bean
  @StepScope
  public JdbcBatchItemWriter<Customer> customerItemWriter()
  {
    JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

    itemWriter.setDataSource(this.dataSource);
    itemWriter.setSql("INSERT INTO new_customer VALUES (:id, :firstName, :lastName, :birthdate)");
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
    itemWriter.afterPropertiesSet();

    return itemWriter;
  }

  @Bean
  public Step step1() throws Exception
  {
    return stepBuilderFactory.get("step1").partitioner(slaveStep().getName(), partitioner()).step(slaveStep())
        .partitionHandler(partitionHandler(null)).build();
  }

  @Bean
  public Step slaveStep()
  {
    return stepBuilderFactory.get("slaveStep").<Customer, Customer>chunk(1000).reader(pagingItemReader(null, null)).writer(customerItemWriter())
        .build();
  }

  @Bean
  @Profile("master")
  public Job job() throws Exception
  {
    return jobBuilderFactory.get("job").start(step1()).build();
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
  {
    this.applicationContext = applicationContext;
  }
}

/**
 * Messaging-side wiring for the remote partitioning sample: the channels and
 * the SQS endpoints that read from and write to the {@code master} queue.
 */
@Configuration
public class IntegrationConfiguration
{

  @Autowired
  private AmazonSQSAsync amazonSqs;

  /**
   * Messaging template whose default destination is the {@code outboundRequests}
   * channel.
   *
   * @return the template, with its receive timeout set to 60000000
   */
  @Bean
  public MessagingTemplate messageTemplate()
  {
    MessagingTemplate template = new MessagingTemplate(outboundRequests());
    template.setReceiveTimeout(60000000L);
    return template;
  }

  /**
   * Direct channel that feeds {@link #sqsMessageHandler()}.
   *
   * @return the channel
   */
  @Bean
  public DirectChannel outboundRequests()
  {
    return new DirectChannel();
  }


  /**
   * Slave-only adapter that receives from the {@code master} queue and forwards
   * to the {@code inboundRequests} channel.
   *
   * @return the initialised adapter
   */
  @Bean
  @Profile("slave")
  public MessageProducer sqsMessageDrivenChannelAdapter()
  {
    SqsMessageDrivenChannelAdapter inboundAdapter =
        new SqsMessageDrivenChannelAdapter(this.amazonSqs, "master");
    inboundAdapter.setOutputChannel(inboundRequests());
    inboundAdapter.afterPropertiesSet();
    return inboundAdapter;
  }

  /**
   * Service activator on {@code outboundRequests} that sends to the
   * {@code master} queue.
   *
   * @return the SQS message handler
   */
  @Bean
  @ServiceActivator(inputChannel = "outboundRequests")
  public MessageHandler sqsMessageHandler()
  {
    // NOTE(review): no message converter is set on this handler, so how the
    // payload becomes the SQS message body is left to its defaults -- confirm
    // that this matches what the receiving side expects.
    SqsMessageHandler outboundHandler = new SqsMessageHandler(this.amazonSqs);
    outboundHandler.setQueue("master");
    return outboundHandler;
  }

  /**
   * Channel named as the output of the slave-side request handler; backed by a
   * {@link NullChannel}.
   *
   * @return the channel
   */
  @Bean
  public PollableChannel outboundStaging()
  {
    return new NullChannel();
  }

  /**
   * Queue channel that receives the output of the SQS message-driven adapter.
   *
   * @return the channel
   */
  @Bean
  public QueueChannel inboundRequests()
  {
    return new QueueChannel();
  }
}

Thanks

1

1 Answers

0
votes

You should keep in mind that a StepExecutionRequestHandler has a contract like:

public StepExecution handle(StepExecutionRequest request)

According to your exception and the nature of SQS, the payload of the message in the inboundRequests channel is a String, and I believe it is JSON. So consider using a JsonToObjectTransformer before the StepExecutionRequestHandler.

UPDATE

The payload is not in JSON format. It is a string created by the toString() of the StepExecutionRequest class. The format is StepExecutionRequest: [jobExecutionId=2, stepExecutionId=3, stepName=slaveStep]

OK! I see what you mean. An SQS message can only have a String body. The SqsMessageHandler that sends messages to SQS uses a GenericMessageConverter by default to convert the incoming object to a String.

I think you need to configure the SqsMessageHandler with a MappingJackson2MessageConverter so that the StepExecutionRequest is really serialized to proper JSON before it is transferred over SQS.

On the other (slave) side, before calling the StepExecutionRequestHandler, you should place a @Transformer for the JsonToObjectTransformer just after the SqsMessageDrivenChannelAdapter.