1
votes

I am trying to build a pipeline based on this tutorial, in which Kafka reads from a file through a File Source connector. Using these Docker images for the Elastic Stack, I want to register Logstash as a consumer of the "quickstart-data" topic, but so far I have not succeeded.

Here is my logstash.conf file:

input {
  kafka {
    bootstrap_servers => 'localhost:9092'
    topics => 'quickstart-data'
  }
}

output {
  elasticsearch {
    hosts => [ 'elasticsearch']
    user => 'elastic'
    password => 'changeme'
  }
  stdout {}
}

The connection to Elasticsearch works; I tested it with a heartbeat input. The error message I get is the following: Connection to node -1 could not be established. Broker may not be available. Give up sending metadata request since no node is available

Any ideas?

2
Did you manage to find a solution for this? I have stumbled upon the exact same problem. - geoandri

2 Answers

0
votes

I would recommend keeping things simple and using Kafka Connect to land the data in Elasticsearch too: https://docs.confluent.io/current/connect/connect-elasticsearch/docs/elasticsearch_connector.html#quick-start

0
votes

There may be a better way to do it, but here is how I fixed the issue:

  1. Change the Zookeeper & Kafka images to the Confluent images:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports: 
      - "2181:2181"
    environment: 
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks: 
      - stack
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on: 
      - zookeeper
    networks: 
      - stack
  2. Logstash configuration (please note that the port is 29092):
input {
    stdin{}
    kafka {
        id => "my_kafka_1"
        bootstrap_servers => "kafka:29092"
        topics => "test"
    }
}
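
The reason this works: inside the Logstash container, localhost refers to the Logstash container itself, so 'localhost:9092' never reaches the broker. The broker has to be addressed by its service name on the internal listener (kafka:29092), while localhost:9092 remains usable only from the Docker host. Applied to the configuration from the question, a minimal sketch might look like the following. It assumes Logstash runs as a container on the same "stack" network as the kafka service (the question does not show its compose file, so this is an assumption) and that the topic is still "quickstart-data":

```conf
input {
  kafka {
    # Internal listener advertised by the broker (PLAINTEXT://kafka:29092),
    # reachable by service name from containers on the same network.
    bootstrap_servers => "kafka:29092"
    # Topic name taken from the question; adjust if yours differs.
    topics => ["quickstart-data"]
  }
}

output {
  elasticsearch {
    # Elasticsearch settings copied unchanged from the question.
    hosts => ["elasticsearch"]
    user => "elastic"
    password => "changeme"
  }
  stdout {}
}
```

If Logstash runs directly on the Docker host instead of in a container, the host listener localhost:9092 would be the one to use.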