0 votes

I am trying to connect MySQL with Kafka Connect and many errors are showing up. I am sharing my connect-standalone.properties and mysql-jdbc-connector.properties along with the error. My Kafka and MySQL are on different clusters, and I am using the Confluent connector but not the Confluent interface. I downloaded version 4.1.0 of the Confluent JDBC MySQL connector.

mysql-jdbc-connector.properties

name=source-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://1**.**.*.29:3306/kconnect?user=bigdata&password=bigdata
connection.user=bigdata
connection.password=bigdata
task.max=10
mode=bulk
topic.prefix=mysql-jdbc-
poll.interval.ms=3600000

connect-standalone.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=Nifi-Staging:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
plugin.path=/usr/share/java

When running

bin/connect-standalone.sh config/connect-standalone.properties config/mysql-jdbc-connector.properties

The result is

 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:347)
[2020-01-14 14:01:33,289] INFO WorkerSourceTask{id=source-mysql-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:200)
[2020-01-14 14:01:33,415] INFO [Producer clientId=connector-producer-source-mysql-0] Cluster ID: VgW2NunYREqVY5cHNS6snQ (org.apache.kafka.clients.Metadata:266)
[2020-01-14 14:01:43,610] INFO WorkerSourceTask{id=source-mysql-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-14 14:01:43,611] INFO WorkerSourceTask{id=source-mysql-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2020-01-14 14:01:44,319] ERROR WorkerSourceTask{id=source-mysql-0} Flush of offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSourceTask:483)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:472)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:46)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:84)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:101)
        at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:105)
        at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:99)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more
Caused by: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
        at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
        at java.nio.file.Files.newOutputStream(Files.java:216)
        at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:92)
        ... 6 more
[2020-01-14 14:01:44,326] ERROR WorkerSourceTask{id=source-mysql-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)
2
You don't have permission to write to /tmp/connect.offsets. Look at your user and that user's permissions on this directory. – maxime G

2 Answers

1 vote

Welcome to StackOverflow :)

The error you see here:

java.nio.file.AccessDeniedException: /tmp/connect.offsets

indicates the problem: the user under which the Kafka Connect process is running does not have permission to write to the file /tmp/connect.offsets. Kafka Connect needs this file to store the connector's progress. Make that location writable by the user, and then restart the Kafka Connect worker.
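One way to avoid the permission problem entirely is to point the worker at an offsets file inside a directory your Connect user owns. A minimal sketch (the `~/kafka-connect` path is just an example; pick any directory the Connect user can write to):

```shell
# Create a directory owned by the user that runs Kafka Connect
OFFSETS_DIR="$HOME/kafka-connect"
mkdir -p "$OFFSETS_DIR"
touch "$OFFSETS_DIR/connect.offsets"

# Verify the file is writable before restarting the worker
if [ -w "$OFFSETS_DIR/connect.offsets" ]; then
  echo "offsets file is writable"
fi

# Then update connect-standalone.properties accordingly:
# offset.storage.file.filename=/home/<user>/kafka-connect/connect.offsets
```

After changing `offset.storage.file.filename`, restart the standalone worker so it picks up the new location.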

0 votes

A quick solution:

chown cp-kafka-connect:confluent /tmp/connect.offsets

I think you need to manage the permissions: either grant the "cp-kafka-connect" user permission on the directories of the sources you are pulling data from, or change the UID and GID of the systemd service "confluent-kafka-connect.service".
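If you go the systemd route, the usual mechanism is a drop-in override that sets the service's user and group. A hypothetical sketch (the unit name and the `cp-kafka-connect`/`confluent` user and group come from the answer above; adjust for your install). Normally you would run `sudo systemctl edit confluent-kafka-connect.service` and paste the override; it is written to a temp file here only for illustration:

```shell
# Drop-in content that changes which user/group the service runs as
cat > /tmp/connect-user-override.conf <<'EOF'
[Service]
User=cp-kafka-connect
Group=confluent
EOF
cat /tmp/connect-user-override.conf
```

After saving the override, run `sudo systemctl daemon-reload` and restart the service for the change to take effect.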