4 votes

Guys... we are stuck... bail us out! :-)

We have a 3-step log aggregation pipeline using Fluentd.

[#1 - Tail logs (raw logs)] --(TCP)--> [#2 - Parse the read logs into JSON] --(TCP)--> [#3 - Filter and output to Redis & Mongo]

We are NOT converting the tailed logs into JSON in the first step, mainly because we want to avoid any extra CPU consumption on that server. Our log lines are fairly complex, so parsing is intentionally deferred to step #2 (on a different cluster/servers).

So step #1 emits: time, tag & record (the raw log line). We use the in_tail plugin here, so by default the 'time' attribute is the time the record was read from the file. Under load, that read time may not match the log line's actual timestamp.
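For reference, step #1 looks roughly like this (the path, tag, host and port below are placeholders, and the regexp just captures the whole raw line into a single 'message' field):

<source>
  type tail
  # placeholder path and pos_file
  path /var/log/app/app.log
  pos_file /var/log/fluentd/app.log.pos
  tag raw.app
  # no real parsing here: capture the entire line as-is into 'message'
  format /^(?<message>.*)$/
</source>

<match raw.**>
  # ship the raw records over TCP to the step #2 cluster
  type forward
  <server>
    # placeholder host/port for the step #2 servers
    host step2.example.internal
    port 24224
  </server>
</match>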

The JSON parsing is deferred to step #2.

In step #2, once we have the log converted to JSON, we want to override the 'time' attribute sent by step #1 with the time attribute from the JSON record.

We use Fluent-Plugin-Parser at step #2 (https://github.com/tagomoris/fluent-plugin-parser).
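Our current step #2 config is roughly this (tags and port are placeholders again); it parses the JSON out of the 'message' field, but the event time is still the read-time coming from step #1:

<source>
  # receive the raw records forwarded from step #1
  type forward
  port 24224
</source>

<match raw.**>
  # fluent-plugin-parser: parse the raw line in 'message' as JSON
  type parser
  format json
  key_name message
  tag parsed.app
</match>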

How can we override the time attribute and make Fluentd use it instead of the 'time' that was read in step #1?


1 Answer

15 votes

Yes, you can do this with fluent-plugin-parser's undocumented "time_key" feature, like this:

<source>
  type exec
  run_interval 3s
  format json
  command echo '{"message":"hello,2013-03-03 12:00:13"}'
  tag first
</source>

<match first>
  type parser
  key_name message
  time_key my_time
  time_format %Y-%m-%d %H:%M:%S
  format /^(?<some_field>[^,]*),(?<my_time>.*)/
  tag second
</match>

<match second>
  type stdout
</match>

What the above code snippet does is:

  1. The exec source generates the message {"message":"hello,2013-03-03 12:00:13"} every 3 seconds with the tag "first". This is just for testing purposes.
  2. It is matched against <match first>. Then, the parser plugin parses the field called "message" with the regular expression. In your case, it would be format json (see the sketch at the end of this answer).
  3. time_key my_time tells the parser plugin to look for a field called "my_time" inside the parsed value of the "message" field and, if it exists, to parse it with time_format %Y-%m-%d %H:%M:%S. From this point on, this is the new event time.
  4. Finally, I output to stdout.

If you run the above conf, you should get an output like this:

root@ae4a398d41ef:/home/fluentd# fluentd -c fluent.conf
2014-05-31 00:01:19 +0000 [info]: starting fluentd-0.10.46
2014-05-31 00:01:19 +0000 [info]: reading config file path="fluent.conf"
2014-05-31 00:01:19 +0000 [info]: gem 'fluent-plugin-parser' version '0.3.4'
2014-05-31 00:01:19 +0000 [info]: gem 'fluentd' version '0.10.46'
2014-05-31 00:01:19 +0000 [info]: using configuration file: <ROOT>
  <source>
    type exec
    run_interval 3s
    format json
    command echo '{"message":"hello,2013-03-03 12:00:13"}'
    tag first
  </source>
  <match first>
    type parser
    key_name message
    time_key my_time
    time_format %Y-%m-%d %H:%M:%S
    format /^(?<some_field>[^,]*),(?<my_time>.*)/
    tag second
  </match>
  <match second>
    type stdout
  </match>
</ROOT>
2014-05-31 00:01:19 +0000 [info]: adding source type="exec"
2014-05-31 00:01:19 +0000 [info]: adding match pattern="first" type="parser"
2014-05-31 00:01:19 +0000 [info]: adding match pattern="second" type="stdout"
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
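Adapted to your setup (format json instead of the regexp), a sketch of step #2 could look like the following. The tags, port, the field name "log_time" and its time_format are placeholders for whatever your JSON records actually contain, and I haven't tested time_key together with format json, so treat this as a starting point rather than a drop-in config:

<source>
  # receive the raw records forwarded from step #1
  type forward
  port 24224
</source>

<match raw.**>
  type parser
  format json
  key_name message
  # take the event time from the timestamp field inside the parsed JSON
  time_key log_time
  time_format %Y-%m-%d %H:%M:%S
  tag parsed.app
</match>

# a <match parsed.**> section would then do your filtering and output to Redis & Mongo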