I have set up an ELK stack infrastructure with Docker, and I can't see files being processed by Logstash.

Filebeat is configured to send .csv files to Logstash, and Logstash forwards them to Elasticsearch. I can see the Logstash Beats listener starting, and the Logstash to Elasticsearch pipeline comes up, however no document or index is ever written.

Please advise.

filebeat.yml

    filebeat.prospectors:
    - input_type: log
      paths:
         - logs/sms/*.csv
      document_type: sms
      paths:
         - logs/voip/*.csv
      document_type: voip

    output.logstash:
      enabled: true
      hosts: ["logstash:5044"]

    logging.to_files: true
    logging.files:

logstash.conf

input {
  beats {
    port => "5044"
  }
}

filter {
  if [document_type] == "sms" {
    csv {
      columns => ['Date', 'Time', 'PLAN', 'CALL_TYPE', 'MSIDN', 'IMSI', 'IMEI']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
  if [document_type] == "voip" {
    csv {
      columns => ['Date', 'Time', 'PostDialDelay', 'Disconnect-Cause', 'Sip-Status', 'Session-Disposition', 'Calling-RTP-Packets-Lost', 'Called-RTP-Packets-Lost', 'Calling-RTP-Avg-Jitter', 'Called-RTP-Avg-Jitter', 'Calling-R-Factor', 'Called-R-Factor', 'Calling-MOS', 'Called-MOS', 'Ingress-SBC', 'Egress-SBC', 'Originating-Trunk-Group', 'Terminating-Trunk-Group']
      separator => " "
      skip_empty_columns => true
      quote_char => "'"
    }
  }
}

output {
  if [document_type] == "sms" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "smscdr_index"
    }
    stdout {
      codec => rubydebug
    }
  }
  if [document_type] == "voip" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "voipcdr_index"
    }
    stdout {
      codec => rubydebug
    }
  }
}

Logstash partial logs

[2019-12-05T12:48:38,227][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2019-12-05T12:48:38,411][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4ffc5251 run>"}
[2019-12-05T12:48:38,949][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-05T12:48:39,077][INFO ][org.logstash.beats.Server] Starting server on port: 5044
==========================================================================================
[2019-12-05T12:48:43,518][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://elasticsearch:9200"]}
[2019-12-05T12:48:43,745][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>".monitoring-logstash", :thread=>"#<Thread:0x46e8e60c run>"}
[2019-12-05T12:48:43,780][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :main], :non_running_pipelines=>[]}
[2019-12-05T12:48:45,770][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Filebeat log sample

2019-12-05T12:55:33.119Z        INFO    log/harvester.go:255    Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_1595.csv
2019-12-05T12:55:33.126Z        INFO    log/harvester.go:255    Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_2004.csv
2019-12-05T12:55:33.130Z        INFO    log/harvester.go:255    Harvester started for file: /usr/share/filebeat/logs/voip/voip_cdr_2810.csv
======================================================================================================
2019-12-05T13:00:44.002Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_563.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:44.003Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_2729.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:44.004Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_2308.csv. Closing because close_inactive of 5m0s reached.
2019-12-05T13:00:49.218Z        INFO    log/harvester.go:280    File is inactive: /usr/share/filebeat/logs/voip/voip_cdr_981.csv. Closing because close_inactive of 5m0s reached.

docker-compose ps

docker-compose -f docker-compose_stash.yml ps
The system cannot find the path specified.
      Name                     Command               State                            Ports
---------------------------------------------------------------------------------------------------------------------
elasticsearch_cdr   /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 9300/tcp
filebeat_cdr        /usr/local/bin/docker-entr ...   Up
kibana_cdr          /usr/local/bin/kibana-docker     Up      0.0.0.0:5601->5601/tcp
logstash_cdr        /usr/local/bin/docker-entr ...   Up      0.0.0.0:5000->5000/tcp, 0.0.0.0:5044->5044/tcp, 9600/tcp

1 Answer

In Logstash you have conditional checks on the field document_type, but this field is not generated by Filebeat; you need to correct your Filebeat config.

Try this config for your inputs, with one prospector per path:

filebeat.prospectors:
- input_type: log
  paths:
     - logs/sms/*.csv
  fields:
    document_type: sms
- input_type: log
  paths:
     - logs/voip/*.csv
  fields:
    document_type: voip

This will create a field named fields with a nested field named document_type, like the example below.

{ "fields" : { "document_type" : "voip" } }

And change your Logstash conditionals to check against the field [fields][document_type], like the example below.

if [fields][document_type] == "sms" {
  your filters
}
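
The same change applies to the conditionals in the output block of your logstash.conf; the sms output, for example, would become something like:

output {
  if [fields][document_type] == "sms" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "smscdr_index"
    }
    stdout {
      codec => rubydebug
    }
  }
}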

If you want, you can instead set the option fields_under_root: true in Filebeat to create document_type at the root of your document, so you will not need to change your Logstash conditionals.

filebeat.prospectors:
- input_type: log
  paths:
     - logs/sms/*.csv
  fields:
    document_type: sms
  fields_under_root: true
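
With fields_under_root: true, the field lands at the root of the event instead of nested under fields, so the event should look something like the example below, and your original [document_type] conditionals will match unchanged.

{ "document_type" : "sms" }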