
I have a single-node Kafka instance running on my server machine. I created the topic with:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I have two Logstash instances running. The first one reads data from a Java application log file and injects it into Kafka. It works fine; I can see the data in Kafka on the console using:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

But the other Logstash instance, which reads from Kafka (the same topic "test") and injects into Elasticsearch, is failing: it does not read any data. I even changed its configuration file to just read from Kafka and print to the console, but it still outputs nothing. Here is the config file for the failing Logstash instance:

# config file
input {
  kafka {
    zk_connect => "localhost:2181"
    topic_id => "test"
  }
}

output {
  stdout {}
}

Logstash neither prints anything nor throws any error. I am using Logstash 2.4 and Kafka 0.10, and I followed the Kafka quick start guide (http://kafka.apache.org/documentation.html#quickstart).

Are you sure you have a Zookeeper instance running on localhost? – Val

3 Answers


If you look at the Kafka input plugin configuration, you can see an important parameter which allows you to connect to a Kafka cluster: zk_connect.

According to the documentation, it defaults to localhost:2181. Make sure it points to the Zookeeper instance of your Kafka cluster, or ideally to multiple instances, depending on your setup.

For example, let's say you're connecting to a three-node Kafka cluster with a topic containing JSON messages. The config would be as follows:

kafka {
  topic_id => "your_topic"
  zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
}

Also, it is important to configure the right codec for the topic. The example above will work with JSON events. If you use Avro, you need to set another parameter: codec. Details on how to configure it are well documented on the plugin's documentation page. It basically requires pointing to an Avro schema, which can be given either as an .avsc file or as a Schema Registry endpoint (in my opinion a much better solution).
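For the .avsc-file variant, a minimal sketch might look like this (assuming the logstash-codec-avro plugin and its schema_uri option; the file path and topic name are hypothetical):

kafka {
  # avro codec reading the schema from a local .avsc file (path is hypothetical)
  codec => avro { schema_uri => "/path/to/your_schema.avsc" }
  topic_id => "your_topic"
  zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
}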

If you have a Schema Registry running in your Kafka environment, you can point the codec to its URL. A full example would be:

kafka {
  codec => avro_schema_registry { endpoint => "http://kc1.host:8081" }
  topic_id => "your_topic"
  zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
}

Hope it works!

@wjp Hi wjp, I am running a single-node Kafka cluster. Zookeeper is running as well, but there is no Schema Registry. My setup and the failing config are exactly as described in the question above.

Please have a look at the Kafka input plugin configuration options: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

Below is a Logstash configuration that reads data from Kafka and pushes it to the ELK stack. Note that bootstrap_servers and topics are options of newer versions of the kafka input plugin, which connect to the Kafka brokers directly instead of going through Zookeeper.

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["topic_name"]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "index_name"
  }
}
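If the consumer connects but still shows nothing, the consumer group's offset may already be at the end of the topic. A debugging sketch using the newer plugin's group_id and auto_offset_reset options (the group name is hypothetical) to re-read from the beginning, similar to --from-beginning on the console consumer:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["test"]
    group_id => "logstash_debug"       # hypothetical group name, so offsets start fresh
    auto_offset_reset => "earliest"    # a new group starts from the beginning of the topic
  }
}

output {
  stdout { codec => rubydebug }        # print events to the console for debugging
}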

Hope it helps!