
I'm trying to pull data off my Kafka topic and write it to HDFS. My Flume config appears identical to what I've seen in several examples, but I can't get around the error below. I can consume from the topic through Python, so I know that part is OK. I'm on Flume 1.6.0 and Java 9.0.1. What am I doing wrong that makes it reject the Kafka topic?

09 Jul 2018 17:17:26,973 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:145)  - Creating channels
09 Jul 2018 17:17:26,984 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:42)  - Creating instance of channel kafka_hdfs_channel type memory
09 Jul 2018 17:17:26,989 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:200)  - Created channel kafka_hdfs_channel

09 Jul 2018 17:17:26,989 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41)  - Creating instance of source kafka_source, type org.apache.flume.source.kafka.KafkaSource
09 Jul 2018 17:17:26,993 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:361)  - Source kafka_source has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: Kafka topic must be specified.
    at org.apache.flume.source.kafka.KafkaSource.configure(KafkaSource.java:180)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:300)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.base/java.lang.Thread.run(Thread.java:844)

And here is my Flume config:

agentCDIS.sources = kafka_source
agentCDIS.channels = kafka_hdfs_channel
agentCDIS.sinks = hdfs_sink

agentCDIS.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agentCDIS.sources.kafka_source.kafka.bootstrap.servers = 10.4.3.61:9092, 10.4.3.62:9092, 10.4.3.63:9092
agentCDIS.sources.kafka_source.kafka.topic = test
agentCDIS.sources.kafka_source.kafka.consumer.group.id = cn_flume_group
agentCDIS.sources.kafka_source.channels = kafka_hdfs_channel
agentCDIS.sources.kafka_source.interceptors = i1
agentCDIS.sources.kafka_source.interceptors.i1.type = timestamp
agentCDIS.sources.kafka_source.kafka.consumer.timeout.ms = 1000

agentCDIS.channels.kafka_hdfs_channel.type = memory
agentCDIS.channels.kafka_hdfs_channel.capacity = 10000
agentCDIS.channels.kafka_hdfs_channel.transactionCapacity = 1000

agentCDIS.sinks.hdfs_sink.type = hdfs
agentCDIS.sinks.hdfs_sink.hdfs.path = hdfs://10.4.16.16:8020/user/cnelson/kafka/%{topic}/%y-%m-%d
agentCDIS.sinks.hdfs_sink.hdfs.rollInterval = 5
agentCDIS.sinks.hdfs_sink.hdfs.rollSize = 0
agentCDIS.sinks.hdfs_sink.fileType = DataStream
agentCDIS.sinks.hdfs_sink.channel = kafka_hdfs_channel

agentCDIS.sinks.loggerSink.type = logger
agentCDIS.sinks.loggerSink.kafka_hdfs_channel = memoryChannel

agentCDIS.channels.memoryChannel.type = memory
agentCDIS.channels.memoryChannel.capacity = 100
I've tried every variation: Topic, topic, TOPIC, TOPICS, Topics... I just went back and tried kafka.topic again to be sure. Same error. - supahcraig
Well, according to the source code, it thinks your topic is null (github.com/apache/flume/blob/release-1.6.0/flume-ng-sources/…), and the property name should be topic (github.com/apache/flume/blob/release-1.6.0/flume-ng-sources/…). - OneCricketeer
No argument. I've updated my initial post to reflect the correct syntax (what I posted was the 75th iteration of me trying things). Regardless, having the correct syntax yields the exact same error. - supahcraig

1 Answer


I went through the post and the config a few times and noticed that you're using Flume 1.6, and according to the documentation the Kafka source property names in that version are slightly different. Could you please try the following:

  1. Instead of agentCDIS.sources.kafka_source.kafka.bootstrap.servers => try agentCDIS.sources.kafka_source.zookeeperConnect. The value for this property is the ZooKeeper URI used by your Kafka cluster, not the broker list.
  2. Instead of agentCDIS.sources.kafka_source.kafka.topic = test => try agentCDIS.sources.kafka_source.topic = test
  3. Instead of agentCDIS.sources.kafka_source.kafka.consumer.group.id = cn_flume_group => try agentCDIS.sources.kafka_source.groupId = cn_flume_group

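The 3 properties you've used in your config file (kafka.bootstrap.servers, kafka.topic, kafka.consumer.group.id) follow the naming scheme introduced in version 1.7. Flume 1.6 reads the topic only from the plain topic property, so with your config it finds no value there and fails with "Kafka topic must be specified." (Note that 1.7 itself expects kafka.topics, plural, for the source.)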
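Putting the three changes together, a 1.6-style config would look roughly like the sketch below. It keeps your channel and sink settings; zk-host:2181 is a hypothetical placeholder for your ZooKeeper connection string, and I haven't tested this against your cluster. I've also changed fileType to hdfs.fileType, since the HDFS sink reads its options with the hdfs. prefix, and left out the loggerSink and memoryChannel lines because neither is listed in agentCDIS.sinks or agentCDIS.channels.

```properties
agentCDIS.sources = kafka_source
agentCDIS.channels = kafka_hdfs_channel
agentCDIS.sinks = hdfs_sink

# Flume 1.6 KafkaSource: zookeeperConnect, topic and groupId take no "kafka." prefix
agentCDIS.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agentCDIS.sources.kafka_source.channels = kafka_hdfs_channel
# Hypothetical placeholder: replace with the ZooKeeper address your Kafka cluster uses
agentCDIS.sources.kafka_source.zookeeperConnect = zk-host:2181
agentCDIS.sources.kafka_source.topic = test
agentCDIS.sources.kafka_source.groupId = cn_flume_group
# In 1.6, other "kafka."-prefixed properties are passed to the Kafka consumer
agentCDIS.sources.kafka_source.kafka.consumer.timeout.ms = 1000
agentCDIS.sources.kafka_source.interceptors = i1
agentCDIS.sources.kafka_source.interceptors.i1.type = timestamp

agentCDIS.channels.kafka_hdfs_channel.type = memory
agentCDIS.channels.kafka_hdfs_channel.capacity = 10000
agentCDIS.channels.kafka_hdfs_channel.transactionCapacity = 1000

agentCDIS.sinks.hdfs_sink.type = hdfs
agentCDIS.sinks.hdfs_sink.channel = kafka_hdfs_channel
agentCDIS.sinks.hdfs_sink.hdfs.path = hdfs://10.4.16.16:8020/user/cnelson/kafka/%{topic}/%y-%m-%d
agentCDIS.sinks.hdfs_sink.hdfs.rollInterval = 5
agentCDIS.sinks.hdfs_sink.hdfs.rollSize = 0
# HDFS sink options carry the "hdfs." prefix
agentCDIS.sinks.hdfs_sink.hdfs.fileType = DataStream
```

If you later upgrade to 1.7 or newer, the kafka.bootstrap.servers / kafka.topics / kafka.consumer.group.id names become the right ones instead.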

I hope this helps!