
I am trying to use the ClickHouse Kafka Engine to ingest data in CSV format. During ingestion I sometimes get the following exception:

2018.01.08 08:41:47.016826 [ 3499 ] <Debug> StorageKafka (consumer_queue): Started streaming to 1 attached views
2018.01.08 08:41:47.016906 [ 3499 ] <Trace> StorageKafka (consumer_queue): Creating formatted reader
2018.01.08 08:41:49.680816 [ 3499 ] <Error> void DB::StorageKafka::streamThread(): Code: 117, e.displayText() = DB::Exception: Expected end of line, e.what() = DB::Exception, Stack trace:

0. clickhouse-server(StackTrace::StackTrace()+0x16) [0x3221296]
1. clickhouse-server(DB::Exception::Exception(std::string const&, int)+0x1f) [0x144a02f]
2. clickhouse-server() [0x36e6ce1]
3. clickhouse-server(DB::CSVRowInputStream::read(DB::Block&)+0x1a0) [0x36e6f60]
4. clickhouse-server(DB::BlockInputStreamFromRowInputStream::readImpl()+0x64) [0x36e3454]
5. clickhouse-server(DB::IProfilingBlockInputStream::read()+0x16e) [0x2bcae0e]
6. clickhouse-server(DB::KafkaBlockInputStream::readImpl()+0x6c) [0x32f6e7c]
7. clickhouse-server(DB::IProfilingBlockInputStream::read()+0x16e) [0x2bcae0e]
8. clickhouse-server(DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::atomic<bool>*)+0x55) [0x35b3e25]
9. clickhouse-server(DB::StorageKafka::streamToViews()+0x366) [0x32f54f6]
10. clickhouse-server(DB::StorageKafka::streamThread()+0x143) [0x32f58c3]
11. clickhouse-server() [0x40983df]
12. /lib/x86_64-linux-gnu/libpthread.so.0(+0x76ba) [0x7f4d115d06ba]
13. /lib/x86_64-linux-gnu/libc.so.6(clone+0x6d) [0x7f4d10bf13dd]

Below are the table and view definitions

CREATE TABLE test.consumer_queue (ID Int32,  DAY Date) ENGINE = Kafka('broker-ip:port', 'clickhouse-kyt-test','clickhouse-kyt-test-group', 'CSV')

CREATE TABLE test.consumer_request ( ID Int32,  DAY Date) ENGINE = MergeTree PARTITION BY DAY ORDER BY (DAY, ID) SETTINGS index_granularity = 8192

CREATE MATERIALIZED VIEW test.consumer_view TO test.consumer_request (ID Int32, DAY Date) AS SELECT ID, DAY FROM test.consumer_queue

CSV Data

10034,"2018-01-05"
10035,"2018-01-05"
10036,"2018-01-05"
10037,"2018-01-05"
10038,"2018-01-05"
10039,"2018-01-05"

ClickHouse server version 1.1.54318.


2 Answers


It seems that ClickHouse reads a batch of messages from Kafka and then tries to decode them all as a single CSV document. The rows in that combined CSV must be separated by newline characters, so every message should end with a newline.

I am not sure whether this is a feature or a bug in ClickHouse.

You can try sending a single message to Kafka and checking whether it appears correctly in ClickHouse.

If you send messages to Kafka with the kafka-console-producer.sh script, note that it (class ConsoleProducer.scala) reads lines from a file and sends each line to the topic without a trailing newline character, so such messages cannot be decoded correctly.

If you send messages with your own script or application, you can modify it to append a newline character to the end of each message; this should solve the problem. Alternatively, you can use another format for the Kafka Engine, for example JSONEachRow.
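As a sketch of the JSONEachRow alternative, the queue table from the question could be declared like this (broker, topic, and group names are copied from the question); each Kafka message then carries one self-contained JSON object such as {"ID":10034,"DAY":"2018-01-05"}, so a missing trailing newline in one message does not corrupt its neighbours:

-- Sketch: same consumer table as in the question, but using JSONEachRow
-- instead of CSV. Each message is a single JSON object per row.
CREATE TABLE test.consumer_queue
(
    ID Int32,
    DAY Date
)
ENGINE = Kafka('broker-ip:port', 'clickhouse-kyt-test', 'clickhouse-kyt-test-group', 'JSONEachRow')

The MergeTree target table and the materialized view from the question can stay exactly as they are; only the source format of the queue table changes.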


Agreeing with @mikhail's answer: you can also try setting kafka_row_delimiter = '\n' in the Kafka engine SETTINGS.
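As a sketch, on ClickHouse versions that support named settings for the Kafka engine (newer than the 1.1.54318 mentioned in the question), the queue table could be declared with kafka_row_delimiter like this, keeping the broker, topic, and group names from the question:

-- Sketch for newer ClickHouse versions: Kafka engine with named settings,
-- using kafka_row_delimiter so each message is terminated with '\n'.
CREATE TABLE test.consumer_queue
(
    ID Int32,
    DAY Date
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker-ip:port',
         kafka_topic_list = 'clickhouse-kyt-test',
         kafka_group_name = 'clickhouse-kyt-test-group',
         kafka_format = 'CSV',
         kafka_row_delimiter = '\n'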