0
votes

I'm new using Apache Flume and I have hard to understand how it works exactly. In order to explain my problem, so I explain my need and what I've done.

I want to configure a stream between a directory of csv files (these files are builded every 5 minutes) and a HDFS cluster.

I identified that the "spooling directory" source and the HDFS sink are what I need. That's give me this flume.conf file

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /home/user/data

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data
agent.sinks.hdfsSink.hdfs.fileType = DataStream

agent.sinks.hdfsSink.hdfs.writeFormat=Text    

#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

The result is that input files are renamed with ".complete" on my local filesystem and data is uploaded on HDFS with a new name I guess unique and generated by Flume.

It's almost what I needed.

But before uploading, I want to make some file specific operation (remove header, escape comma..). I don't know how to do it, I think about using an interceptor. But when data is in flume it's transformed in event and it' streamed. At his point, there no knowledge of file.

Otherwise, original time event is written in the filename, so I'd like that this time is associated with my event instead of the current date.

I want also to keep the orignal filename in hdfs (there are some useful information in it).

Anybody have an advice to help me ?

1

1 Answers

1
votes

The original file name can be preserved as a a header if you specify

agent.sources.seqGenSrc.fileHeader=true 

That can then be retrieved in your sink.

if you want to manipulate the data within your files, use an interceptor. You should be aware that an event is basically a line within a file in the spool directory.

Last but not least, you'll want to use the fileHeader property to pipe the events back to the right file. This can be achieved by specifying the path in your sink as follows:

agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data/%{file}

You can use Prefix and Suffix to further configure the file name:

hdfs.filePrefix FlumeData   Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix –   Suffix to append to file (eg .avro - NOTE: period is not automatically added)