Filebeat.yml file:

filebeat.inputs:
- type: log
  paths:
    - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
  exclude_lines: ['^Infobase.+']
output.logstash:
  hosts: ["localhost:5044"]
  worker: 1

Filebeat collects logs from the following folder structure:

C:\Program Files\Filebeat\test_logs\*\*\*\*.txt

There are many folders at each level, and each of the innermost folders contains several log files.

Example log file (the same timestamp may appear in several log files, because the logs come from different users):

"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"
"03.08.2020 10:56:38","Event LClick","Type Menu","Detail SomeDetail","t=109","end"
"03.08.2020 10:56:40","Event LClick","t=1981","beg"
"03.08.2020 10:56:40","Event LClick","t=2090","end"
"03.08.2020 10:56:41","Event LClick","Type ToolBar","t=3026","beg"
"03.08.2020 10:56:43","Event LClick","Type ToolBar","Detail User_Desktop","t=4477","end"
"03.08.2020 10:56:44","Event FormActivate","Name Form_Name:IsaA","t=5444"
"03.08.2020 10:56:51","Event LClick","t=12543","beg"
"03.08.2020 10:56:51","Event LClick","t=12605","end"
"03.08.2020 10:56:52","Event LClick","Form ","Type Label","Name Application.for.training","t=13853","beg"
"03.08.2020 10:57:54","Event LClick","Form Application.for.training","Type Label","Name Application.for.training","t=75442","end"
"03.08.2020 10:57:54","Event FormActivate","Name List.form","t=75785"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","end"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","t=89373","beg"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=89451","end"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","t=96580","beg"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=96643","end"

Logstash config file:

input {
    beats {
        port => '5044'
    }
}
filter {
    grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<Event>([^"]+))(","Form\s)?(?<Form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<Name_of_form>([^"]+))?(","Detail\s)?(?<Detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<Status>(end|beg))?' }
        add_tag => [ '%{Status}' ]
    }
    dissect {
        mapping => {
            '[log][file][path]' => 'C:\Program Files\Filebeat\test_logs\%{somethingtoo}\%{something}\%{User_Name}\%{filename}.txt'
        }
    }
    date {
        match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
    }
    elapsed {
        unique_id_field => 'Event'
        start_tag => 'beg'
        end_tag => 'end'
        new_event_on_match => false
    }

    if 'elapsed' in [tags] {
        aggregate {
            task_id => '%{Event}'
            code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
            map_action => 'create'
        }
    }
    mutate {
        remove_field => ['timestamp', 'ecs', 'log', 'tags', 'message', '@version', 'something', 'somethingtoo', 'filename', 'input', 'host', 'agent', 't', 'parent_type', 'parent_name', 'type']
        rename => {'elapsed_time' => 'Event_duration'}
    }
}
output {
    elasticsearch {
        hosts => ['localhost:9200']
        index => 'test'
    }
}

Because my logstash.conf uses the aggregate filter, I run Logstash with a single worker (-w 1) so that the filter works properly.

While testing and configuring with only one log file and -w 1, everything worked fine. But once I started collecting all the logs from every directory, problems began: the data is not indexed into Elasticsearch correctly, which is clear from the strange numbers the aggregation produces.

I also tried setting worker: 1 in the Logstash output section of filebeat.yml, but it didn't help.

Questions:

  1. How can I solve this problem? It's strange that everything works for one log file, or for several log files in a single directory, but breaks down abruptly as soon as more directories are added.
  2. If I understand the theory correctly, Elasticsearch has indices and types. Each log entry has a time and the name of the user it belongs to, so maybe I should index the data by log time and use the username as the type, so that entries from different users with the same time don't overlap. How should I implement this? The only thing I found was document_type, which is already deprecated.
You need to provide more information about your logs and your configs. Update your question with your Filebeat config, Logstash pipeline and log examples from your files so people can try to reproduce your problem. - leandrojmp
@leandrojmp Thanks for the advice, I've added everything I have! - Godunov Dmitry

Answer:


You are using elapsed and aggregate with a field that is not unique: the same value of the Event field can appear in different files, which can make the elapsed filter pair the start event from one file with the end event from another.

This happens because Filebeat harvests files in parallel and sends the events to Logstash in bulk, so lines from different files arrive interleaved. The worker option in your Filebeat config is no help here: it controls the number of workers that ship the data to Logstash, not how the files are collected.

You can try the option harvester_limit: 1 to limit the number of parallel harvesters, but this can slow down your data processing, and there is still no guarantee that it won't mix up your filters. Also, Filebeat does not guarantee the order of events, only at-least-once delivery.

The best solution is to create a unique field by concatenating the Event field with the filename field; this way events from different files won't be mixed up.

You can do that by adding a mutate filter before your elapsed filter (and after dissect, so that filename is already set).

mutate {
  add_field => { "uniqueEvent" => "%{Event}_%{filename}" }
}

This creates a field named uniqueEvent with a value like LClick_filename; you then use this new field in your elapsed and aggregate filters.
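Put together, the changed part of the filter section might look like the sketch below. It assumes the grok and date filters from the question stay as they are, and it only switches elapsed (unique_id_field) and aggregate (task_id) to the new key; the aggregate code is copied unchanged from the question. Filter order matters: dissect must run before mutate so that %{filename} exists, and mutate before elapsed.

```conf
filter {
    # grok filter from the question goes here, unchanged

    dissect {
        mapping => {
            '[log][file][path]' => 'C:\Program Files\Filebeat\test_logs\%{somethingtoo}\%{something}\%{User_Name}\%{filename}.txt'
        }
    }

    # date filter from the question goes here, unchanged (elapsed relies on @timestamp)

    # Per-file key: runs after dissect, so %{filename} is already populated
    mutate {
        add_field => { "uniqueEvent" => "%{Event}_%{filename}" }
    }

    elapsed {
        unique_id_field => 'uniqueEvent'   # was 'Event'
        start_tag => 'beg'
        end_tag => 'end'
        new_event_on_match => false
    }

    if 'elapsed' in [tags] {
        aggregate {
            task_id => '%{uniqueEvent}'    # was '%{Event}'
            code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
            map_action => 'create'
        }
    }

    # final mutate from the question goes here
}
```

If you don't want the helper field stored in Elasticsearch, add 'uniqueEvent' to the remove_field list of the final mutate.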

If the same file name appears in different folders, you will need to add more fields from the path (for example User_Name) until the value of uniqueEvent is unique.
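One way to avoid picking path segments by hand is to build the key from the full file path that Filebeat already sends in [log][file][path], since that path is unique per file. A minimal sketch, assuming the field is still present when this mutate runs (the question's final mutate removes log, but only after elapsed and aggregate):

```conf
mutate {
    # Full path is unique per file, so Event + path is unique per file and event type.
    # Alternative using the dissected parts:
    #   "%{Event}_%{somethingtoo}_%{something}_%{User_Name}_%{filename}"
    add_field => { "uniqueEvent" => "%{Event}_%{[log][file][path]}" }
}
```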