1
votes

Cannot create a Kafka -> Cassandra Sink connector using Ksqldb :

CREATE SINK CONNECTOR cassandra WITH( "connector.class" = 'io.confluent.connect.cassandra.CassandraSinkConnector', "tasks.max" = '1', "topics" = 'tst', "cassandra.contact.points" = 'cassandra', "cassandra.keyspace" = 'test', "cassandra.write.mode" = 'Update', "confluent.topic.bootstrap.servers" = 'kafka:9092' );

ERROR [CASS|worker] WorkerConnector{id=CASS} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:118)
    org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) '_confluent-command'
            at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:262)
            at io.confluent.license.LicenseStore$1.run(LicenseStore.java:161)
            at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
            at io.confluent.license.LicenseStore.start(LicenseStore.java:190)
            at io.confluent.license.LicenseManager.<init>(LicenseManager.java:155)
            at io.confluent.license.LicenseManager.<init>(LicenseManager.java:140)
            at io.confluent.connect.utils.licensing.ConnectLicenseManager$Builder.lambda$build$0(ConnectLicenseManager.java:210)
            at io.confluent.connect.utils.licensing.ConnectLicenseManager.registerOrValidateLicense(ConnectLicenseManager.java:255)
            at io.confluent.connect.cassandra.CassandraSinkConnector.doStart(CassandraSinkConnector.java:50)
            at io.confluent.connect.cassandra.CassandraSinkConnector.start(CassandraSinkConnector.java:45)
            at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
            at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
            at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
            at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1190)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:126)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1206)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1202)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
            at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
            at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
            at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
            at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
            at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:229)
            ... 21 more
    Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
1
How do you start Kafka Connect and Kafka?Iskuskov Alexander

1 Answers

1
votes

The Confluent Cassandra sink connector Replication factor had a default value = 3. Modifying the default value in the connector config solved the problem!

"confluent.topic.replication.factor" = '1',