0
votes

Any ideas would be appreciated, I am trying write test as;

@ExtendWith(SpringExtension.class)
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "input", "output" }, brokerProperties = { "broker.id=2",
    "listeners=PLAINTEXT://127.0.0.1:9092" })
class BindingTest {

@Autowired
private ApplicationContext applicationContext;

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@Autowired
private CustomBindings cBindings;

/**
 * @throws java.lang.Exception
 */
@BeforeEach
void setUp() throws Exception {
}

/**
 * @throws java.lang.Exception
 */
@AfterEach
void tearDown() throws Exception {
    embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
}

@Test
void test0() {
    String KEY = "KEY";
    String testMessage = "TESTMESSAGE";
    Message<String> message = MessageBuilder.withPayload(testMessage)
            .setHeader(KafkaHeaders.MESSAGE_KEY, KEY).build();
    cBindings.output().send(message);

}

@SpringBootApplication
@EnableBinding(CustomBindings.class)
public static class BindingApplication {

}

}

and

spring.cloud.stream.bindings.output.destination=output spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.bindings.output.producer.header-mode=raw spring.cloud.stream.bindings.output.producer.use-native-encoding=true

spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer

Still gets

error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@71f0806b]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer, failedMessage=GenericMessage [payload=byte[8], headers={id=c91897bc-2e4e-0a74-bc05-17fb31b690f6, kafka_messageKey=KEY, contentType=application/json, timestamp=1542295539746}]

Which doesnt make sense to me

1
I believe you need to set useNativeDecoding to true since you're choosing to use native Kafka serde. See more hereOleg Zhurakousky

1 Answers

3
votes

From the looks of it, this is not a Kafka Streams application, but a regular Spring Cloud Stream application with the Kafka binder. Therefore, you don't need these two properties. spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer

In addition, in order to fix your error, you need to remove this line from your config: spring.cloud.stream.bindings.output.producer.use-native-encoding=true.

By setting native encoding to true, you are asking Kafka to do the serialization which is going to rely on the default ByteArraySerializer. If you really intended native serialization, you need to set the appropriate value serializer (StringSerializer). But since this is a test, I suggest you remove this property and see if your test passes.