I am trying to build a data pipeline from MySQL to KSQL.
Use case: the data source is MySQL, where I have created a table.
I am using
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
to start a standalone connector, and it works fine.
I then start a console consumer on the topic, i.e.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning
When I insert data into the MySQL table, the records show up in the consumer as well. I have also created a KSQL stream on the same topic. I expect to see the same records in the stream, but I get no results when I run
select * from <streamName>
Connector configuration (source-quickstart-mysql.properties):
name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera
# Which table(s) to include
table.whitelist=ftest
mode=incrementing
incrementing.column.name=id
topic.prefix=ftopic
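As far as I understand, the JDBC source connector writes each table to a topic named topic.prefix followed by the table name. The following is only a minimal shell sketch of that naming rule, not part of my setup: the temporary file and the get_prop helper are hypothetical, and the two settings are copied from the configuration above.

```shell
#!/bin/sh
# Sketch: derive the topic name(s) the JDBC source connector would write to,
# assuming the rule "topic = topic.prefix + table name".
# The temp file and get_prop helper are hypothetical, for illustration only.
props=$(mktemp)
cat > "$props" <<'EOF'
table.whitelist=ftest
topic.prefix=ftopic
EOF

# Read a single key from a properties file, ignoring comment lines.
get_prop() {
  grep -v '^#' "$2" | grep "^$1=" | head -n 1 | cut -d= -f2-
}

prefix=$(get_prop topic.prefix "$props")
tables=$(get_prop table.whitelist "$props")

# table.whitelist may be a comma-separated list: one topic per table.
topics=$(echo "$tables" | tr ',' '\n' | sed "s/^ *//; s/^/$prefix/")
echo "$topics"
rm -f "$props"
```

With the two settings above this prints ftopicftest, which can be compared with the topic names used by the consumer and by the KSQL stream.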
Sample data
- MySQL
1.) Create the database:
CREATE DATABASE testDB;
2.) Use the database:
USE testDB;
3.) Create the table:
CREATE TABLE products (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
4.) Insert data into the table:
INSERT INTO products(id,name,description,weight)
VALUES (103,'car','Small car',20);
- KSQL
1.) Create the stream:
CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH \
(kafka_topic='proproducts', value_format='DELIMITED');
2.) Select query:
Select * from pro_original;
Expected output
- Consumer
It should show the data that is inserted into the MySQL table.
This works: I do see the MySQL data here.
- KSQL
The stream should be populated with the data that is inserted into the MySQL table and reflected in the Kafka topic.
I am not getting the expected result in KSQL.
Please help me get this data pipeline working.