I'm trying to create a Kafka stream in KSQL on top of a Kafka topic. I have JSON records like below stored in a Kafka topic.

{ "venue": { "venue_name": "HATCH", "lon": -71.18291, "lat": 42.36667, "venue_id": 22491322 }, "visibility": "public", "response": "yes", "guests": 0, "member": { "member_id": 237655942, "member_name": "Nts" }, "rsvp_id": 1724941595, "mtime": 1524620970613, "event": { "event_name": "Intro to Soldering", "event_id": "250106100", "time": 1526853600000, "event_url": "https:\/\/www.meetup.com\/Makers-of-HATCH-Makerspace\/events\/250106100\/" }, "group": { "group_topics": [ { "urlkey": "quilting", "topic_name": "Quilting" }, { "urlkey": "robotics", "topic_name": "Robotics" }, { "urlkey": "sewing", "topic_name": "Sewing" }, { "urlkey": "edtech", "topic_name": "Education & Technology" }, { "urlkey": "craftswap", "topic_name": "Crafts" }, { "urlkey": "diy", "topic_name": "DIY (Do It Yourself)" }, { "urlkey": "hacking", "topic_name": "Hacking" }, { "urlkey": "3d-modeling", "topic_name": "3D Modeling" }, { "urlkey": "tools", "topic_name": "Tools" }, { "urlkey": "arduino", "topic_name": "Arduino" }, { "urlkey": "makers", "topic_name": "Makers" }, { "urlkey": "makerspaces", "topic_name": "Makerspaces" }, { "urlkey": "3d-printing", "topic_name": "3D Printing" }, { "urlkey": "laser-cutting", "topic_name": "Laser Cutting" }, { "urlkey": "scrapbook-die-cutting-machines", "topic_name": "Scrapbook die cutting machines." } ], "group_city": "Watertown", "group_country": "us", "group_id": 18457932, "group_name": "Makers of HATCH Makerspace", "group_lon": -71.18, "group_urlname": "Makers-of-HATCH-Makerspace", "group_state": "MA", "group_lat": 42.37 } }

This data has been loaded into a Kafka topic.

I have created a stream in KSQL as below.

CREATE STREAM meetup_rsvp_raw 
(  Venue varchar, 
   Visibility varchar, 
   Response varchar, 
   Guests integer, 
   Member varchar, 
   rsvp_id bigint, 
   mtime bigint, 
   event varchar, 
   group_info varchar 
) WITH (KAFKA_TOPIC='meetup-rsvp', VALUE_FORMAT='JSON');

I see null in the group_info field (the last field in the stream). Note: KSQL does not let me create a field named "group" because it is a keyword, so I named the field group_info instead.

ksql> select * from meetup_rsvp_raw limit 2;

1524624181126 | null | {"venue_name":"Houghton's Pond - Blue Hills","lon":-71.09453,"lat":42.208187,"venue_id":1506300} | public | yes | 0 | {"member_id":159617162,"photo":"https://secure.meetupstatic.com/photos/member/7/2/b/c/thumb_215729372.jpeg","member_name":"Tena Kerns"} | 1724949934 | 1524623875376 | {"event_name":"Blue Hills Buck Hill - Easy Pace / Moderate hike","event_id":"250084062","time":1525010400000,"event_url":"https://www.meetup.com/HikeBikeSocialClub/events/250084062/"} | null

1524624181126 | null | {"venue_name":"Community Wholeness Centre CWC","lon":-79.69191,"lat":44.38976,"venue_id":19966962} | public | no | 0 | {"member_id":222279178,"photo":"https://secure.meetupstatic.com/photos/member/d/3/f/c/thumb_273714268.jpeg","member_name":"Natalie Roy"} | 1724949935 | 1524623875430 | {"event_name":"Karate Class - Ken Shin Budo Kai","event_id":"kbsjtmyxgbnc","time":1525129200000,"event_url":"https://www.meetup.com/CWCBarrie/events/250120204/"} | null

Not sure what I am doing wrong but any suggestions are welcome.

1 Answer

You're right, 'GROUP' is a keyword in KSQL. Your workaround of renaming the field in the CREATE STREAM statement won't work, because KSQL doesn't know that your group_info column refers to the group field, so the column comes back as null.
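
To make the null concrete, here is a small Python sketch of the idea. It is only a simplified model, not KSQL's actual code: it assumes each declared column is looked up in the JSON value by name, ignoring case (which is consistent with the Venue and Visibility columns being populated in the question's output). The function name `project` and the shortened sample record are made up for illustration.

```python
import json

# Simplified model (an assumption, not KSQL's implementation): every declared
# column is looked up in the JSON value by name, ignoring case. A column with
# no matching JSON field yields None, which KSQL would display as null.
def project(record_json, columns):
    record = json.loads(record_json)
    by_upper = {key.upper(): value for key, value in record.items()}
    return {col: by_upper.get(col.upper()) for col in columns}

# Shortened, hypothetical version of the record from the question.
sample = '{"response": "yes", "guests": 0, "group": {"group_city": "Watertown"}}'

# Column renamed to group_info: there is no JSON field with that name.
renamed = project(sample, ["Response", "Guests", "group_info"])

# Column named GROUP: it matches the JSON field "group".
quoted = project(sample, ["Response", "Guests", "GROUP"])

print(renamed["group_info"])  # None
print(quoted["GROUP"])        # {'group_city': 'Watertown'}
```

In this model the column has to carry the same name as the JSON field to receive a value, which is why the renamed column stays empty.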

Instead, you can put quotes around the column name so that KSQL accepts the reserved word and you can import the topic. (Currently the identifier inside the quotes needs to be uppercase, but this is a bug.) For example:

CREATE STREAM meetup_rsvp_raw 
(  venue varchar, 
   visibility varchar, 
   response varchar, 
   guests integer, 
   member varchar, 
   rsvp_id bigint, 
   mtime bigint, 
   event varchar, 
   "GROUP" varchar 
) WITH (KAFKA_TOPIC='meetup-rsvp', VALUE_FORMAT='JSON');

Note that you'll also need to quote the identifier when selecting this field:

SELECT `GROUP` from meetup_rsvp_raw limit 5;

I've created a GitHub issue to track the lack of documentation in this area.

Let us know how you get on with this.

Thanks,

Andy