I have a @StreamListener that accepts a @Payload String. To test this listener class I have written a JUnit test using embedded Kafka. I'm getting the error below when I run my test class.
Error
ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.lang.String] for GenericMessage
If I change the type of the @Payload from String to byte[], the message is picked up by my listener class.
Can someone help me understand what the issue is here? I suspect it is something in my Spring Cloud Stream configuration.
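For context, the listener is shaped roughly like this. This is only a minimal sketch: the class and method names are placeholders, and I'm assuming the standard Processor binding whose channel names ("input"/"output") match the bindings in the YAML further down.

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Processor.class)
public class InputListener {   // placeholder name

    @StreamListener(Processor.INPUT)
    public void handleMessage(@Payload String payload) {
        // fails at runtime with the MessageConversionException shown above
    }

    // Switching the payload type makes the message arrive:
    // @StreamListener(Processor.INPUT)
    // public void handleMessage(@Payload byte[] payload) { ... }
}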
@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest(classes = IntegrationTestConsumer.class)
@EmbeddedKafka(partitions = 1, controlledShutdown = true,
        topics = {"input", "output"})
public class TestUtils {

    public static final String KEY_SERIALIZER = "key.serializer";
    public static final String VALUE_SERIALIZER = "value.serializer";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @BeforeEach
    public void setup() {
        // point the binder at the embedded broker
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void someTest() throws Exception {
        // send a plain String message to the "input" topic
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        senderProps.put(KEY_SERIALIZER, StringSerializer.class);
        senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
        template.setDefaultTopic("input");
        template.sendDefault("foo");

        // consume from the "output" topic and verify a record arrived
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                "input_group",
                "false",
                this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("output"));
        ConsumerRecords<String, String> records = consumer.poll(10_000);
        consumer.commitSync();
        Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
    }
}
Here is what my application.yaml looks like:
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        bindings:
          input:
            consumer:
              enable-dlq: true
              dlq-name: output
              dlq-producer-properties:
                retries: 1
        binder:
          brokers: ${spring.embedded.kafka.brokers}
          replicationFactor: ${replication_factor:1}
          autoCreateTopics: true
          autoAddPartitions: true
          configuration:
            retries: 1
            batch.size: 16384
            linger.ms: 1
            enable.idempotence: true
            buffer.memory: 33554432
            request.timeout.ms: 3000
            transaction.timeout.ms: 3000
            max.block.ms: ${kafka_max_block_time:5000}
            max.poll.records: 80
            poll.timeout: 10000
            commit.retries: 1
            commit.retry.interval: 1000
            session.timeout.ms.config: 50000
            shutdown.signal: INT,TERM
            acks: "all"
      bindings:
        output:
          destination: output
          contentType: application/json
          producer:
            partitionCount: ${partition_count:1}
        input:
          destination: input
          contentType: application/json
          partitioned: true
          group: input_group