I want to use the FilePulse connector to load XML files into Kafka.

My environment is as follows:

  • Windows 10 with WSL, Ubuntu installed
  • Downloaded Confluent Platform 5.5.1 (see "https://www.confluent.io/download/") and unpacked it
  • Downloaded the version 1.5.2 zip file from GitHub (https://github.com/streamthoughts/kafka-connect-file-pulse/releases) and unzipped it
  • Modified "connect-standalone.properties" under the Confluent directory (etc/kafka/connect-standalone.properties) so that plugin.path includes "/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib"
  • I did not create any topics.

I started ZooKeeper and Kafka, then tried to start Kafka Connect in standalone mode as follows:

$ zookeeper-server-start etc/kafka/zookeeper.properties
$ kafka-server-start etc/kafka/server.properties
$ connect-standalone \
etc/kafka/connect-standalone.properties \
/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/etc/quickstart-connect-file-pulse-csv.properties

but it fails with the following error:

[2020-09-08 15:57:45,522] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-expression-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,541] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-expression-1.5.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-09-08 15:57:45,541] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-filters-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,553] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-filters-1.5.2.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-09-08 15:57:45,554] INFO Loading plugin from: /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib/kafka-connect-file-pulse-plugin-1.5.2.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-09-08 15:57:45,575] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:130)
java.lang.NoClassDefFoundError: io/streamthoughts/kafka/connect/filepulse/offset/OffsetManager
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:385)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:328)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:261)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:253)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:222)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:199)
    at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:60)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 13 more

Question: Can you please help me find a fix?

PS: Below is what I tried with the JDBC connector, just to check whether other connectors work. It started without exceptions.

connect-standalone \
etc/kafka/connect-standalone.properties \
/home/min/confluent-5.5.1/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties

PPS: Below is the content of the FilePulse connector properties file (I made no changes):

connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic=connect-file-pulse-quickstart-csv
tasks.max=1

filters=ParseDelimitedRow

# Delimited Row filter
filters.ParseDelimitedRow.extractColumnName=headers
filters.ParseDelimitedRow.trimColumn=true
filters.ParseDelimitedRow.type=io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
skip.headers=1
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader

# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/kafka-connect/examples/
fs.scan.interval.ms=10000

# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-quickstart-csv
internal.kafka.reporter.topic=connect-file-pulse-status

# Track file by name and hash
offset.strategy=name+hash

PPPS: Below is the relevant part of the connect-standalone.properties file (I added /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib):


# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

plugin.path=/usr/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
Initially, I tried a lower version of FilePulse (streamthoughts-kafka-connect-file-pulse-1.5.0/), downloaded from confluent.io/hub/streamthoughts/kafka-connect-file-pulse. I thought downloading a higher version from GitHub would help. - soMuchToLearnAndShare
For the JDBC connector that ships with Confluent Platform, after a successful start I also ran the following check:
$ curl -s localhost:8083/connector-plugins|jq '.[].class'
and received these results:
"io.confluent.connect.jdbc.JdbcSinkConnector"
"io.confluent.connect.jdbc.JdbcSourceConnector"
"org.apache.kafka.connect.file.FileStreamSinkConnector"
"org.apache.kafka.connect.file.FileStreamSourceConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"
- soMuchToLearnAndShare
Put /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2 next to the jdbc folder and try again. - OneCricketeer
I checked the source code; the class that is not found does exist in the connect-file-pulse-api jar. How can I force the class path? - soMuchToLearnAndShare
Is there a particular reason not to use `confluent-hub install` instead of downloading from GitHub? Also, I've seen that developer be responsive on GitHub issues, so you might want to ask there. - OneCricketeer

1 Answer

I would not call this a perfect answer, but it is how I got it working.

I found that the plugin.path entry for the FilePulse connector has to be the parent of the lib folder that contains the jar files, not the lib folder itself.

So the following worked:

$ cat etc/kafka/connect-standalone.properties

Key info extracted:

plugin.path=/usr/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc
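
To confirm that the connector is actually registered after restarting the worker, the same REST endpoint used in the comments for the JDBC check can be queried again. The helper below is only a sketch: has_connector_plugin is a name made up for illustration, and it assumes the worker listens on the default localhost:8083. It reads the JSON from standard input and looks for the quoted class name.

```shell
# Hypothetical helper (not part of Kafka Connect): succeeds if the JSON on
# stdin mentions the given connector class as a quoted string.
has_connector_plugin() {
  # grep -F matches the quoted class name literally; -q suppresses output
  grep -qF "\"$1\""
}

# Example usage, assuming the Connect worker runs on localhost:8083:
# curl -s localhost:8083/connector-plugins \
#   | has_connector_plugin io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector \
#   && echo "FilePulse is registered"
```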

Before, it looked like this and threw the exception I mentioned:

plugin.path=/usr/share/java,/home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
# plugin.path=/usr/share/java,/home/min/confluent-5.5.1/share/java/kafka-connect-jdbc

Maybe I misunderstood the instructions below from the connect-standalone.properties file, or maybe the FilePulse connector does not follow the conventions stated there. For the JDBC connector, by contrast, the path is the directory that directly contains the JAR files.

# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
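
A likely explanation, as far as I understand the plugin.path rules: each entry is a top-level directory, and every direct child of it (a subdirectory or a single jar) is loaded as a separate plugin with its own isolated class loader. The log in the question fits this, since each jar under lib gets its own "Registered loader: PluginClassLoader" line. With plugin.path pointing at lib, the plugin jar was isolated from the api jar that holds OffsetManager. With the parent folder, lib as a whole becomes one plugin and all of its jars share one class loader. The sketch below only approximates that scan; list_plugin_locations is a hypothetical helper, not something shipped with Kafka.

```shell
# Hypothetical helper: print every direct child of a plugin.path entry that
# Connect would likely treat as its own plugin location (an approximation
# of the real scan, which also handles bare class files).
list_plugin_locations() {
  local entry="$1"
  local child
  for child in "$entry"/*; do
    if [ -d "$child" ]; then
      # A subdirectory: all jars inside it belong to one plugin
      echo "$child"
    else
      case "$child" in
        # A jar or zip directly under the entry: a plugin on its own
        *.jar|*.zip) echo "$child" ;;
      esac
    fi
  done
}

# Example usage with the paths from the question:
# list_plugin_locations /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2/lib
# list_plugin_locations /home/min/streamthoughts-kafka-connect-file-pulse-1.5.2
```

Run against the lib folder, this prints one line per jar (each one isolated from the others); run against the parent folder, it prints the lib directory as a single location.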