0
votes

I want to implement Kafka Consumer and Producer which sends and receives Java Objects. Full Source I tried this:

Producer:

    @Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaProducerFactory<>(configProps);
    }


    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
        ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
        kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
        return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    }
}

Send object:

 @RestController
@RequestMapping("/checkout")
public class CheckoutController {
    
    private TransactionService transactionService;
    private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
    private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
    private static String topic = "tp-sale";

    @Autowired
    public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
                              KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate){
        this.transactionService = transactionService;
        this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    @PostMapping("test")
    private void performPayment() throws ExecutionException, InterruptedException, TimeoutException {

        Transaction transaction = new Transaction();
        transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());

        Transaction insertedTransaction = transactionService.save(transaction);

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
        RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        SaleResponseFactory value = consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
    }
}

Consumer:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    private String groupId = "test";

    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Receive Object

@Component
public class ProcessingSaleListener {

    private static String topic = "tp-sale";

    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {

        System.out.println(tf.getId());

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }
}

Custom objects

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable {

    private static final long serialVersionUID = 1744050117179344127L;
    
    private int id;

}

Serializer

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {

    @Override
    public byte[] serialize(String topic, SaleRequestFactory data)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try
        {
            ObjectOutputStream outputStream = new ObjectOutputStream(out);
            outputStream.writeObject(data);
            out.close();
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return out.toByteArray();
    }
}

Response Object

import java.io.Serializable;
import java.time.LocalDateTime;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable {

    private static final long serialVersionUID = 1744050117179344127L;

    private String unique_id;
}

Response Class

import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {

    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data)
    {
        SaleRequestFactory saleRequestFactory = null;
        try
        {
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            ObjectInputStream in = new ObjectInputStream(bis);
            saleRequestFactory = (SaleRequestFactory) in.readObject();
            in.close();
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return saleRequestFactory;
    }
}

I want to send and receive different Serialized Java Object based on the Object type. For example sometimes SaleRequestFactory and to receive SaleResponseFactory or to send AuthRequestFactory and receive AuthResponseFactory. Is it possible to send and receive diffrent Java Obects using one topic?

Full example code

2
I don't know how to implement the code. Can you show me please? It's not clear for me how to configure the Consumer and Producer for that case.Peter Penzov

2 Answers

1
votes

Use Object as the value type - here is an example using Boot's auto-configured infrastructure beans...

@SpringBootApplication
public class So65866763Application {

    public static void main(String[] args) {
        SpringApplication.run(So65866763Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so65866763", new Foo());
            template.send("so65866763", new Bar());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so65866763").partitions(1).replicas(1).build();
    }

}

class Foo implements Serializable {

}

class Bar implements Serializable {

}

@Component
@KafkaListener(id = "so65866763", topics = "so65866763")
class Listener {

    @KafkaHandler
    void fooListener(Foo foo) {
        System.out.println("In fooListener: " + foo);
    }

    @KafkaHandler
    void barListener(Bar bar) {
        System.out.println("In barListener: " + bar);
    }

}
public class JavaSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        return null;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            return baos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}
public class JavaDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data) {
        return null;
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        try (ObjectInputStream ois = new ObjectInputStream(bais)) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
In fooListener: com.example.demo.Foo@331ca660
In barListener: com.example.demo.Bar@26f54288
1
votes

It is possible, but you need two separate producer factories for each object type. Or one with ByteArraySerializer and serialize objects yourself (equivalently, Gary's answer)

Same for the consumer if you actually want to get the object deserialized correctly. Otherwise, you would use the ByteArrayDeserializer (again, equivalent to Gary's shown deserializer), then assuming Java were unable to determine what object types are in the bytes (which serialized Object Streams are}, you'd include extra metadata in the record, such as headers, or a specific key that you parse to determine how to deserialize the data, then call the respective deserialize method yourself

Overall, I suggest re-evaluating why you need to put different types of records in one topic, or look at alternative message formats, including something like the CloudEvents spec, or use Avro / Protobuf / polymorphic JSON types, which would work better to use with clients other than Kafka