
I have the following JSON input that I want to dump to Logstash (and eventually search/dashboard in Elasticsearch/Kibana).

{"vulnerabilities":[
    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},
    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},
    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}
]}

I'm using the following Logstash configuration:

input {
  file {
    path => "/tmp/logdump/*"
    type => "assets"
    codec => "json"
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch { host => localhost }
}

Output:

{
       "message" => "{\"vulnerabilities\":[\r",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.788Z",
          "type" => "assets",
          "host" => "av12612sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}
{
       "message" => "{\"ip\":\"10.1.1.30\",\"dns\":\"z.acme.com\",\"vid\":\"12345\"},\r",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.838Z",
          "type" => "assets",
          "host" => "av12612sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}
{
       "message" => "{\"ip\":\"10.1.1.31\",\"dns\":\"y.acme.com\",\"vid\":\"12345\"},\r",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.870Z",
          "type" => "shellshock",
          "host" => "av1261wag2sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}
{
            "ip" => "10.1.1.32",
           "dns" => "x.acme.com",
           "vid" => "12345",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.884Z",
          "type" => "assets",
          "host" => "av12612sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}

Obviously Logstash is treating each line as an event, so it thinks {"vulnerabilities":[ is an event on its own, and I'm guessing the trailing commas on the two subsequent nodes break the parsing, while the last node appears correct. How do I tell Logstash to parse the events inside the vulnerabilities array and to ignore the commas at the end of the line?
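As a rough Python illustration (not part of the original setup) of why the json codec produces exactly those events: the file input hands the codec one line at a time, and only a line that happens to be valid JSON on its own parses, which is why the last node comes through correctly while the others stay raw in the message field.

```python
import json

# Each line is decoded in isolation, mimicking the per-line json codec.
lines = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
]
for line in lines:
    try:
        print(json.loads(line))        # parses only if the line is complete JSON
    except ValueError:
        print("not valid JSON on its own:", line.strip())
```

Only the third line parses: the first is an unterminated object and the second has a trailing comma, so both fall back to a plain message string, just as in the output above.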

Updated 2014-11-05: Following Magnus's recommendations, I added the json filter and it's working perfectly. However, it would not parse the last line of the JSON correctly without specifying start_position => "beginning" in the file input block. Any ideas why not? I know it reads from the end of the file by default, but I would have anticipated the mutate/gsub would handle this smoothly.

input {
  file {
    path => "/tmp/logdump/*"
    type => "assets"
    start_position => "beginning"
  }
}
filter {
  if [message] =~ /^\[?{"ip":/ {
    mutate {
      gsub => [
        "message", "^\[{", "{",
        "message", "},?\]?$", "}"
      ]
    }
    json {
      source => "message"
      remove_field => ["message"]
    }
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch { host => localhost }
}

1 Answer


You could skip the json codec and use a multiline filter to join the message into a single string that you can feed to the json filter.

filter {
  multiline {
    pattern => '^{"vulnerabilities":\['
    negate => true
    what => "previous"
  }
  json {
    source => "message"
  }
}
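Conceptually (a Python sketch, not part of the answer's config), the multiline filter with negate => true and what => "previous" starts a new event on each line matching the pattern and glues every non-matching line onto the previous event, so the whole file collapses into one message that the json filter can parse:

```python
import json

lines = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
    ']}',
]

events = []
for line in lines:
    if line.startswith('{"vulnerabilities":[') or not events:
        events.append([line])        # pattern matched: start a new event
    else:
        events[-1].append(line)      # negate=true, what=>"previous": join to previous

message = "\n".join(events[0])       # one event containing the whole document
event = json.loads(message)
print(len(event["vulnerabilities"]))  # -> 3
```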

However, this produces the following unwanted results:

{
            "message" => "<omitted for brevity>",
           "@version" => "1",
         "@timestamp" => "2014-10-31T06:48:15.589Z",
               "host" => "name-of-your-host",
               "tags" => [
        [0] "multiline"
    ],
    "vulnerabilities" => [
        [0] {
             "ip" => "10.1.1.1",
            "dns" => "z.acme.com",
            "vid" => "12345"
        },
        [1] {
             "ip" => "10.1.1.2",
            "dns" => "y.acme.com",
            "vid" => "12345"
        },
        [2] {
             "ip" => "10.1.1.3",
            "dns" => "x.acme.com",
            "vid" => "12345"
        }
    ]
}

Unless there's a fixed number of elements in the vulnerabilities array, I don't think there's much we can do with this (without resorting to the ruby filter).

How about just applying the json filter to lines that look like what we want and dropping the rest? Your question doesn't make it clear whether the whole log looks like this, so this may not be so useful.

filter {
  if [message] =~ /^\s+{"ip":/ {
    # Remove trailing commas
    mutate {
      gsub => ["message", ",$", ""]
    }
    json {
      source => "message"
      remove_field => ["message"]
    }
  } else {
    drop {}
  }
}
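A rough Python model of this filter (illustrative only): keep lines that look like an indented vulnerability object, strip the trailing comma, parse as JSON, and drop everything else:

```python
import json
import re

def to_event(line):
    """Mirror the conditional, gsub, and json/drop filters above."""
    if not re.match(r'^\s+{"ip":', line):
        return None                          # equivalent of drop {}
    return json.loads(re.sub(r',$', '', line))

raw = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
    ']}',
]
events = [e for e in (to_event(l) for l in raw) if e is not None]
print([e["ip"] for e in events])  # -> ['10.1.1.1', '10.1.1.3']
```

The array's opening and closing lines never match the `^\s+{"ip":` pattern, so they are dropped rather than producing the junk events seen in the original output.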