
I'm trying to consume Confluent Avro messages from a Kafka topic as a KStream with Spring Boot 2.0.

I was able to consume the message as a MessageChannel, but not as a KStream.
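For reference, the MessageChannel-based listener that works looks roughly like this (a simplified sketch; the method name is illustrative):

@StreamListener(KstreamBinding.ORGANIZATION)
public void handleOrganization(Organization organization) {
    // Payload-based consumption over a MessageChannel binding works fine.
    log.info("Organization Received:" + organization);
}

And here are the KStream binding and listener that do not work: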

@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();


@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}

Exception:

Exception in thread "pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3" org.apache.kafka.streams.errors.StreamsException: stream-thread [pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:859)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:134)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
    ... 3 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:855)
    ... 19 more

Based on the error, I think I'm missing the schema.registry.url configuration for Confluent. I had a quick look at the sample here, but I'm a bit lost on how to do the same with Spring Cloud Stream using @StreamListener.

Does this need to be a separate configuration, or is there a way to set the schema.registry.url that Confluent is looking for in application.yml itself?

Here is the code repo: https://github.com/naveenpop/springboot-kstream-confluent

Organization.avsc

{
   "namespace":"com.test.demo.avro",
   "type":"record",
   "name":"Organization",
   "fields":[
      {
         "name":"orgId",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgName",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgType",
         "type":"string",
         "default":"null"
      },
      {
         "name":"parentOrgId",
         "type":"string",
         "default":"null"
      }
   ]
}

DemokstreamApplication.java

@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    @Component
    public static class OrganizationProducer implements ApplicationRunner {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("Starting: Run method");
            List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
            List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");
            Runnable runnable = () -> {
                String rPage = pages.get(new Random().nextInt(pages.size()));
                String rName = names.get(new Random().nextInt(names.size()));

                try {
                    this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
                } catch (Exception e) {
                    log.info("Exception :" +e);
                }
            };
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
        }
    }
}

KafkaConfig.java

@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }

}

KafkaConsumer.java

@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {

    @StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
        organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
    }
}

KafkaProducer.java

@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaProducer {

    @Autowired
    private KstreamBinding kstreamBinding;

    public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {

        try {
            Organization organization = Organization.newBuilder()
                    .setOrgId(orgId)
                    .setOrgName(orgName)
                    .setOrgType(orgType)
                    .setParentOrgId(parentOrgId)
                    .build();

            kstreamBinding.organizationOutputMessageChannel()
                            .send(MessageBuilder.withPayload(organization)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
                            .build());

        } catch (Exception e){
            log.error("Failed to produce Organization Message: " + e);
        }
    }
}

KstreamBinding.java

public interface KstreamBinding {

    String ORGANIZATION_INPUT = "organizationInput";
    String ORGANIZATION_OUTPUT = "organizationOutput";

    @Input(ORGANIZATION_INPUT)
    KStream<String, Organization> organizationInputMessageChannel();

    @Output(ORGANIZATION_OUTPUT)
    MessageChannel organizationOutputMessageChannel();
}

Update 1:

I applied the suggestion from dturanski here and the error vanished. However, I'm still not able to consume the message as KStream<String, Organization>, and there is no error in the console.

Update 2:

I applied the suggestion from sobychacko here, and the message is now consumable, but the values in the object are empty.

I've made a commit to the GitHub sample to produce the message from Spring Boot itself, and I'm still getting empty values.

Thanks for your time on this issue.


2 Answers

2 votes

The following implementation will not do what you are intending:

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}

That log statement is only invoked once, during the bootstrap phase. For this to work, you need to invoke an operation on the received KStream and provide your logic there. For example, the following works, where I pass a lambda expression to the foreach method call:

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    organization.foreach((s, organization1) -> log.info("Organization Received:" + organization1));
}

You also have an issue in the configuration: you are wrongly assigning the Avro Serde for keys, when the key is actually a String. Change it like this:

default:
  key:
    serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  value:
    serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

With these changes, I get the logging statement each time I send something to the topic. However, there is a problem in your sending Groovy script: I am not getting any actual data in your Organization domain object, but I will let you figure that out.

Update on the issue with the empty Organization domain object

This happens because you have mixed serialization strategies going on. On the producer side you are using Spring Cloud Stream's Avro message converters, but on the Kafka Streams processor you are using the Confluent Avro Serdes. I just tried with Confluent's serializers all the way from producer to processor, and I was able to see the Organization domain object on the outbound. Here is the modified configuration that makes the serialization consistent:

spring:
  application:
    name: kstream
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      schema:
        avro:
          schema-locations: classpath:avro/Organization.avsc
      bindings:
        organizationInput:
          destination: organization-updates
          group: demokstream.org
          consumer:
            useNativeDecoding: true
        organizationOutput:
          destination: organization-updates
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          organizationOutput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
        streams:
          binder:
            brokers: localhost
            configuration:
              schema.registry.url: http://localhost:8081
              commit:
                interval:
                  ms: 1000
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

You can also remove the KafkaConfig class as well as the EnableSchemaRegistryClient annotation from the main application class.
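With those removals, the main class reduces to something like this (a minimal sketch based on the code above; the producer runner stays as-is):

@SpringBootApplication
@Slf4j
public class DemokstreamApplication {

    // @EnableSchemaRegistryClient and the KafkaConfig bean are gone; the
    // Confluent serializers get schema.registry.url directly from the
    // binder/producer configuration shown above.
    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }
}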

1 vote

Try spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: ...
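In application.yml form, that property would sit here (a sketch; the endpoint value is taken from the configuration in the other answer):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              schema.registry.url: http://localhost:8081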