
I am trying to ship nginx logs through Filebeat and Kafka into ClickHouse. My configuration is below.

Filebeat config:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log

output.kafka:
  # Tell Filebeat to publish only the timestamp and message fields; otherwise
  # it publishes each event to Kafka as a full JSON document
  codec.format:
    string: '%{[@timestamp]} %{[message]}'

  # Kafka
  # publish to the 'myfirst' topic
  hosts: ["kafka:9092"]
  topic: 'myfirst'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

In Kafka the logs arrive in the topic and everything looks fine, except that the data is written to the topic like this:

2021-01-01T21:51:25.225Z {"remote_addr": "192.168.222.1","remote_user": "-","time_local":  "01/Jan/2021:21:51:17 +0000","request":     "GET / HTTP/1.1","status":      "304","body_bytes_sent": "0","http_referer": "-","http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"}

Then I created the ClickHouse tables and a materialized view:

CREATE TABLE accesslog (
...
    ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:9092',


But the query output in ClickHouse looks like this, with no data. Why?

┌─remote_addr─┬─remote_user─┬─time_local─┬───────date─┬─request─┬─status─┬─body_bytes_sent─┬─http_referer─┬─http_user_agent─┐
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
└─────────────┴─────────────┴────────────┴────────────┴─────────┴────────┴─────────────────┴──────────────┴─────────────────┘
Check the log /var/log/clickhouse-server/clickhouse-server.log. - vladimir
Are you sure any events are reaching accesslog? To check: 1) stop the MV with "detach table log_consumer", 2) add some events to the topic, 3) run this query: "select * from accesslog". - vladimir
Thank you for the answer. Yes, accesslog is updated by events, and they show up in Kafka too. I did these 3 steps, and select * from accesslog returns one record per event, but without data, as I said. - Mim sdi
And the ClickHouse server log:
2021.01.01 23:20:03.466759 [ 51 ] {} <Trace> StorageKafka (accesslog): Already assigned to : [ myfirst[0:#] ]
2021.01.01 23:20:03.466942 [ 47 ] {} <Trace> StorageKafka (accesslog): Already assigned to : [ ]
2021.01.01 23:20:03.980754 [ 48 ] {} <Trace> StorageKafka (accesslog): Polled batch of 1 messages. Offset position: [ myfirst[0:1] ]
2021.01.01 23:20:03.981755 [ 48 ] {} <Trace> IRowInputFormat: Skipped 1 rows with errors while reading the input stream
2021.01.01 23:20:04.489136 [ 48 ] {} <Trace> StorageKafka (accesslog): Stalled
- Mim sdi
Which version of ClickHouse do you use? I cannot reproduce this error on version 20.12.3.3. Better still, provide your docker-compose.yml. - vladimir

1 Answer


It looks like the issue is the wrong Kafka broker address. Use the internal address kafka:19092 instead of the external address kafka:9092:

CREATE TABLE accesslog (
..
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:19092', ..


Steps to reproduce:

Kafka-side:

# run shell in Kafka container
docker exec -it kafka bash

# create topic
kafka-topics --create --topic myfirst --partitions 1 --replication-factor 1 --bootstrap-server kafka:19092

# check topic
# kafka-topics --describe --topic myfirst  --bootstrap-server kafka:19092

# add events to the topic
kafka-console-producer --topic myfirst --broker-list kafka:19092
# event body: {"remote_addr": "192.168.222.1","remote_user": "-","time_local":  "01/Jan/2021:21:51:17 +0000","request":     "GET / HTTP/1.1","status":      "304","body_bytes_sent": "0","http_referer": "-","http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"}
..

ClickHouse-side:

SELECT *
FROM accesslog

/*
┌─remote_addr───┬─remote_user─┬─time_local─────────────────┬─request────────┬─status─┬─body_bytes_sent─┬─http_referer─┬─http_user_agent────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ 192.168.222.1 │ -           │ 01/Jan/2021:21:51:17 +0000 │ GET / HTTP/1.1 │    304 │               0 │ -            │ Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 │
..
*/
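
One difference between this reproduction and the question is worth checking: the event body used here is a bare JSON object, while the codec.format string in the question's Filebeat config puts a timestamp in front of it. The following is a minimal Python sketch, not part of the original setup. It assumes the Kafka table is read with a JSON format such as JSONEachRow (the table definition is elided in the question, so this is an assumption), and strip_prefix is a hypothetical helper used only for illustration. It shows that the prefixed line does not parse as JSON, which would be consistent with the "Skipped 1 rows with errors" line in the server log.

```python
import json

# Message shaped like the one in the question's topic (fields shortened).
prefixed = '2021-01-01T21:51:25.225Z {"remote_addr": "192.168.222.1", "status": "304"}'


def parses_as_json_object(line: str) -> bool:
    """Return True if the whole line is a single JSON object."""
    try:
        return isinstance(json.loads(line), dict)
    except json.JSONDecodeError:
        return False


def strip_prefix(line: str) -> str:
    """Hypothetical helper: drop everything before the first '{'."""
    return line[line.index("{"):]


print(parses_as_json_object(prefixed))                # False
print(parses_as_json_object(strip_prefix(prefixed)))  # True
```

If this turns out to be relevant, publishing only %{[message]} from Filebeat would be one way to put bare JSON into the topic.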

Excerpt from docker-compose.yml:

..
  kafka:
    image: confluentinc/cp-kafka:5.2.2
    container_name: kafka
    restart: unless-stopped
    hostname: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-x.x.x.x}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    networks:
      - net1
..
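
To make the mapping between listeners and addresses explicit, here is a small Python sketch that splits the KAFKA_ADVERTISED_LISTENERS value from the excerpt and picks the internal listener. The function name parse_advertised_listeners is made up for this illustration, and the ${DOCKER_HOST_IP:-x.x.x.x} placeholder is left as x.x.x.x; it is only meant to show which address a client inside the Docker network, such as ClickHouse, would use.

```python
def parse_advertised_listeners(value: str) -> dict:
    """Map listener name -> (host, port) from a KAFKA_ADVERTISED_LISTENERS string."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value from the docker-compose excerpt, placeholder host kept as x.x.x.x.
advertised = (
    "LISTENER_DOCKER_INTERNAL://kafka:19092,"
    "LISTENER_DOCKER_EXTERNAL://x.x.x.x:9092"
)
listeners = parse_advertised_listeners(advertised)

# Containers on the same Docker network should use the internal listener.
host, port = listeners["LISTENER_DOCKER_INTERNAL"]
print(f"kafka_broker_list = '{host}:{port}'")  # kafka_broker_list = 'kafka:19092'
```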