Can I run a Spring Batch job with only slaves and no master, listening to a RabbitMQ queue? I want to listen to a queue and process the messages in a chunk-oriented manner using Spring Batch and Spring Integration in a Spring Boot app.

I want to use the ChunkProcessorChunkHandler configuration explained in Michael Minella's remote chunking example for Spring Batch (https://www.youtube.com/watch?v=30Tdp1mfR0g), but without a master configuration.

Below is my configuration for the job.

@Configuration
@EnableIntegration
public class QueueIntegrationConfiguration {

  @Autowired
  private CassandraItemWriter cassandraItemWriter;
  @Autowired
  private VendorProcessor vendorProcessor;

  @Autowired
  ConnectionFactory connectionFactory;

  @Bean
  public AmqpInboundChannelAdapter inboundChannelAdapter(
      SimpleMessageListenerContainer listenerContainer) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(inboundQueueChannel());
    adapter.setAutoStartup(true);

    return adapter;
  }

  @Bean
  public SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory, MessageConverter jsonMessageConverter) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
        connectionFactory);
    container.setQueueNames("ProductStore_Partial");
    container.setAutoStartup(true);
    container.setMessageConverter(jsonMessageConverter);

    return container;
  }

  @Bean
  @ServiceActivator(inputChannel = "ProductStore_Partial")
  public ChunkProcessorChunkHandler chunkProcessorChunkHandler()
      throws Exception {
    SimpleChunkProcessor chunkProcessor = new SimpleChunkProcessor(vendorProcessor,
        cassandraItemWriter);
    chunkProcessor.afterPropertiesSet();

    ChunkProcessorChunkHandler<Vendor> chunkHandler = new ChunkProcessorChunkHandler<>();
    chunkHandler.setChunkProcessor(chunkProcessor);
    chunkHandler.afterPropertiesSet();

    return chunkHandler;
  }

  @Bean
  public QueueChannel inboundQueueChannel() {
    return new QueueChannel();
  }

}

Below is my application class for Spring Boot.

@SpringBootApplication
@EnableBatchProcessing
public class BulkImportProductApplication {

  public static void main(String[] args) {
    SpringApplication app = new SpringApplication(BulkImportProductApplication.class);
    app.setWebEnvironment(false);
    app.run(args).close();
  }

}

From what I understand of Spring Integration, I have an AmqpInboundChannelAdapter to listen for messages from the queue, a ServiceActivator, an inboundQueueChannel, and an autowired ItemProcessor and ItemWriter. I am not sure what I am missing here.

The batch job starts, consumes one message from the queue, gets a cancelOk, and then terminates without processing the message.

I am also sharing my debug logging in case it helps.

2017-12-04 09:58:49.679  INFO 7450 --- [           main] c.a.s.p.b.BulkImportProductApplication   : Started BulkImportProductApplication in 9.412 seconds (JVM running for 10.39)
2017-12-04 09:58:49.679  INFO 7450 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@31c88ec8: startup date [Mon Dec 04 09:58:40 PST 2017]; root of context hierarchy
2017-12-04 09:58:49.679 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'inboundChannelAdapter'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'listenerContainer'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'integrationHeaderChannelRegistry'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.internalRabbitListenerEndpointRegistry'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'queueIntegrationConfiguration.chunkProcessorChunkHandler.serviceActivator.handler'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'queueIntegrationConfiguration.chunkProcessorChunkHandler.serviceActivator'
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'lifecycleProcessor'
2017-12-04 09:58:49.680  INFO 7450 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Asking bean 'inboundChannelAdapter' of type [class org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter] to stop
2017-12-04 09:58:49.680 DEBUG 7450 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Shutting down Rabbit listener container
2017-12-04 09:58:49.814 DEBUG 7450 --- [pool-1-thread-5] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@7c52fc81: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035], acknowledgeMode=AUTO local queue size=0
2017-12-04 09:58:49.814 DEBUG 7450 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'[B@358a5358(byte[618])' MessageProperties [headers={__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ProductStore, receivedRoutingKey=, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-nWGbRxjFiaeTEoZylv6Hrg, consumerQueue=null])
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[json__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[contentType] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : preSend on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[618], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_redelivered=false, id=a4868670-240f-ddf2-8a8c-ac4b8d234cdd, amqp_consumerTag=amq.ctag-nWGbRxjFiaeTEoZylv6Hrg, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512410329815}]
2017-12-04 09:58:49.815 DEBUG 7450 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : postSend (sent=true) on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[618], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_redelivered=false, id=a4868670-240f-ddf2-8a8c-ac4b8d234cdd, amqp_consumerTag=amq.ctag-nWGbRxjFiaeTEoZylv6Hrg, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512410329815}]
2017-12-04 09:58:49.853  INFO 7450 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2017-12-04 09:58:49.853 DEBUG 7450 --- [pool-1-thread-6] o.s.a.r.listener.BlockingQueueConsumer   : Received cancelOk for tag amq.ctag-nWGbRxjFiaeTEoZylv6Hrg (null); Consumer@7c52fc81: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035], acknowledgeMode=AUTO local queue size=0
2017-12-04 09:58:49.853 DEBUG 7450 --- [enerContainer-1] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@7c52fc81: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035], acknowledgeMode=AUTO local queue size=0
2017-12-04 09:58:49.853 DEBUG 7450 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@26f1249d Shared Rabbit Connection: SimpleConnection@680bddf5 [delegate=amqp://admin@xxxx:5672/, localPort= 65035]
2017-12-04 09:58:49.853 DEBUG 7450 --- [enerContainer-1] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://admin@xxxx:5672/,2)
2017-12-04 09:58:50.027  INFO 7450 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2017-12-04 09:58:50.027 DEBUG 7450 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Bean 'inboundChannelAdapter' completed its stop procedure

What am I missing here? Why is my message not getting processed? Please correct me if I have misunderstood something, and feel free to ask for any other configuration that would help analyze the situation.

EDIT: After removing the code that closes the application context manually (app.run(args).close()), I was able to receive the messages, but it looks like they are lost after being retrieved successfully. Here is the debug log for this behavior.

2017-12-04 14:39:11.297 DEBUG 1498 --- [pool-1-thread-5] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@7219ac49: tags=[{amq.ctag-Z8siptJMdxGU6sXdOHkVCA=ProductStore_Partial}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@6df20ade Shared Rabbit Connection: SimpleConnection@7ba63fe5 [delegate=amqp://admin@xxxx:5672/, localPort= 51172], acknowledgeMode=AUTO local queue size=0
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'[B@347c8f87(byte[624])' MessageProperties [headers={__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ProductStore, receivedRoutingKey=, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-Z8siptJMdxGU6sXdOHkVCA, consumerQueue=ProductStore_Partial])
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[json__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[contentType] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[__TypeId__] WILL be mapped, matched pattern=*
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : preSend on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[624], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_consumerQueue=ProductStore_Partial, amqp_redelivered=false, id=540399a5-62a6-7178-2524-e274bad4ed13, amqp_consumerTag=amq.ctag-Z8siptJMdxGU6sXdOHkVCA, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512427151297}]
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.integration.channel.QueueChannel     : postSend (sent=true) on channel 'inboundQueueChannel', message: GenericMessage [payload=byte[624], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_contentEncoding=UTF-8, amqp_receivedExchange=ProductStore, amqp_deliveryTag=2, json__TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, amqp_consumerQueue=ProductStore_Partial, amqp_redelivered=false, id=540399a5-62a6-7178-2524-e274bad4ed13, amqp_consumerTag=amq.ctag-Z8siptJMdxGU6sXdOHkVCA, contentType=json, __TypeId__=com.art.service.product.bulkimportproduct.data.model.Vendor, timestamp=1512427151297}]
2017-12-04 14:39:11.297 DEBUG 1498 --- [enerContainer-1] o.s.a.r.listener.BlockingQueueConsumer   : Retrieving delivery for Consumer@7219ac49: tags=[{amq.ctag-Z8siptJMdxGU6sXdOHkVCA=ProductStore_Partial}], channel=Cached Rabbit Channel: AMQChannel(amqp://admin@xxxx:5672/,2), conn: Proxy@6df20ade Shared Rabbit Connection: SimpleConnection@7ba63fe5 [delegate=amqp://admin@xxxx:5672/, localPort= 51172], acknowledgeMode=AUTO local queue size=0

This keeps repeating as new messages are consumed, but the messages are not processed or written to the data store by the ItemWriter provided. Come to think of it, I have not provided a tasklet/step bean reference anywhere in this code; is that what I am missing?
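
For comparison, the chunk-oriented behavior asked about here (read up to N items from a queue, process each one, write the whole chunk at once, with no master involved) can be modeled in plain Java. This is only a framework-free sketch: QueueChunkSketch, readChunk and handleChunk are hypothetical names, a BlockingQueue stands in for the RabbitMQ queue, and none of this is Spring Batch's own implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

public class QueueChunkSketch {

    // Reads up to chunkSize items, waiting at most timeoutMillis for each one.
    static <T> List<T> readChunk(BlockingQueue<T> queue, int chunkSize, long timeoutMillis)
            throws InterruptedException {
        List<T> chunk = new ArrayList<>(chunkSize);
        while (chunk.size() < chunkSize) {
            T item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                break; // queue stayed empty: return a partial (possibly empty) chunk
            }
            chunk.add(item);
        }
        return chunk;
    }

    // Processes every item, then hands the whole chunk to the writer in one call.
    // A null result from the processor means "filter this item out".
    static <I, O> int handleChunk(List<I> chunk, Function<I, O> processor,
                                  Consumer<List<O>> writer) {
        List<O> processed = new ArrayList<>(chunk.size());
        for (I item : chunk) {
            O result = processor.apply(item);
            if (result != null) {
                processed.add(result);
            }
        }
        if (!processed.isEmpty()) {
            writer.accept(processed);
        }
        return processed.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(List.of("a", "b", "c", "d", "e"));
        List<List<String>> written = new ArrayList<>();
        Function<String, String> processor = String::toUpperCase;
        Consumer<List<String>> writer = written::add;

        List<String> chunk;
        while (!(chunk = readChunk(queue, 2, 100)).isEmpty()) {
            handleChunk(chunk, processor, writer);
        }
        System.out.println(written); // prints [[A, B], [C, D], [E]]
    }
}
```

The loop in main stops at the first empty chunk only to keep the demo finite; a long-running consumer would keep polling instead.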

1 Answer

app.run(args).close();

You are explicitly closing the application context, which shuts everything down.

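Your log confirms it: "Closing org.springframework.context.annotation.AnnotationConfigApplicationContext" is logged at the same timestamp as the "Started BulkImportProductApplication" line, so the listener container is stopped right after startup.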
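
The effect can be reproduced without Spring. The sketch below is only a plain-Java model, not Spring code: CloseTooEarlySketch, Listener and awaitHandled are hypothetical names, a BlockingQueue stands in for the RabbitMQ queue, and close() plays the role of closing the application context. The first case closes the listener immediately after starting it, as app.run(args).close() does; the second keeps it open until the message has been handled.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CloseTooEarlySketch {

    /** Hypothetical stand-in for a listener container; not a Spring class. */
    static final class Listener implements AutoCloseable {
        private final BlockingQueue<String> queue;
        private final List<String> handled = new CopyOnWriteArrayList<>();
        private final Thread worker;

        Listener(BlockingQueue<String> queue) {
            this.queue = queue;
            this.worker = new Thread(this::consume, "listener");
            this.worker.start();
        }

        private void consume() {
            try {
                while (true) {
                    handled.add(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // close() was called: stop consuming
            }
        }

        List<String> handled() {
            return handled;
        }

        @Override
        public void close() throws InterruptedException {
            worker.interrupt();
            worker.join();
        }
    }

    /** Waits until the listener has handled at least count messages, or the timeout elapses. */
    static boolean awaitHandled(Listener listener, int count, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (listener.handled().size() < count) {
            if (System.nanoTime() > deadline) {
                return false;
            }
            Thread.sleep(5);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Case 1: start and close at once, like app.run(args).close().
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>();
        Listener closedEarly = new Listener(q1);
        closedEarly.close();
        q1.put("vendor-1"); // arrives after shutdown, so nobody is left to take it
        System.out.println("closed early: handled=" + closedEarly.handled()
                + ", still queued=" + q1.size());

        // Case 2: keep the listener open until the message has been handled.
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>();
        Listener keptOpen = new Listener(q2);
        q2.put("vendor-2");
        awaitHandled(keptOpen, 1, 2000);
        keptOpen.close();
        System.out.println("kept open: handled=" + keptOpen.handled()
                + ", still queued=" + q2.size());
    }
}
```

In the Spring Boot application the equivalent change is to call app.run(args) without the trailing .close(), which is what the EDIT in the question already does.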