I need a solution that writes data to RabbitMQ using AmqpItemWriter and reads that data back from RabbitMQ using AmqpItemReader.
We're not looking for Apache Kafka; we simply want to send, say, Program details and consume them.
Writer Code
JobConfig.java
@Configuration
public class JobConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@Bean
public FlatFileItemReader<Customer> customerItemReader() {
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/customer.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
reader.setLineMapper(customerLineMapper);
return reader;
}
@Bean
public AmqpItemWriter<Customer> amqpWriter(){
AmqpItemWriter<Customer> amqpItemWriter = new AmqpItemWriter<>(this.rabbitTemplate());
return amqpItemWriter;
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(customerItemReader())
.writer(amqpWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
}
CustomerFieldSetMapper.java
/**
 * Maps one tokenized CSV line to a {@link Customer}.
 */
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {

    /**
     * Builds a customer from the columns {@code id}, {@code firstName},
     * {@code lastName} and {@code birthdate} of the given field set.
     *
     * @param fieldSet the tokenized line
     * @return the populated customer
     * @throws BindException declared by the mapper contract
     */
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        final long customerId = fieldSet.readLong("id");
        // The string columns are read with readRawString, as in the original mapping.
        final String first = fieldSet.readRawString("firstName");
        final String last = fieldSet.readRawString("lastName");
        final String born = fieldSet.readRawString("birthdate");

        return Customer.builder()
                .id(customerId)
                .firstName(first)
                .lastName(last)
                .birthdate(born)
                .build();
    }
}
Customer.java
/**
 * Customer record exchanged between the writer and the reader application.
 * Accessors, constructors and the builder are generated by Lombok.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Value of the {@code id} column. */
    private Long id;

    /** Value of the {@code firstName} column. */
    private String firstName;

    /** Value of the {@code lastName} column. */
    private String lastName;

    /** Value of the {@code birthdate} column, kept as text. */
    private String birthdate;
}
SpringBatchAmqpApplication.java
/**
 * Entry point of the writer application; boots Spring with batch processing enabled.
 */
// NOTE(review): @EnableBinding(Source.class) comes from Spring Cloud Stream and nothing
// in the visible configuration uses the bound channel - confirm whether it is needed.
@SpringBootApplication
@EnableBatchProcessing
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}
Reader code
JobConfiguration.java
/**
 * Batch configuration of the reader application: consumes customers from the
 * RabbitMQ queue and prints them.
 */
@Configuration
public class JobConfiguration {

    /** Name of the queue the customers are received from. */
    private static final String QUEUE_NAME = "myqueue";

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Connection factory pointing at the broker on {@code localhost}. */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** Admin used to declare the {@link Queue} bean on the broker. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /** Listener container factory sharing the connection factory and JSON converter. */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

    /** JSON converter; the publishing side must send JSON for conversion to succeed. */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /** Template that receives from the queue and converts payloads from JSON. */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setDefaultReceiveQueue(QUEUE_NAME);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    /** Declares the queue the customers are read from. */
    @Bean
    public Queue myQueue() {
        return new Queue(QUEUE_NAME);
    }

    /**
     * Reader that pulls one message per {@code read()} call from the template's
     * default receive queue.
     *
     * <p>The item type is set explicitly so that a payload which was not converted
     * to a {@link Customer} is rejected in the reader, instead of surfacing later
     * as a {@code ClassCastException} in the writer.
     */
    @Bean
    public ItemReader<Customer> customerReader() {
        AmqpItemReader<Customer> reader = new AmqpItemReader<>(rabbitTemplate());
        reader.setItemType(Customer.class);
        return reader;
    }

    /** Writer that prints every customer of the chunk to standard output. */
    @Bean
    public ItemWriter<Customer> customerItemWriter() {
        return items -> {
            for (Customer c : items) {
                System.out.println(c.toString());
            }
        };
    }

    /** Chunk-oriented step (chunk size 10): AMQP reader to console writer. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    /** Job consisting of the single consuming step. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    /** Listener that prints a marker before the step and the step execution after it. */
    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}
CustomerStepListener.java
/**
 * Step listener that writes simple progress output to standard output.
 */
public class CustomerStepListener implements StepExecutionListener {

    /** Prints a {@code ==} marker line before the step runs. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        final String marker = "==";
        System.out.println(marker);
    }

    /**
     * Prints the complete step execution (its string form, not only the read
     * count) behind a {@code READ COUNT = } label.
     *
     * @return always {@link ExitStatus#COMPLETED}
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        final String summary = "READ COUNT = " + stepExecution;
        System.out.println(summary);
        return ExitStatus.COMPLETED;
    }
}
Logs
2021-01-18 18:41:05.023 INFO 25532 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
==
2021-01-18 18:41:05.031 INFO 25532 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672
2021-01-18 18:41:05.072 INFO 25532 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#20a14b55:0/SimpleConnection@4650a407 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55797]
READ COUNT = StepExecution: id=1, version=2, name=step1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
2021-01-18 18:41:05.097 INFO 25532 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 73ms
2021-01-18 18:41:05.099 INFO 25532 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 87ms
No data is going to the queue.
Do you have any errors/warnings in the logs? Can you check the readCount/writeCount
from the step execution after your job is finished? – Mahmoud Ben Hassine