
I have a Spring Integration flow that starts with a file inbound channel adapter, which picks up files and passes them through the system as messages. After a few components, the messages are collected at an aggregator, from which they are released either by a release strategy or by a group timeout of 30 seconds. Downstream of the aggregator there are several more components, ending with a final one.

The problem is this: when I send 33 files, which create 33 groups ("buckets") at the aggregator based on their correlation IDs, some of the messages seem to be released twice. I conclude this because a channel interceptor on the "released" channel (the one right after the aggregator) shows a few messages passing through a second time, after they have already completed the downstream processing successfully. This also causes my application to fail to find a file and throw an exception. So it looks as if the same group/correlation ID is somehow being released twice.

I have tried to debug this in many ways. Essentially, I want to know how a group that has been released and has successfully gone through all downstream components in a single thread can be released again.

My question is: how can I debug this? I want to know what is making this message/group reappear in the aggregator. My aggregator is as follows:

<int:aggregator id="bufferedFiles" input-channel="inQueueForStage"
        output-channel="released" expire-groups-upon-completion="true"
        send-partial-result-on-expiry="true" release-strategy="releaseHandler"
        release-strategy-method="canRelease"
        group-timeout-expression="size() > 0 ? T(com.att.datalake.ifr.loader.utils.MessageUtils).getAggregatorTimeout(one, @sourceSnapshot) : -1">
        <int:poller fixed-delay="${files.pickup.delay:3000}"
            max-messages-per-poll="${num.files.pickup.per.poll:10}"
            task-executor="executor" />
    </int:aggregator>

Explanation of the aggregator: the size() > 0 check applies to each correlation group individually. Each of the 33 files I send creates a new group because the correlation ID is derived from the file name, so the aggregator holds 33 groups, each containing a single file. The SpEL expression therefore says: if no release strategy has released the group, release it after the timeout (30 seconds), provided the group contains at least one message.

My Channel inbound adapter is as follows:

<int-file:inbound-channel-adapter id="files" channel="dispatchFiles"
        directory="${source.dir}" scanner="directoryScanner">
        <int:poller fixed-delay="${files.pickup.delay:3000}"
            max-messages-per-poll="${num.files.pickup.per.poll:10}" />
    </int-file:inbound-channel-adapter>

Logs: here is the log of a message completing the flow the first time. The "completion time" entry indicates that it reached the last component, a "completionHandler" service activator (SA).

[Image]

[Image: message_appearing_twice_causing_error]

Explanation of the log: "cor" is the group/correlation ID that is released twice. I get the final exception because the first pass removes the file from its original location and processes it, so when the erroneous second release happens there is nothing left to process. The pictures show that the first release of this group is processed and finished around 11:09, and the second one starts around 11:10.

An important point: I noticed that this behavior only happens when I have a global channel interceptor that does somewhat long-running processing. When this interceptor is commented out, the errors go away.

Question: is it possible for the aggregator to release a group/correlation ID twice under any circumstances? And how can I make the aggregator emit logs?

Thanks

Edit 10:15pm

My channel following the aggregator has an interceptor as follows,

public Message<?> preSend(Message<?> message, MessageChannel channel) {
    LOGGER.info("******** Releasing from aggregator(interceptor) , corrID:{} at time:{} ********",
            MessageUtils.getCorrelationId(message), new Date());
    finalReporter.callback(channel.toString(), message);
    return message;
}

From the aggregator down to the final completionHandler SA, processing is single-threaded: Aggregator -> releasedChannel -> some SA1 -> some channel -> ..... -> completionChannel -> completeSA

When I run with 33 partitions, let's follow corrId = "alh". The first time it is released, it looks like this: [Image: alh first time released]. It shows that thread-5 released it, so thread-5 should carry it through all the downstream components. Instead, the thread leaves it midway and starts doing other things, and the message is picked up again a little later by a different thread: [Image: alh second thread-8]. That seems to be the problem.
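One way to make a second release like this easier to spot than by reading screenshots is to record, per correlation ID, which thread released it first. The following is only a minimal sketch: `ReleaseTracker` and `recordRelease` are hypothetical names, not part of Spring Integration, and the idea would be to call it from the interceptor on the "released" channel with the correlation ID and `Thread.currentThread().getName()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring Integration class): remembers which thread
// first released each correlation ID so that a later release can be flagged.
class ReleaseTracker {

    // correlation ID -> name of the thread that released it first
    private final Map<String, String> firstReleaseThread = new ConcurrentHashMap<>();

    /**
     * Records a release. Returns null on the first release of this correlation ID,
     * otherwise the name of the thread that released it first.
     */
    String recordRelease(String correlationId, String threadName) {
        return firstReleaseThread.putIfAbsent(correlationId, threadName);
    }

    public static void main(String[] args) {
        ReleaseTracker tracker = new ReleaseTracker();
        System.out.println(tracker.recordRelease("alh", "thread-5")); // prints "null"
        System.out.println(tracker.recordRelease("alh", "thread-8")); // prints "thread-5"
    }
}
```

Because this flow deliberately allows a later file to reuse a correlation ID, a non-null result is not necessarily an error; keying on the correlation ID plus the file name may be a better fit.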

Solution update: I did the following to work around the problem for now (the third item is still an open question):

  1. For some reason, my interceptors were doing return super.preSend(message, channel) instead of simply return message. I changed them to the latter.

  2. I had global channel interceptors. I removed the global ones and kept the individual ones.

  3. If a channel interceptor ran into a problem before returning, would that cause a new release?

Although I still see the scenario shown in the pictures above, I am no longer getting double processing attempts, which avoids the errors. I am still trying to make sense of this.

I understand this is very specific and difficult to explain; thanks all the same for your time and comments.

Please share the exception and the <inbound-channel-adapter> configuration; logs would be good too. By logs I mean log4j.category.org.springframework.integration=DEBUG. And please explain the aggregator configuration. It isn't clear: you say that you send 33 files as groups, but at the same time you check size() > ... - Artem Bilan
Note that expire-groups-upon-completion="true" means that a new message with the same correlation id will form a new group; false means new messages with an old correlation id will be discarded. But yes, a full DEBUG log showing the problem will be useful. - Gary Russell
Thank you for such a comprehensive explanation, but your business logs don't tell me anything. I need to see how the messages travel through the channels; then I can track the correlationId. Don't you think that you are building different groups for the same correlationId? - Artem Bilan
Thanks @GaryRussell, but I do want a new message with the same correlation ID to form a new group. For example, if I form a group with corr-id = abc from file abc-1.txt and then get a new file, abc-2.txt, I do want a new group with the same corr_id=abc to be formed. That is why expire-groups-upon-completion="true" is required for me. - ameet chaubal

2 Answers

0
votes

Yes, I think @GaryRussell is right: since you use expire-groups-upon-completion="true", some partial groups may be released by the group-timeout-expression, and new messages with the same correlationId will then form a new group, which is released by the next group timeout. Your size() > 0 condition isn't helpful either: it means a partial group is released after the group timeout. Maybe size() > 1? A group in your configuration can never have size() == 0, because it is created on the first message; if the group exists, it contains at least one message. A group can be empty only when the aggregator is configured with expire-groups-upon-completion="false": in that case the released group is kept, marked as completed, and does not accept new messages.
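To make the difference between the two settings concrete, here is a toy model in plain Java. It only imitates the semantics described above and is not how Spring Integration implements its aggregator or message store; `ToyAggregator`, `add`, `timeout` and `releaseCount` are made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: imitates the expire-groups-upon-completion semantics,
// it is NOT the Spring Integration implementation.
class ToyAggregator {

    private final boolean expireGroupsUponCompletion;
    private final Map<String, List<String>> openGroups = new HashMap<>();
    private final Set<String> completedIds = new HashSet<>();
    private final List<List<String>> released = new ArrayList<>();

    ToyAggregator(boolean expireGroupsUponCompletion) {
        this.expireGroupsUponCompletion = expireGroupsUponCompletion;
    }

    /** Adds a message; returns false if it was discarded as a late arrival. */
    boolean add(String correlationId, String payload) {
        if (completedIds.contains(correlationId)) {
            return false; // group was completed and kept as a marker
        }
        openGroups.computeIfAbsent(correlationId, id -> new ArrayList<>()).add(payload);
        return true;
    }

    /** Simulates the group timeout firing: releases whatever the group holds. */
    void timeout(String correlationId) {
        List<String> group = openGroups.remove(correlationId);
        if (group == null) {
            return; // nothing to release
        }
        released.add(group);
        if (!expireGroupsUponCompletion) {
            completedIds.add(correlationId); // remember the completed group
        }
    }

    int releaseCount() {
        return released.size();
    }

    public static void main(String[] args) {
        ToyAggregator expiring = new ToyAggregator(true);
        expiring.add("abc", "abc-1.txt");
        expiring.timeout("abc");
        expiring.add("abc", "abc-2.txt"); // forms a new group
        expiring.timeout("abc");
        System.out.println(expiring.releaseCount()); // prints 2

        ToyAggregator keeping = new ToyAggregator(false);
        keeping.add("abc", "abc-1.txt");
        keeping.timeout("abc");
        System.out.println(keeping.add("abc", "abc-2.txt")); // prints false
        System.out.println(keeping.releaseCount()); // prints 1
    }
}
```

In this model, a second release for the same correlation ID under the "true" setting needs a second message with that ID to arrive, so a DEBUG log showing where that second message comes from would be the thing to look for.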

0
votes

After struggling with debugging and trying various scenarios blindly, I believe I have at least a workaround and a possible root cause. I will outline everything I changed.

Root Cause:

My interceptors were calling a common class with a common callback method. Based on the name of the channel the request came from, this method decided which action to take. The actions were essentially collecting data, incrementing counters, and persisting some information to a database. It seems that some of these actions were failing, and that as a consequence the thread was dying and the message was released again. I am not entirely sure about this, so please correct me if that is not the case. But after I fixed those errors, the re-release issue seems to have subsided or vanished altogether. It was hard to diagnose because I could not see the errors thrown during the callback invocations; maybe I was catching them, or maybe they were lost.
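Since the errors in the callback were invisible, one option is to wrap the callback so that any failure is logged with its stack trace and does not propagate out of the interceptor. This is a sketch under assumptions: `SafeCallback` and `invoke` are hypothetical names, the callback is assumed to take a channel name and a message as in the preSend shown in the question, and java.util.logging stands in for whatever logger the application uses.

```java
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: runs a reporting callback and logs any failure
// instead of letting it escape or disappear silently.
class SafeCallback {

    private static final Logger LOGGER = Logger.getLogger(SafeCallback.class.getName());

    /** Runs the callback; returns true on success, false if it threw. */
    static <M> boolean invoke(BiConsumer<String, M> callback, String channelName, M message) {
        try {
            callback.accept(channelName, message);
            return true;
        } catch (RuntimeException e) {
            // Log with the stack trace so the failure is visible
            LOGGER.log(Level.SEVERE, "Reporting callback failed on channel " + channelName, e);
            return false;
        }
    }
}
```

Inside preSend, the call might then look like SafeCallback.invoke(finalReporter::callback, channel.toString(), message) followed by return message. Whether a failed reporting callback should be swallowed like this or should stop the message is a design decision; the sketch only makes the failure visible.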

I also found that the issue only affected channel interceptors after the aggregator. Interceptors before the aggregator did not cause any problems, maybe because they were simpler.

To debug, I removed the interceptors and made the callback directly from the individual components (SAs), removed the global interceptors, and added individual interceptors for specific channels.

Thanks for all the help.