1
votes

I have a @StreamListener that accepts a @Payload String. For testing this Listener class I have written a Junit class using embedded Kafka. I'm getting below error while running my test class

Error

ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.lang.String] for GenericMessage


If I change the dataType of @Payload from String to byte[] message is getting picked by my listener class.

Can someone help me know what the issue here? I guess this is something with the cloud stream configuration.


@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
        topics = {
                "input",
                "output"})
public class TestUtils {

    public static final String KEY_SERIALIZER = "key.serializer";
    public static final String VALUE_SERIALIZER = "value.serializer";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @BeforeEach
    public void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void someTest() throws Exception {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        senderProps.put(KEY_SERIALIZER, StringSerializer.class);
        senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
        template.setDefaultTopic("input");
        template.sendDefault("foo");

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                "input_group",
                "false",
                this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("output"));
        ConsumerRecords<String, String> records = consumer.poll(10_000);
        consumer.commitSync();
        Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);

Here is my application.yaml looks like.
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        bindings:
          input:
            consumer:
              enable-dlq: true
              dlq-name: output
              dlq-producer-properties:
                retries: 1
        binder:
          brokers: ${spring.embedded.kafka.brokers}
          replicationFactor: ${replication_factor:1}
          autoCreateTopics: true
          autoAddPartitions: true
          configuration:
            retries: 1
            batch.size: 16384
            linger.ms: 1
            enable.idempotence: true
            buffer.memory: 33554432
            request.timeout.ms: 3000
            transaction.timeout.ms: 3000
            max.block.ms: ${kafka_max_block_time:5000}
            max.poll.records: 80
            poll.timeout: 10000
            commit.retries: 1
            commit.retry.interval: 1000
            session.timeout.ms.config: 50000
            shutdown.signal: INT,TERM
            acks: "all"
      bindings:
        output:
          destination: output
          contentType: application/json
          producer:
            partitionCount: ${partition_count:1}
        input:
          destination: input
          contentType: application/json
          partitioned: true
          group: input_group

2

2 Answers

1
votes

Please check if you have mocked the ObjectMapper, since ObjectMapper is not able to convert the byte[] to String.

1
votes

Similar error happened in my project as well, where the developer was mocking objectMapper, please check and remove mocking if any such code exist.