We have a Spring Cloud processors using spring cloud Functions methodology. the current boot version used is 2.1.4 and cloud version of Greenwich.SR1
Processor skeleton is below
@EnableBinding(Processor.class)
public class FilterProcessor {
@Bean
public Function<DeviceEvent, DeviceEvent> filter() {
return deviceEvent -> {
// process and return data
};
}
}
Following is application yml config
spring:
cloud:
stream:
default:
content-type: application/*+avro
producer:
useNativeEncoding: true
consumer:
useNativeEncoding: true
function:
definition: filter
kafka:
binder:
producer-properties:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://schemaregistry:8081
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
schema.registry.url: http://schemaregistry:8081
But on moving from 2.1.4 to 2.2.4/Hoxton.SR1, made an yml change as below suggested on another ticket
function:
definition: filter
bindings:
filter-in-0: input
filter-out-0: output
Everyother details on the app and config remains the same. Also removed @EnableBinding annotation.
But on dropping a message to my processor in stream, i get the following exception
2020-04-05 23:12:45.370 ERROR 462 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = FTV, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1586108023129, serialized key size = -1, serialized value size = 108, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomJavaObject)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused), failedMessage=GenericMessage [payload={"providerCode": "GTB", "customerId": "123", "type": "ASSET", "emitTime": 1585101533015, "captureTime": 1585101533015, "readTime": 1585101533015, "deviceId": "DeviceId", "data": {"battery_voltage": "13", "fmi": "4", "battery_voltage2": "21", "battery_voltage3": "13"}}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@337330a0, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=FTV, kafka_receivedTimestamp=1586108023129, kafka_groupId=fls}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1745) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1734) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
Caused by: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:751) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:677) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:452) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:69) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:308) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:125) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:217) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:207) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:620) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_192]
at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359) ~[na:1.8.0_192]
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_192]
at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_192]
at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464) ~[na:1.8.0_192]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:626) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:569) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.apply(BeanFactoryAwareFunctionRegistry.java:465) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:602) ~[spring-cloud-stream-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_192]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_192]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_192]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_192]
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:386) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:636) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:629) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:613) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:584) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
... 34 common frames omitted
Updated Stacktrace of error
Any pointers to make the processor in the latest boot would help, as our custom libraries are getting upgraded and using the latest features, the processors being on old boot is preventing some functionalities in working correctly or not loading up say, when configproperties uses inner class or uses constructor binding.
So any pointers to resolve would be highly appreciated. Just to reiterate, there arent any HTTP calls made in the code, processor is just an in out processor with just log statement.
Stream definition as below
:TopicName > Processor | Sink
Both processor and sink contain only log statements. Processor just returns event it receives.
I/O error on POST request for "http://localhost:8990": Connection refusedThis is unrelated to software versions - it just means there is no HTTP server on port 8990. - Gary Russell