So, I have been trying to get a PubSub Kafka connector running for about a month now with various problems. I have reviewed many questions here about Kafka Connect and the Pubsub connector which have helped me get his far but I am stuck again. When I run this command:
.\bin\windows\connect-standalone.bat
.\etc\kafka\WorkerConfig.properties .\etc\kafka\configSink.properties .\etc\kafka\configSource.properties
I get a long list of errors linked here:
Right after it tries to start the rest server is when the errors "could not scan file [file name]..." start. I am unsure if I need to set the rest.host.name and rest.port because currently, for the standaloneConfig values, it reads
rest.host.name = null
Edit: After reviewing the log file for awhile, I found the following messages:
Kafka consumer created
Created connector CPSConnector
Initializing task CPSConnector-0 with config {connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector, task.class=com.google.pubsub.kafka.sink.CloudPubSubSinkTask, tasks.max=1, topics=, cps.project=kohls-sis-sandbox, name=CPSConnector, cps.topic=test-pubsub}
Task CPSConnector-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Sink tasks require a list of topics.
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:202)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Edit: So, I fixed the above issue by adding topics=test in my configSink. The current error message is below. Does this indicate that you can only run either a sink connector or source connector?
Failed to create job for .\etc\kafka\configSource.properties
Stopping after connector error
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:97)
Caused by: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
In my WorkerConfig.properites, I have bootstrap.servers=localhost:2181. My property files are here.
I am not sure how to fix since I have my properties files set, made sure the cps-kakfa-connector.jar is in the class path. I also set plugin.path=\share\java\kafka\kafka-connect-pubsub.
If anyone can point me in the right direction to fix this issue, that would be great. I followed the directions here: https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector