0
votes

We have a Spring Cloud processors using spring cloud Functions methodology. the current boot version used is 2.1.4 and cloud version of Greenwich.SR1

Processor skeleton is below

@EnableBinding(Processor.class)
public class FilterProcessor {

    @Bean
    public Function<DeviceEvent, DeviceEvent> filter() {
        return deviceEvent -> {
            // process and return data
        };
    }
}

Following is application yml config

spring:
  cloud:
    stream:
      default:
        content-type: application/*+avro
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      function:
        definition: filter
      kafka:
        binder:
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://schemaregistry:8081
          consumer-properties:
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: http://schemaregistry:8081

But on moving from 2.1.4 to 2.2.4/Hoxton.SR1, made an yml change as below suggested on another ticket

      function:
        definition: filter
        bindings:
          filter-in-0: input
          filter-out-0: output

Everyother details on the app and config remains the same. Also removed @EnableBinding annotation.

But on dropping a message to my processor in stream, i get the following exception


2020-04-05 23:12:45.370 ERROR 462 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = FTV, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1586108023129, serialized key size = -1, serialized value size = 108, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomJavaObject)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused), failedMessage=GenericMessage [payload={"providerCode": "GTB", "customerId": "123", "type": "ASSET", "emitTime": 1585101533015, "captureTime": 1585101533015, "readTime": 1585101533015, "deviceId": "DeviceId", "data": {"battery_voltage": "13", "fmi": "4", "battery_voltage2": "21", "battery_voltage3": "13"}}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@337330a0, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=FTV, kafka_receivedTimestamp=1586108023129, kafka_groupId=fls}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1745) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1734) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]

Caused by: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
        at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:751) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:677) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:452) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:69) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:308) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:125) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:217) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:207) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:620) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_192]
        at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359) ~[na:1.8.0_192]
        at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_192]
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_192]
        at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464) ~[na:1.8.0_192]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:626) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:569) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.apply(BeanFactoryAwareFunctionRegistry.java:465) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:602) ~[spring-cloud-stream-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_192]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_192]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_192]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_192]
        at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:386) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:636) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:629) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:613) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:584) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        ... 34 common frames omitted

Updated Stacktrace of error

Any pointers to make the processor in the latest boot would help, as our custom libraries are getting upgraded and using the latest features, the processors being on old boot is preventing some functionalities in working correctly or not loading up say, when configproperties uses inner class or uses constructor binding.

So any pointers to resolve would be highly appreciated. Just to reiterate, there arent any HTTP calls made in the code, processor is just an in out processor with just log statement.

Stream definition as below

:TopicName > Processor | Sink

Both processor and sink contain only log statements. Processor just returns event it receives.

1
I/O error on POST request for "http://localhost:8990": Connection refused This is unrelated to software versions - it just means there is no HTTP server on port 8990. - Gary Russell
yes, but my stream is a pure kafka stream, why is the processor even trying to make a http post. My stream is :TopicName > Processor | sink. The same stream definition works, when apps deployed are using boot version 2.1.4 and Greenwich sR1. But when i change the versions remove EnableBinding, add the filter mapping to in and out, the same stream is giving out http invoke error, when none of module uses http for communication - Id View
The stack trace appears to be incomplete - there is nothing in the framework that will attempt to reach an HTTP server. - Gary Russell
Thanks for responses. I was able to figure out the issue, the stack trace has a default schema registry client reference. In 2.1.4 i just used annotation @EnableSchemaRegistryClient and it picked up schema registry url from the application yml as mine was running on diff port. So i created a new config and return a confluent schema registry client with approriate url value, now the apps are up and running fine. Thanks for this group to help tide over on these learnings. - Id View

1 Answers

0
votes

Just to summarize for anyone who might face similar issue. When moving from lower version to higher, mine is 2.1.4 (Greenwich.SR1) to 2.2.4 (Hoxton.SR1)

Add function bindings as below, the bindings being <functionName>-<in for input, out for output>-<param sequence number>

      function:
        definition: filter
        bindings:
          filter-in-0: input
          filter-out-0: output

Add schema registry client config. @EnableSchemaRegistry alone doesnt seem to work

   @Bean
    public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}") String endPoint) {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endPoint);
        return client;
    }