
I'm very new to Confluent KSQL but not new to Kafka. I have existing Kafka topics that contain Avro-serialized data. I have the Confluent Schema Registry up and running and have configured KSQL to point to the registry.

When I try to create a table based on one of my topics, KSQL complains that it can't find the stream. When I try to create a stream in KSQL that simply exposes my topic, there appears to be no way to point it at my Avro-serialized topic, even though its schema is registered in the registry.

Does anyone know how to tackle these two problems? Or is the way I want to use KSQL simply not something it can do?

UPDATE

Here are some more details:

ksql> show topics;

 Kafka Topic                                                                                 | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA                              | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_CPATRACKINGCALLBACK                                             | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK                                                 | true       | 10         | 3                  | 0         | 0

KSQL config

#bootstrap.servers=localhost:9092
bootstrap.servers=host1:9092,host2:9092,host3:9092,host4:9092,host5:9092

#listeners=http://localhost:8088
listeners=http://localhost:59093

ksql.server.ui.enabled=true

ksql.schema.registry.url=http://host1:59092

Registry config

# The host name advertised in ZooKeeper. Make sure to set this if running Schema Registry with multiple nodes.
host.name: x.x.x.x
listeners=http://0.0.0.0:59092

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
#kafkastore.connection.url=localhost:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the master schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
kafkastore.bootstrap.servers=PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092,PLAINTEXT://host4:9092,PLAINTEXT://host5:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
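One way to confirm that the registry behind ksql.schema.registry.url actually holds value schemas for the topics listed by `show topics` is to ask its REST API for all subjects. The sketch below assumes the default subject naming (TopicNameStrategy), under which KSQL looks up the value schema of topic T as subject `T-value`; the function names are hypothetical and only the Python standard library is used.

```python
import json
import urllib.request


def list_subjects(registry_url):
    """Return all subject names via the Schema Registry REST call GET /subjects."""
    url = registry_url.rstrip("/") + "/subjects"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def topics_missing_value_schema(topics, subjects):
    """Return the topics that have no '<topic>-value' subject.

    Assumes the default TopicNameStrategy, under which the value schema of
    topic T is registered as subject 'T-value'.
    """
    registered = set(subjects)
    return [topic for topic in topics if f"{topic}-value" not in registered]


# Example usage (performs a network call, so it is left commented out):
# topics = ["COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK"]
# print(topics_missing_value_schema(topics, list_subjects("http://host1:59092")))
```

Any topic this reports would give KSQL nothing to infer columns from when declared with VALUE_FORMAT='AVRO'.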

Attempting to solve the problem by declaring an external topic

ksql> register  topic xxx with (value_format='avro', kafka_topic='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');
You need to provide avro schema file path for topics in avro format.
Can you show your query statements? And the errors? - OneCricketeer
I've tried various things so apologies for flooding you with stuff - Allan Wax
ksql> register topic COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA with (value_format='avro', kafka_topic='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');
You need to provide avro schema file path for topics in avro format. - Allan Wax

explain CREATE TABLE possible_fraud AS SELECT clientIp, count(*) from COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK WINDOW TUMBLING (SIZE 5 SECONDS) GROUP BY clientIp HAVING count(*) > 3;
COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK does not exist. - Allan Wax

ksql> select * from COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK limit 3;
COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK does not exist. - Allan Wax
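For readers unfamiliar with the windowed query in the comments: WINDOW TUMBLING (SIZE 5 SECONDS) splits time into fixed, non-overlapping 5-second buckets, GROUP BY clientIp counts per address within each bucket, and the HAVING clause keeps only address/bucket pairs with more than 3 events. The plain-Python sketch below reproduces those semantics over a finite list of (timestamp in ms, clientIp) pairs. It is an illustration only: the function names are hypothetical, and KSQL processes an unbounded stream and emits updated counts as records arrive rather than one final result per window.

```python
from collections import Counter


def tumbling_counts(events, size_ms):
    """Count events per (client_ip, window_start) in fixed, non-overlapping windows."""
    counts = Counter()
    for ts_ms, client_ip in events:
        # Each timestamp falls into exactly one window: [start, start + size_ms)
        window_start = ts_ms - (ts_ms % size_ms)
        counts[(client_ip, window_start)] += 1
    return counts


def possible_fraud(events, size_ms=5000, threshold=3):
    """Keep only (client_ip, window_start) pairs whose count exceeds threshold."""
    return {key: n for key, n in tumbling_counts(events, size_ms).items() if n > threshold}
```

With four clicks from one address at 0, 1000, 2000 and 4999 ms and a fifth at 5000 ms, only the first window is reported, because the fifth click starts a new window.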

2 Answers


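REGISTER TOPIC is deprecated syntax. You should use CREATE STREAM (or CREATE TABLE, depending on your data access requirements) instead. With VALUE_FORMAT='AVRO' and ksql.schema.registry.url set, KSQL fetches the value schema from the Schema Registry, so you don't need to list the columns yourself.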

So your statement would look something like this:

CREATE STREAM MY_STREAM_1 \
  WITH (VALUE_FORMAT='AVRO', \
  KAFKA_TOPIC='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');

Note that I've used \ to break the lines for readability; you don't have to do this.
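With VALUE_FORMAT='AVRO', the columns of MY_STREAM_1 come from the latest schema registered for the topic's value, by default under the subject `<topic>-value`. To preview which top-level fields KSQL will see, you could fetch that schema with the registry's GET /subjects/{subject}/versions/latest call. The sketch below assumes that default subject naming; the helper names are hypothetical. Note that the response's `schema` member is itself a JSON-encoded string, so it is decoded twice.

```python
import json
import urllib.parse
import urllib.request


def latest_value_schema_url(registry_url, topic):
    """Build the URL for GET /subjects/<topic>-value/versions/latest."""
    subject = urllib.parse.quote(f"{topic}-value", safe="")
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"


def fields_from_response(body):
    """Extract (name, type) pairs from a /versions/latest response body.

    The response's "schema" member is itself a JSON-encoded string, so it
    has to be decoded a second time to get the Avro record definition.
    """
    payload = json.loads(body)
    schema = json.loads(payload["schema"])
    return [(field["name"], field["type"]) for field in schema.get("fields", [])]


def fetch_value_fields(registry_url, topic):
    """Fetch the latest value schema for a topic and list its top-level fields."""
    url = latest_value_schema_url(registry_url, topic)
    with urllib.request.urlopen(url, timeout=10) as resp:
        return fields_from_response(resp.read().decode("utf-8"))
```

For example, `fetch_value_fields("http://host1:59092", "COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA")` would return the field list, or raise an HTTP 404 error if no such subject exists.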


I resolved the issue by using only some of the information in the Kafka topic rather than its whole contents. The topic contains Avro-encoded data (which is fine) created using ReflectData. KSQL has problems dealing with non-standard items in the stream, but it handles ReflectData items as long as there is a corresponding KSQL data type. I therefore created a new stream in KSQL that selects only the items I need that are also compatible with KSQL. Once that was done, I could process what I needed from the larger stream.
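The workaround of a narrower stream that keeps only fields KSQL can handle can be sketched in Python: given the record schema as a dict, keep the fields whose Avro type has an obvious KSQL equivalent and emit a CREATE STREAM ... AS SELECT statement. The type mapping, the rule that only `["null", T]` unions are usable, and the function and stream names are all assumptions for illustration, not the author's code or an official compatibility list.

```python
# Avro primitive types assumed here to map directly onto KSQL column types.
# This table is an assumption for illustration; check the KSQL documentation
# for the version you run before relying on it.
AVRO_TO_KSQL = {
    "boolean": "BOOLEAN",
    "int": "INTEGER",
    "long": "BIGINT",
    "float": "DOUBLE",
    "double": "DOUBLE",
    "string": "VARCHAR",
}


def ksql_type(avro_type):
    """Return a KSQL type for a simple Avro field type, or None if not handled."""
    if isinstance(avro_type, str):
        return AVRO_TO_KSQL.get(avro_type)
    if isinstance(avro_type, dict):
        # Reflection-generated schemas may wrap a primitive with extra
        # properties, e.g. {"type": "string", "java-class": "..."}.
        inner = avro_type.get("type")
        return AVRO_TO_KSQL.get(inner) if isinstance(inner, str) else None
    if isinstance(avro_type, list):
        # Only nullable unions of the form ["null", T] are treated as usable.
        non_null = [t for t in avro_type if t != "null"]
        if len(avro_type) == 2 and len(non_null) == 1:
            return ksql_type(non_null[0])
    return None


def projection_statement(schema, source_stream, target_stream):
    """Build a CREATE STREAM ... AS SELECT that keeps only handled fields."""
    keep = [f["name"] for f in schema["fields"] if ksql_type(f["type"]) is not None]
    if not keep:
        raise ValueError("no fields with a known KSQL type")
    return f"CREATE STREAM {target_stream} AS SELECT {', '.join(keep)} FROM {source_stream};"
```

For a schema with a string `clientIp`, a `["null", "long"]` timestamp and a `bytes` payload, the generated statement selects only `clientIp` and the timestamp. As with any CREATE STREAM ... AS SELECT, KSQL writes the result to a new backing topic.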

Comment: I see it as somewhat of a deficiency in KSQL that you have to create new, actual intermediary topic(s) in Kafka to process the data. I think a better solution would be to treat the intermediary stream as a view onto the actual stream. I do understand the need for intermediary topics to hold accumulations and processed items before resolving them as a KTable.