I am assuming Zookeeper and Kafka are installed in each user's own folder named 'kafkaflume'.
There are two folders inside the kafkaflume folder: one is zookeeper, the other is kafka.
A configuration file is also given below: flumekafka.conf.
You need to edit this file as per your requirements.
First, you need to start Zookeeper:
Open a terminal, go to the zookeeper folder, and start it:
bin/zkServer.sh start
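To confirm Zookeeper is running, you can check its status from the same folder (a quick check, assuming the standard zkServer.sh script):
bin/zkServer.sh status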
Open another terminal, go to the kafka folder, and start the Kafka server:
bin/kafka-server-start.sh config/server.properties
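If the topic you plan to use does not exist yet (and auto topic creation is disabled), you can create it first; a minimal sketch, assuming Zookeeper on localhost:2181, a single-node setup, and a replication factor and partition count of 1:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topicname>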
Open another terminal, go to the kafka folder, and start a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topicname>
Now edit the flumekafka.conf file and prepare an HDFS folder for data loading (see the example command below).
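For example, the parent of the HDFS path used by the sink below can be created like this (a sketch, reusing the <username> and <foldername> placeholders from the configuration; the date subfolders are created by the sink itself):
hdfs dfs -mkdir -p /user/<username>/<foldername>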
Now run the Flume agent command from the console, as shown below.
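A typical invocation, assuming Flume is installed in its own folder and flumekafka.conf is placed in its conf directory (the agent name 'agent' matches the configuration below):
bin/flume-ng agent --conf conf --conf-file conf/flumekafka.conf --name agent -Dflume.root.logger=INFO,console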
=======================
Configuration file for Flume: flumekafka.conf
=======================
# Name the components on this agent
agent.sources = r1
agent.sinks = k2
agent.channels = c1
# Describe/configure the source
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.zookeeperConnect = localhost:2181
agent.sources.r1.topic = <topicname>
agent.sources.r1.groupId = group1
agent.sources.r1.channels = c1
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = timestamp
agent.sources.r1.kafka.consumer.timeout.ms = 10000
# Describing/Configuring the sink
agent.sinks.k2.type = hdfs
agent.sinks.k2.hdfs.path = hdfs://localhost:8020/user/<username>/<foldername>/%y-%m-%d
agent.sinks.k2.hdfs.rollInterval = 5
agent.sinks.k2.hdfs.rollSize = 0
agent.sinks.k2.hdfs.rollCount = 0
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.channel = c1
# Describing/Configuring the channel
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
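Once the agent is running and the producer is sending messages, the loaded files can be checked in HDFS (using the same placeholders as in the sink path):
hdfs dfs -ls /user/<username>/<foldername>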