I'm writing my own Spring Cloud Stream processor that takes an ID as its input parameter, downloads the corresponding file, and sends it to the output. These files can be as large as 800 MB. The output object is a simple POJO with a String property for the file name and a byte[] property for the content.

When I run my pipeline I get the following exception:

2019-02-05 10:09:27.829 ERROR 1289 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = A.splitter, partition = 0, offset = 2, CreateTime = 1549357804806, serialized key size = -1, serialized value size = 932, headers = RecordHeaders(headers = [RecordHeader(key = sequenceNumber, value = [51]), RecordHeader(key = sequenceSize, value = [52]), RecordHeader(key = deliveryAttempt, value = [49]), RecordHeader(key = scst_nativeHeadersPresent, value = [116, 114, 117, 101]), RecordHeader(key = correlationId, value = [34, 50, 57, 99, 54, 99, 51, 97, 57, 45, 48, 98, 57, 99, 45, 52, 50, 56, 50, 45, 51, 101, 57, 98, 45, 51, 55, 101, 56, 99, 100, 51, 100, 55, 54, 53, 51, 34]), RecordHeader(key = contentType, value = [34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 115, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 115, 99, 115, 116, 95, 110, 97, 116, 105, 118, 101, 72, 101, 97, 100, 101, 114, 115, 80, 114, 101, 115, 101, 110, 116, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 66, 111, 111, 108, 101, 97, 110, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 83, 105, 122, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 100, 101, 108, 105, 118, 101, 114, 121, 65, 116, 116, 101, 109, 112, 116, 34, 58, 34, 106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 99, 111, 110, 99, 117, 114, 114, 101, 110, 116, 46, 97, 116, 111, 109, 105, 99, 46, 65, 116, 111, 109, 105, 99, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 114, 114, 101, 108, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 85, 85, 73, 68, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 
108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@44091bf1)

org.springframework.messaging.MessagingException: Failed to handle Message; nested exception is org.springframework.messaging.MessagingException: Java heap space; nested exception is java.lang.OutOfMemoryError: Java heap space
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:230) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
    at org.springframework.integration.core.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:168) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.3.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.3.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.MessagingException: Java heap space; nested exception is java.lang.OutOfMemoryError: Java heap space
    at org.springframework.integration.core.ErrorMessagePublisher.determinePayload(ErrorMessagePublisher.java:186) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.integration.core.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:162) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    ... 17 common frames omitted
Caused by: java.lang.OutOfMemoryError: Java heap space

I tried to increase the deployer heap space with --properties "deployer.*.memory=2048m" and monitored the memory stats with jvmtop. My Boot apps appear to pick up the memory setting, but the error is still thrown.

1 Answer

It is unclear what you're doing in your custom application, but the error appears to be related to the file size and the business logic.

SCDF attempted to deploy the app, and the memory override appears to have been applied successfully as well. SCDF is merely a lightweight orchestration service: it doesn't keep track of application state, nor does it contribute to an application's memory footprint. In other words, the reported problem lies outside of SCDF and SCSt.

You may want to review the application's business logic. A good first step is to run the application standalone via java -jar. Get it working in this mode first; if it runs locally, it should also run when deployed via SCDF.
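As a rough way to sanity-check the heap while running standalone, here is a minimal sketch. It assumes the output POJO is converted to JSON by Jackson, which writes a byte[] as Base64 text; whether that applies depends on the content type configured for your output binding. The class and method names (HeapEstimate, base64Length, estimatedPeakBytes) are hypothetical, and the estimate ignores any further copies made by the framework or the Kafka producer, so treat it as a lower bound.

```java
/** Rough lower-bound heap estimate for a message that carries a file as byte[]. */
final class HeapEstimate {

    /** Length of padded Base64 text for the given number of raw bytes: 4 * ceil(n / 3). */
    static long base64Length(long rawBytes) {
        return 4 * ((rawBytes + 2) / 3);
    }

    /**
     * Assumption: the raw byte[] and one Base64-encoded copy are on the heap
     * at the same time while the payload is serialized.
     */
    static long estimatedPeakBytes(long rawBytes) {
        return rawBytes + base64Length(rawBytes);
    }

    public static void main(String[] args) {
        long fileBytes = 800L * 1024 * 1024;              // 800 MB, the size mentioned in the question
        long maxHeap = Runtime.getRuntime().maxMemory();  // effective heap limit of this JVM
        long needed = estimatedPeakBytes(fileBytes);

        System.out.printf("max heap:       %d MB%n", maxHeap / (1024 * 1024));
        System.out.printf("estimated need: %d MB%n", needed / (1024 * 1024));
        System.out.println(needed < maxHeap
                ? "payload may fit, other allocations permitting"
                : "payload alone likely exceeds the heap");
    }
}
```

Running this with the same heap settings as the app (for example with -Xmx set to the value you expect the deployer to apply) shows whether the payload alone could exhaust the heap. If it can, passing a file reference in the message instead of the full content may be worth considering.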