Hi all, we are stuck and could use some help. :-)
We have a three-step log aggregation pipeline using Fluentd:
[#1 - Tail logs (raw logs)] --(TCP)--> [#2 - Parse the read logs into JSON] --(TCP)--> [#3 - Filter and output to Redis & Mongo]
We are NOT converting the tailed logs into JSON in the first step, mainly to avoid any extra CPU consumption on that server. Our log lines are fairly complex, so parsing is intentionally deferred to step #2 (which runs on a different cluster/servers).
So phase #1 emits time, tag & record (the raw log line). We use the in_tail plugin here, so by default the 'time' attribute is the time the record was read from the file. Under load, that read time may not match the log line's actual timestamp.
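For reference, our phase #1 setup is roughly the following (paths, tags, and hostnames below are illustrative placeholders, not our real values; the raw line lands in the 'message' key with format none):

```
# Phase #1: tail raw lines without parsing, forward them downstream
<source>
  type tail
  path /var/log/app/app.log          # placeholder path
  pos_file /var/log/fluentd/app.pos  # placeholder pos file
  tag raw.app
  format none                        # no parsing here, keep CPU cost low
</source>

<match raw.**>
  type forward
  <server>
    host parser-host.example         # placeholder: the phase #2 server
    port 24224
  </server>
</match>
```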
In the 2nd phase, once the log has been parsed into JSON, we want to override the 'time' attribute sent by phase #1 with the time field from the JSON record.
We use fluent-plugin-parser at step #2 (https://github.com/tagomoris/fluent-plugin-parser).
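Our phase #2 config currently looks roughly like this (again, tags and the key name 'message' are illustrative; we are not sure which parameter, if any, makes the plugin adopt the parsed record's own timestamp):

```
# Phase #2: parse the raw line (carried in the 'message' key) into JSON
<match raw.**>
  type parser
  format json
  key_name message      # the field holding the raw log line from phase #1
  remove_prefix raw
  add_prefix parsed
</match>
```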
How can we override the time attribute and make Fluentd use the timestamp from the parsed JSON record instead of the read time from step #1?