
So, I have been trying to get the Pub/Sub Kafka connector running for about a month now and have run into various problems. I have reviewed many questions here about Kafka Connect and the Pub/Sub connector, which helped me get this far, but I am stuck again. When I run this command:

.\bin\windows\connect-standalone.bat .\etc\kafka\WorkerConfig.properties .\etc\kafka\configSink.properties .\etc\kafka\configSource.properties

I get a long list of errors linked here:

The "could not scan file [file name]..." errors start right after it tries to start the REST server. I am unsure whether I need to set rest.host.name and rest.port, because currently the StandaloneConfig values show:

rest.host.name = null

Edit: After reviewing the log file for a while, I found the following messages:

Kafka consumer created
Created connector CPSConnector
Initializing task CPSConnector-0 with config {connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector, task.class=com.google.pubsub.kafka.sink.CloudPubSubSinkTask, tasks.max=1, topics=, cps.project=kohls-sis-sandbox, name=CPSConnector, cps.topic=test-pubsub}
Task CPSConnector-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Sink tasks require a list of topics.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
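The logged task config shows `topics=` with an empty value, which is why the sink task refuses to start. As a sketch, a sink file consistent with that log, but with `topics` filled in, might look like this. The values are copied from the logged config; the file's actual contents are not shown here, and `topics=test` matches the fix mentioned in the next edit.

```properties
# configSink.properties (sketch reconstructed from the logged task config)
# Connector name as logged; it must be unique within the worker
name=CPSConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=1
# Sink connectors must consume at least one Kafka topic;
# an empty value produces "Sink tasks require a list of topics."
topics=test
cps.project=kohls-sis-sandbox
cps.topic=test-pubsub
```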

Edit: I fixed the above issue by adding topics=test to my configSink.properties. The current error message is below. Does this mean you can only run either a sink connector or a source connector, not both?

Failed to create job for .\etc\kafka\configSource.properties
Stopping after connector error
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:97)
Caused by: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)

In my WorkerConfig.properties, I have bootstrap.servers=localhost:2181. My property files are here.

I am not sure how to fix this, since my properties files are set, I made sure cps-kafka-connector.jar is on the classpath, and I also set plugin.path=\share\java\kafka\kafka-connect-pubsub.
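For comparison, here is a minimal sketch of a standalone worker file covering the settings mentioned above. Two points are worth flagging as assumptions to check against your setup: `bootstrap.servers` should list Kafka brokers (the broker default port is 9092, while 2181 is ZooKeeper's default client port), and because Kafka loads `.properties` files with Java's properties parser, where a backslash is an escape character, Windows paths are safer written with forward slashes. The converter choice and the offsets file path are placeholders.

```properties
# WorkerConfig.properties (sketch; adjust values to your environment)
# Kafka broker address(es), not ZooKeeper
bootstrap.servers=localhost:9092

# Serialization of record keys/values (placeholder choice)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# Standalone mode stores source offsets in a local file (hypothetical path)
offset.storage.file.filename=C:/tmp/connect.offsets

# Same directory as before, with forward slashes so the
# properties parser does not drop the backslashes as escapes
plugin.path=/share/java/kafka/kafka-connect-pubsub

# Optional: the REST server listens on port 8083 when this is unset
#rest.port=8083
```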

If anyone can point me in the right direction to fix this issue, that would be great. I followed the directions here: https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector

A screengrab is a poor way to share the log file :) Could you post it, either the salient messages inline here or as a gist.github.com (or similar) link? Can you also share your connector configuration file? - Robin Moffatt
Thank you for the feedback! I will make the changes now. I am having a hard time determining which messages are important, but I will include the ones I think are. - Cody Ferguson
Ignore the "could not scan file" log messages. - Randall Hauch
Okay, I did that, which helped me find the message that I think best describes the issue. I will edit the question to show this error message. Thanks! - Cody Ferguson

1 Answer


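Each connector instance, whether it's a source or a sink, needs a unique name when you submit its configuration properties to a Kafka Connect cluster or to a standalone worker.

In the above example, both configSink.properties and configSource.properties use name=CPSConnector, so the worker creates the sink first and then rejects the source with AlreadyExistsException. Running a source and a sink in the same worker is fine; just give your source a different name from your sink. For instance: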

$ head -n 1 configSource.properties
name=CPSSourceConnector
$ head -n 1 configSink.properties
name=CPSSinkConnector

Or, for that matter:

$ head -n 1 configSource.properties
name=Tom
$ head -n 1 configSink.properties
name=Jerry
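As a fuller sketch, a source file that can run alongside the sink in the same standalone worker might look like this. The connector class and the `cps.subscription` / `kafka.topic` keys follow the connector's README, but verify them against the version you built. The subscription and Kafka topic names are hypothetical; the Kafka topic is deliberately different from the sink's `topics` so messages don't loop between Kafka and Pub/Sub.

```properties
# configSource.properties (sketch)
# Must differ from the sink's name, e.g. CPSSinkConnector
name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=1
cps.project=kohls-sis-sandbox
# Hypothetical Pub/Sub subscription to pull from
cps.subscription=my-subscription
# Hypothetical Kafka topic to write the pulled messages to
kafka.topic=from-pubsub
```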