I want to run interactive queries against my Kafka Streams topic.
At the moment I can send Avro-serialized JSON objects to my topic and read them back with the Avro deserializer. For this scenario I use the normal MessageChannel binder, and it works as intended.
Now I want to use the Kafka Streams binder, and I can't get it to work. Maybe someone can help me out here.
My Configuration:
spring:
  cloud:
    bus:
      enabled: true
    stream:
      schemaRegistryClient.endpoint: http://192.168.99.100:8081
      bindings:
        segments-in:
          destination: segments
          contentType: application/vnd.segments-value.v1+avro
        segments-all:
          destination: segments
          group: segments-all
          consumer:
            headerMode: raw
            useNativeDecoding: true
      kafka:
        binder:
          zkNodes: 192.168.99.100:2181
          brokers: 192.168.99.100:32768
        streams:
          bindings:
            segments-all:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Kafka Config Class:
@Configuration
public class KafkaConfiguration {

    @Bean
    public MessageConverter classificationMessageConverter() {
        AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter();
        converter.setSchema(Segment.SCHEMA$);
        return converter;
    }
}
Schema Config Class:
@Configuration
public class SchemaRegistryConfiguration {

    @Bean
    public SchemaRegistryClient schemaRegistryClient(
            @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") final String endpoint) {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }
}
And now my interface:
public interface Channels {

    String EVENTS = "segments-in";
    String ALLSEGMENTS = "segments-all";

    @Input(Channels.EVENTS)
    SubscribableChannel events();

    @Input(Channels.ALLSEGMENTS)
    KTable<?, ?> segmentsIn();
}
I always get the following error (logged as a WARN message), but only when the second channel, segmentsIn(), is declared:
org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-3] Connection to node -1 could not be established. Broker may not be available.
With the SubscribableChannel (segments-in) everything works fine. What am I doing wrong here? How can I get the segments-all channel to work with the Kafka Streams API?
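A possible lead, stated as an assumption and not verified against this setup: the Kafka Streams binder reads its connection settings from its own prefix, spring.cloud.stream.kafka.streams.binder, rather than from spring.cloud.stream.kafka.binder. If that applies here, the streams AdminClient would fall back to the default broker address (localhost), which would be consistent with the "node -1" warning appearing only when the KTable binding is present. A sketch of the extra block, reusing the broker address from the configuration above:

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            # Assumption: same broker as spring.cloud.stream.kafka.binder.brokers
            brokers: 192.168.99.100:32768
```

This would sit alongside the existing kafka.binder block, so the MessageChannel binding (segments-in) keeps its current settings.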