I'm writing my own Spring Cloud Stream processor that takes an ID as an input parameter, downloads the corresponding file, and sends it to the output. These files can be as large as 800 MB. The output object is a simple POJO with a String property for the file name and a byte[] property for the content.
When I run my pipeline I get the following exception:
2019-02-05 10:09:27.829 ERROR 1289 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = A.splitter, partition = 0, offset = 2, CreateTime = 1549357804806, serialized key size = -1, serialized value size = 932, headers = RecordHeaders(headers = [RecordHeader(key = sequenceNumber, value = [51]), RecordHeader(key = sequenceSize, value = [52]), RecordHeader(key = deliveryAttempt, value = [49]), RecordHeader(key = scst_nativeHeadersPresent, value = [116, 114, 117, 101]), RecordHeader(key = correlationId, value = [34, 50, 57, 99, 54, 99, 51, 97, 57, 45, 48, 98, 57, 99, 45, 52, 50, 56, 50, 45, 51, 101, 57, 98, 45, 51, 55, 101, 56, 99, 100, 51, 100, 55, 54, 53, 51, 34]), RecordHeader(key = contentType, value = [34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 115, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 115, 99, 115, 116, 95, 110, 97, 116, 105, 118, 101, 72, 101, 97, 100, 101, 114, 115, 80, 114, 101, 115, 101, 110, 116, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 66, 111, 111, 108, 101, 97, 110, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 83, 105, 122, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 100, 101, 108, 105, 118, 101, 114, 121, 65, 116, 116, 101, 109, 112, 116, 34, 58, 34, 106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 99, 111, 110, 99, 117, 114, 114, 101, 110, 116, 46, 97, 116, 111, 109, 105, 99, 46, 65, 116, 111, 109, 105, 99, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 114, 114, 101, 108, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 85, 85, 73, 68, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 
108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@44091bf1)
org.springframework.messaging.MessagingException: Failed to handle Message; nested exception is org.springframework.messaging.MessagingException: Java heap space; nested exception is java.lang.OutOfMemoryError: Java heap space
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:230) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99) ~[spring-messaging-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
at org.springframework.integration.core.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:168) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.3.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.3.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.MessagingException: Java heap space; nested exception is java.lang.OutOfMemoryError: Java heap space
at org.springframework.integration.core.ErrorMessagePublisher.determinePayload(ErrorMessagePublisher.java:186) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
at org.springframework.integration.core.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:162) ~[spring-integration-core-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
... 17 common frames omitted
Caused by: java.lang.OutOfMemoryError: Java heap space
I tried to increase the heap space through the deployer with --properties "deployer.*.memory=2048m" and monitored the memory stats with jvmtop. My Boot apps apparently use the memory settings, but the error is thrown anyway.