I am trying to build a data pipeline from MySQL to KSQL.

Use case: the data source is MySQL, where I have created a table.

I am using

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

to start a standalone connector, and it is working fine.

I am starting the console consumer with the topic name, i.e.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning

When I insert data into the MySQL table, I see it in the consumer as well. I have also created a KSQL stream with the same topic name, and I expect the same result in the stream, but I do not get any result when I run

SELECT * FROM <streamName>;

Connector configuration (source-quickstart-mysql.properties):

name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera

# Which table(s) to include
table.whitelist=ftest

mode=incrementing
incrementing.column.name=id

topic.prefix=ftopic
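For reference (this is standard JDBC source connector behavior, not something shown explicitly above): the connector writes each table to a topic named `topic.prefix` concatenated with the table name, so with `topic.prefix=ftopic` and `table.whitelist=ftest` the data should land in a topic called ftopicftest. A quick way to confirm which topics actually exist (assuming the same Confluent platform layout as the commands above; older Kafka versions may need `--zookeeper localhost:2181` instead of `--bootstrap-server`):

```shell
# List all topics on the local broker; the JDBC source connector should
# have created "ftopicftest" (prefix + table name). A stream created
# against a different topic name will never see this data.
./bin/kafka-topics --list --bootstrap-server localhost:9092
```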

Sample Data

  • MySQL

1.) Create Database:

CREATE DATABASE testDB;

2.) Use Database:

USE testDB;

3.) create the table:

CREATE TABLE products (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);

4.) Insert data into the table:

INSERT INTO products (id, name, description, weight)
VALUES (103, 'car', 'Small car', 20);

  • KSQL

1.) Create Stream:

CREATE STREAM pro_original (id int, name varchar, description varchar, weight bigint) \
WITH (kafka_topic='proproducts', value_format='DELIMITED');

2.) Select Query:

SELECT * FROM pro_original;
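One thing worth checking whenever a KSQL SELECT returns nothing (a general KSQL behavior, not something stated in the question): by default a query only reads records that arrive after the query starts. To replay everything already in the topic, set the offset reset property before querying:

```sql
-- Tell KSQL to read topics from the earliest offset, so the SELECT
-- also returns rows that were produced before the query started.
SET 'auto.offset.reset' = 'earliest';

SELECT * FROM pro_original;
```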

Expected Output

  1. Consumer

The consumer receives the data inserted into the MySQL table. This part works; I am getting the data from MySQL.

  2. KSQL

The stream should be populated with the data that is inserted into the MySQL table and reflected in the Kafka topic.

I am not getting the expected result in KSQL.

Please help me with this data pipeline.

Comments:

"How did you create your stream? Also, it would be useful if you shared some sample data." – Giorgos Myrianthous

"@GiorgosMyrianthous I have mentioned all the sample data I am using to create the MySQL table and the KSQL stream. I am getting the result in the Kafka consumer, but it is not reflected in the stream." – user9518134

"Can you post your connector's configuration as well? It would be useful in order to check the format of your values." – Giorgos Myrianthous

"@GiorgosMyrianthous I have added the source-quickstart-mysql.properties configuration. Please check, and let me know if you need any more details." – user9518134

"See my answer. Hope it helps." – Giorgos Myrianthous

1 Answer

2 votes

Your data is in Avro format, but in VALUE_FORMAT you have defined DELIMITED instead of AVRO. It is important to tell KSQL the format of the values stored in the topic. The following should do the trick for you:

CREATE STREAM pro_original_v2 \
WITH (KAFKA_TOPIC='products', VALUE_FORMAT='AVRO');

The data inserted into the Kafka topic should now be visible in your KSQL console window when you execute

SELECT * FROM pro_original_v2;
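To double-check that the topic really contains Avro records (a diagnostic suggestion, not part of the original answer), the Confluent platform ships a Schema-Registry-aware console consumer:

```shell
# Read the topic through the Schema Registry so Avro records are
# deserialized into readable JSON; unreadable output here would mean
# the data in the topic is not actually Avro-encoded.
./bin/kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic products \
  --from-beginning
```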

You can have a look at some Avro examples in KSQL here.