1
votes

Need a solution to write a data on RabbitMQ using AmqpWriter and read the data using RabbitMQ using AmqpReader. We're not looking for Apache Kafka, we want to simply send say Program details and consume it.

Writer Code

JobConfig.java

@Configuration
public class JobConfig {
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }
    
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue() {
       return new Queue("myqueue");
    }
    

    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        reader.setLineMapper(customerLineMapper);

        return reader;
    }
        
    @Bean
    public AmqpItemWriter<Customer> amqpWriter(){
        AmqpItemWriter<Customer> amqpItemWriter = new AmqpItemWriter<>(this.rabbitTemplate());
        return amqpItemWriter;
    }
        
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }
    
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

CustomerFieldSetMapper.java

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .birthdate(fieldSet.readRawString("birthdate"))
                .build();
    }
}

Customer.java

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

SpringBatchAmqpApplication.java

@EnableBatchProcessing
@SpringBootApplication
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}

Reader code

JobConfiguration.java

@Configuration
public class JobConfiguration {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        return factory; 
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setDefaultReceiveQueue("myqueue");
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public ItemReader<Customer> customerReader(){
        return new AmqpItemReader<>(this.rabbitTemplate());
    }

    @Bean
    public ItemWriter<Customer> customerItemWriter(){
        return items -> {
            for(Customer c : items) {
                System.out.println(c.toString());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}

CustomerStepListener.java

public class CustomerStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("==");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("READ COUNT = "+stepExecution);
        return ExitStatus.COMPLETED;
    }
}

Logs

2021-01-18 18:41:05.023 INFO 25532 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1] == 2021-01-18 18:41:05.031 INFO 25532 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672 2021-01-18 18:41:05.072 INFO 25532 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#20a14b55:0/SimpleConnection@4650a407 [delegate=amqp://[email protected]:5672/, localPort= 55797] READ COUNT = StepExecution: id=1, version=2, name=step1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= 2021-01-18 18:41:05.097 INFO 25532 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 73ms 2021-01-18 18:41:05.099 INFO 25532 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 87ms

1
no data is going to Queue.: Do you have any errors/warnings in the logs? Can you check the readCount/writeCount from the step execution after your job is finished?Mahmoud Ben Hassine

1 Answers

1
votes

On the "Writer Code" side, you are using an AmqpItemWriter configured with a RabbitTemplate. By default, messages will be sent to the nameless exchange, here an excerpt from the Javadoc:

Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.

In your writer configuration, there is no "connection" between the rabbit template and your queue. So you need to configure the rabbit template to send messages to your queue:

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setRoutingKey(myQueue().getName());
    return rabbitTemplate;
}

This is similar to what you did on the reader side with rabbitTemplate.setDefaultReceiveQueue("myqueue");.