2
votes

I've got a simple Kafka broker running with a topic raw_events.

With kafka-console-producer --topic raw_events --broker-list kafka:29092 < event.json I'm adding events to that topic, and they successfully show up with kafka-console-consumer --bootstrap-server kafka:29092 --topic raw_events. So I know the events land in the broker (in the right topic) and can be consumed from it**) as well.

In this case, the event.json file contains a very simple JSON:

{'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'} }

In KSQL, the topic is there:

ksql> show topics;

 Kafka Topic           | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
--------------------------------------------------------------------------------------------------
 raw_events            | true       | 1          | 1                  | 3         | 3

containing some events from earlier attempts:

ksql> print 'raw_events';
Format:STRING
11/2/18 3:36:21 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:43:05 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:45:19 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:45:43 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}
11/2/18 3:47:30 PM UTC , NULL , {'event_type': 'issue', 'project': 'sample', 'user': {'name': 'John Doe', 'username': 'jdoe'}}

(I'm following https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-stream.html but with my own data.)

Now, I create a stream in KSQL which succeeds:

create stream new_events (event_type varchar, project varchar) with (kafka_topic='raw_events', value_format='JSON');

The stream is created:

ksql> show streams;

 Stream Name     | Kafka Topic | Format
----------------------------------------
 NEW_EVENTS      | raw_events  | JSON
----------------------------------------

Nevertheless (and this is my question/problem -- it could be a PEBKAC or a KSQL error), a SELECT on that stream just hangs and does not show any events, not even when I continue to add events to the topic:

ksql> select * from new_events;
[... nothing here ...]

Selecting a specific column like project does not return entries either.


**) BTW, it is unclear to me why the producer CLI command takes a --broker-list argument while the consumer CLI command takes --bootstrap-server for seemingly the same thing.

1
I think you must define every single field in the JSON stream for it to be parsed correctly. – OneCricketeer
Is that so? The user field is then an additional challenge, as its content isn't a 'flat' value but a nested object. (Trying to parse and query that correctly is my next task; I'm trying the simple case first.) For the time being, I can leave that field out of the incoming events and see what happens. Monday :-) – Jochem Schulenklopper
Use the STRUCT keyword for the user element. confluent.io/blog/data-wrangling-apache-kafka-ksql – OneCricketeer
Have a look at confluent.io/blog/troubleshooting-ksql-part-1. Specifically, if you're saying that there's nothing even after producing new messages (i.e. the offset isn't the issue here), check the KSQL server log for serialisation errors. – Robin Moffatt
Nope, using the STRUCT keyword does not make it work: create stream new_events (event_type varchar, project varchar, user struct<name varchar, username varchar>) with (kafka_topic='raw_events', value_format='JSON'); – Jochem Schulenklopper

1 Answer

1
votes

Following the troubleshooting tips at https://www.confluent.io/blog/troubleshooting-ksql-part-1...

  • I had data in the source topic
  • I had new data*) arriving in the topic
  • KSQL was consuming data from the right offset
  • Data was matching*) the predicate specified
  • No deserialization errors were reported when reading the data *)

You'll notice the *)'s though... I found out that the trouble was that I used single quotes in the JSON, whereas the JSON specification allows (you guessed it) only double quotation marks, ". I was put on the wrong track by the fact that some internal representation of the JSON was exported as JSON-with-single-quotes.

So, the correct JSON from my example should be

{"event_type": "issue", "project": "sample", "user": {"name": "John Doe", "username": "jdoe"}}

and everything is fine.
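The difference is easy to verify outside the Kafka stack. As a quick sanity check (a minimal Python sketch, not part of the original toolchain), the standard json module rejects the single-quoted variant while happily parsing the double-quoted one:

```python
import json

# Single-quoted "JSON" like the original event.json -- not valid per the JSON spec
single_quoted = "{'event_type': 'issue', 'project': 'sample'}"

# The corrected, double-quoted version
double_quoted = '{"event_type": "issue", "project": "sample"}'

try:
    json.loads(single_quoted)
except json.JSONDecodeError as err:
    print("rejected:", err)

event = json.loads(double_quoted)
print("parsed:", event["event_type"], event["project"])
```

KSQL's JSON deserializer behaves the same way, which is why the single-quoted events silently never matched the stream's schema.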

(There's nothing in the KSQL server log that signals this as the cause of the problem, though. Hopefully this answer now documents it as a potential solution if other people encounter this issue.)