I want to implement Kafka Consumer and Producer which sends and receives Java Objects. Full Source I tried this:
Producer:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
}
}
Send object:
@RestController
@RequestMapping("/checkout")
public class CheckoutController {
private TransactionService transactionService;
private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
private static String topic = "tp-sale";
@Autowired
public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate){
this.transactionService = transactionService;
this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
}
@PostMapping("test")
private void performPayment() throws ExecutionException, InterruptedException, TimeoutException {
Transaction transaction = new Transaction();
transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());
Transaction insertedTransaction = transactionService.save(transaction);
SaleRequestFactory obj = new SaleRequestFactory();
obj.setId(100);
ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
SaleResponseFactory value = consumerRecord.value();
System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
}
}
Consumer:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
private String groupId = "test";
@Bean
public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Receive Object
@Component
public class ProcessingSaleListener {
private static String topic = "tp-sale";
@KafkaListener(topics = "tp-sale")
public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {
System.out.println(tf.getId());
SaleResponseFactory resObj = new SaleResponseFactory();
resObj.setUnique_id("123123");
return resObj;
}
}
Custom objects
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable {
private static final long serialVersionUID = 1744050117179344127L;
private int id;
}
Serializer
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {
@Override
public byte[] serialize(String topic, SaleRequestFactory data)
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
try
{
ObjectOutputStream outputStream = new ObjectOutputStream(out);
outputStream.writeObject(data);
out.close();
}
catch (IOException e)
{
throw new RuntimeException("Unhandled", e);
}
return out.toByteArray();
}
}
Response Object
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable {
private static final long serialVersionUID = 1744050117179344127L;
private String unique_id;
}
Response Class
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {
@Override
public SaleRequestFactory deserialize(String topic, byte[] data)
{
SaleRequestFactory saleRequestFactory = null;
try
{
ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream in = new ObjectInputStream(bis);
saleRequestFactory = (SaleRequestFactory) in.readObject();
in.close();
}
catch (IOException | ClassNotFoundException e)
{
throw new RuntimeException("Unhandled", e);
}
return saleRequestFactory;
}
}
I want to send and receive different Serialized Java Object based on the Object type. For example sometimes SaleRequestFactory
and to receive SaleResponseFactory
or to send AuthRequestFactory
and receive AuthResponseFactory
. Is it possible to send and receive diffrent Java Obects using one topic?
Full example code