
I have a Kafka queue with JSON objects. I am filling this queue with a Java-based offline producer. The structure of a JSON object is shown in this example:

{
    "key": "999998",
    "message": "dummy \n Messages \n Line 1 ",
    "type": "app_event",
    "stackTrace": "dummyTraces",
    "tags": "dummyTags"
}

Note the \n characters in the "message" field.

I loaded the queue with a million objects and started Logstash with the following configuration:

input {
    kafka {
        zk_connect => "localhost:2181"
        topic_id => "MemoryTest"
        type => "app_event"
        group_id => "dash_prod"
    }
}

filter {
    if [type] == "app_event" {
        multiline {
            pattern => "^\s"
            what => "previous"
        }
    }
}

output {
    if [type] == "app_event" {
        stdout {
            codec => rubydebug
        }

        elasticsearch {
            host => "localhost"
            protocol => "http"
            port => "9200"
            index => "app_events"
            index_type => "event"
        }
    }
}

The multiline filter is expected to remove the \n characters from the message field. When I start Logstash, I run into two issues:

  1. None of the events are pushed into Elasticsearch. I am getting the error _jsonparsefailure. Also notice that the message of one event 'gobbles up' consecutive events:

    { "message" => "{ \n\t\"key\": \"146982\", \n\t\"message\" : \"dummy \n Messages \n Line 1 \", \n\t\"type\" : \"app_event\", \n\t\"stackTrace\" : \"dummyTraces\", \n\t\"tags\" : \"dummyTags\" \n \t \n}\n{ \n\t\"key\": \"146983\", \n\t\"message\" : \"dummy \n Messages \n Line 1 \", \n\t\"type\" : \"app_event\", \n\t\"stackTrace\" : \"dummyTraces\", \n\t\"tags\" : \"dummyTags\" \n \t \n}\n{ \n\t\"key\": \"146984\", \n\t\"message\" : \"dummy \n Messages \n Line 1 \", \n\t\"type\" : \"app_event\", \n\t\"stackTrace\" : \"dummyTraces\", \n\t\"tags\" : \"dummyTags\" \n \t \n}, "tags" => [ [0] "_jsonparsefailure", 1 "multiline" ], "@version" => "1", "@timestamp" => "2015-09-21T18:38:32.005Z", "type" => "app_event" }

  2. After a few minutes, the available heap memory reached its cap and Logstash stopped.

A memory profile is attached to this question. After 13 minutes, Logstash hit the memory cap and stopped responding.

[Image: Logstash memory profile]

I am trying to understand how to get multiline working for this scenario and what causes the memory crash.

Comments:

Alain Collins: What do you want your 'message' field to become?
Mohitt: I want \n to be removed from the message.

1 Answer


To replace part of a string, use the mutate filter's gsub option:

filter {
  mutate {
    gsub => [
      # replace all forward slashes with underscore
      "fieldname", "/", "_",
      # replace backslashes, question marks, hashes, and minuses
      # with a dot "."
      "fieldname2", "[\\?#-]", "."
    ]
  }
}

multiline is, as you've discovered, for combining several events into one.
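
For your particular case, where the goal is simply to remove the \n characters from the message field of each event, a gsub along these lines should be enough (a minimal sketch, assuming the incoming JSON has already been parsed, for example with a json codec on the kafka input, so that message holds just the string value):

filter {
  if [type] == "app_event" {
    mutate {
      # strip every literal newline from the message field
      gsub => [ "message", "\n", "" ]
    }
  }
}

This does not need the multiline filter at all, which also avoids the behaviour you are seeing where one event swallows the JSON objects that follow it.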