
I noticed that the @timestamp field, which is correctly set by Filebeat, is automatically changed by Logstash: its value is replaced with the timestamp taken from the log line (the field name is a_timeStamp). Here is part of the Logstash debug log:

[2017-07-18T11:55:03,598][DEBUG][logstash.pipeline ] filter received {"event"=>{"@timestamp"=>2017-07-18T09:54:53.507Z, "offset"=>498, "@version"=>"1", "input_type"=>"log", "beat"=>{"hostname"=>"centos-ea", "name"=>"filebeat_shipper_kp", "version"=>"5.5.0"}, "host"=>"centos-ea", "source"=>"/home/elastic/ELASTIC_NEW/log_bw/test.log", "message"=>"2017-06-05 19:02:46.779 INFO [bwEngThread:In-Memory Process Worker-4] psg.logger - a_applicationName=\"PieceProxy\", a_processName=\"piece.PieceProxy\", a_jobId=\"bw0a10ao\", a_processInstanceId=\"bw0a10ao\", a_level=\"Info\", a_phase=\"ProcessStart\", a_activityName=\"SetAndLog\", a_timeStamp=\"2017-06-05T19:02:46.779\", a_sessionId=\"\", a_sender=\"PCS\", a_cruid=\"37d7e225-bbe5-425b-8abc-f4b44a5a1560\", a_MachineCode=\"CFDM7757\", a_correlationId=\"fa10f\", a_trackingId=\"9d3b8\", a_message=\"START=piece.PieceProxy\"", "type"=>"log", "tags"=>["beats_input_codec_plain_applied"]}}

[2017-07-18T11:55:03,629][DEBUG][logstash.pipeline ] output received {"event"=>{"a_message"=>"START=piece.PieceProxy", "log"=>"INFO ", "bwthread"=>"[bwEngThread:In-Memory Process Worker-4]", "logger"=>"psg.logger ", "a_correlationId"=>"fa10f", "source"=>"/home/elastic/ELASTIC_NEW/log_bw/test.log", "a_trackingId"=>"9d3b8", "type"=>"log", "a_sessionId"=>"\"\"", "a_sender"=>"PCS", "@version"=>"1", "beat"=>{"hostname"=>"centos-ea", "name"=>"filebeat_shipper_kp", "version"=>"5.5.0"}, "host"=>"centos-ea", "a_level"=>"Info", "a_processName"=>"piece.PieceProxy", "a_cruid"=>"37d7e225-bbe5-425b-8abc-f4b44a5a1560", "a_activityName"=>"SetAndLog", "offset"=>498, "a_MachineCode"=>"CFDM7757", "input_type"=>"log", "message"=>"2017-06-05 19:02:46.779 INFO [bwEngThread:In-Memory Process Worker-4] psg.logger - a_applicationName=\"PieceProxy\", a_processName=\"piece.PieceProxy\", a_jobId=\"bw0a10ao\", a_processInstanceId=\"bw0a10ao\", a_level=\"Info\", a_phase=\"ProcessStart\", a_activityName=\"SetAndLog\", a_timeStamp=\"2017-06-05T19:02:46.779\", a_sessionId=\"\", a_sender=\"PCS\", a_cruid=\"37d7e225-bbe5-425b-8abc-f4b44a5a1560\", a_MachineCode=\"CFDM7757\", a_correlationId=\"fa10f\", a_trackingId=\"9d3b8\", a_message=\"START=piece.PieceProxy\"", "a_phase"=>"ProcessStart", "tags"=>["beats_input_codec_plain_applied", "_dateparsefailure", "kv_ok", "taskStarted"], "a_processInstanceId"=>"bw0a10ao", "@timestamp"=>2017-06-05T17:02:46.779Z, "my_index"=>"bw_logs", "a_timeStamp"=>"2017-06-05T19:02:46.779", "a_jobId"=>"bw0a10ao", "a_applicationName"=>"PieceProxy", "TMS"=>"2017-06-05 19:02:46.779"}}

NB:

  1. I noticed that this doesn't happen with a simple pipeline, i.e. without the grok, kv and other plugins I use in my custom pipeline (see the sketch after this list).
  2. I changed Filebeat's json.overwrite_keys property to true, but with no success.
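
For reference, by "simple pipeline" I mean something along these lines (a minimal sketch, using the same port and Elasticsearch host as my real pipeline, with all filters stripped out):

input {
    beats {
        port => "5043"
    }
}
# no filter section: no grok, kv, json, date or elapsed plugins
output {
    elasticsearch {
        hosts => ["localhost:9200"]
    }
    stdout { codec => rubydebug }
}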

Can you explain why @timestamp is being changed, and what exactly happens? I don't expect it to be changed automatically (I have seen many posts from people asking how to do exactly that), because @timestamp is a system field. What's wrong here?

Here is my pipeline:

input { 
   beats {
        port => "5043"
        type => json
    }
}
filter {    
      #date {
      #  match => [ "@timestamp", "ISO8601" ]
      #  target => "@timestamp"
      #}

    if "log_bw" in [source] {
                grok {
                    patterns_dir => ["/home/elastic/ELASTIC_NEW/logstash-5.5.0/config/patterns/extrapatterns"]
                    match => { "message" => "%{CUSTOM_TMS:TMS}\s*%{CUSTOM_LOGLEVEL:log}\s*%{CUSTOM_THREAD:bwthread}\s*%{CUSTOM_LOGGER:logger}-%{CUSTOM_TEXT:text}" }    
                    tag_on_failure => ["no_match"]
                }

                if "no_match" not in [tags] {

                    if "Payload for Request is" in [text] {

                        mutate {
                            add_field => { "my_index" => "json_request" }
                        }                                       

                        grok {
                            patterns_dir => ["/home/elastic/ELASTIC_NEW/logstash-5.5.0/config/patterns/extrapatterns"]
                            match => { "text" => "%{CUSTOM_JSON:json_message}" }
                        }

                        json {
                            source => "json_message"
                            tag_on_failure => ["errore_parser_json"]
                            target => "json_request"
                        }

                        mutate {
                            remove_field => [ "json_message", "text" ]
                        }
                    }
                    else {

                        mutate {
                            add_field => { "my_index" => "bw_logs" }
                        }

                        kv {
                            source => "text"
                            trim_key => "\s"
                            field_split => ","
                            add_tag => [ "kv_ok" ]
                        }

                        if "kv_ok" not in [tags] {
                            drop { }
                        }

                        else {

                            mutate {
                                remove_field => [ "text" ]
                            }

                            if "ProcessStart" in [a_phase] {
                                mutate {
                                    add_tag => [ "taskStarted" ]
                                }
                            }

                            if "ProcessEnd" in [a_phase] {
                                mutate {
                                    add_tag => [ "taskTerminated" ]
                                }
                            }

                            date {
                                match => [ "a_timeStamp", "yyyy'-'MM'-'dd'T'HH:mm:ss.SSS" ]
                            }

                            elapsed {
                                start_tag => "taskStarted"
                                end_tag => "taskTerminated"
                                unique_id_field => "a_cruid"
                            }
                        }
                    }       
                }
    }
    else {

        mutate {
            add_field => { "my_index" => "other_products" } 
        }
    }
}
output {

        elasticsearch { 
            index => "%{my_index}"
            hosts => ["localhost:9200"] 
        }

        stdout { codec => rubydebug }

        file {
            path => "/tmp/loggata.tx"
            codec => json
        }
}

Thank you very much,

Andrea


1 Answer


This was the error (a typo left over from previous tests): the date filter below has no target option, and the date filter writes the parsed date to @timestamp by default, so it was silently overwriting the timestamp set by Filebeat:

date {
    match => [ "a_timeStamp", "yyyy'-'MM'-'dd'T'HH:mm:ss.SSS" ]
}
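
If you still need a_timeStamp parsed as a proper date without touching @timestamp, one option is to give the date filter an explicit target (a sketch; a_parsedTimeStamp is just an example field name):

date {
    # parse the application timestamp extracted by the kv filter
    match  => [ "a_timeStamp", "yyyy-MM-dd'T'HH:mm:ss.SSS" ]
    # write the result to a separate field instead of the default @timestamp
    target => "a_parsedTimeStamp"
}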

Thank you guys, anyway!