I am writing to HDFS using Flume's spooling directory source. Here is my configuration:

# Initialize the agent's source, channel and sink
agent.sources = test
agent.channels = memoryChannel
agent.sinks = flumeHDFS

# Setting the source to spool directory where the file exists
agent.sources.test.type = spooldir
agent.sources.test.spoolDir = /johir
agent.sources.test.fileHeader = false
agent.sources.test.fileSuffix = .COMPLETED

# Setting the channel to memory
agent.channels.memoryChannel.type = memory
# Max number of events stored in the memory channel
agent.channels.memoryChannel.capacity = 10000
# agent.channels.memoryChannel.batchSize = 15000
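# Max number of events per transaction; the key is case-sensitive and the value must not exceed capacity
agent.channels.memoryChannel.transactionCapacity = 1000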

# Setting the sink to HDFS
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path =/user/root/
agent.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be text or writable
agent.sinks.flumeHDFS.hdfs.writeFormat = Text

# use a single csv file at a time
agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# roll over the file based on a maximum size of about 1 MB (rollSize is in bytes)
agent.sinks.flumeHDFS.hdfs.rollCount=0
agent.sinks.flumeHDFS.hdfs.rollInterval=0
agent.sinks.flumeHDFS.hdfs.rollSize = 1000000
agent.sinks.flumeHDFS.hdfs.batchSize =1000

# never rollover based on the number of events
agent.sinks.flumeHDFS.hdfs.rollCount = 0

# time-based rollover is disabled (rollInterval = 0 above)
#agent.sinks.flumeHDFS.hdfs.rollInterval = 0
# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600

# Connect source and sink with channel
agent.sources.test.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel

But the problem is that the data written to HDFS ends up in a file with a generated temporary name. How can I keep the original file name from the source directory? For example, I have the files day1.txt, day2.txt and day3.txt, each holding data for a different day. I want them stored in HDFS as day1.txt, day2.txt and day3.txt, but the three files are merged and stored as a single FlumeData.1464629158164.tmp file. Is there any way to do this?

1 Answer

If you want to retain the original file name, attach the file name as a header to each event and use that header in the sink's file prefix.

  1. Set the basenameHeader property of the spooling directory source to true. This adds a header holding the file's base name to every event; the header key is basename unless you change it with the basenameHeaderKey property.
  2. Set the sink's hdfs.filePrefix property to %{basename} so that the HDFS file name starts with the value of that header.

Add the following properties to your configuration file:

#source properties
agent.sources.test.basenameHeader = true

#sink properties
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.filePrefix = %{basename}
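
Note that the HDFS sink still appends its own numeric counter to the prefix and marks files that are still open with the .tmp in-use suffix, so the result would look like day1.txt.1464629158164 rather than exactly day1.txt. The fragment below is a rough, untested variant of the properties above. It uses a custom header key and closes idle files so the .tmp suffix goes away sooner. The key name origName is a hypothetical choice for illustration, and the idleTimeout value is an arbitrary assumption.

```properties
# source: put each file's base name into a header
# (origName is a hypothetical key name; the default key is basename)
agent.sources.test.basenameHeader = true
agent.sources.test.basenameHeaderKey = origName

# sink: start HDFS file names with the value of that header
agent.sinks.flumeHDFS.hdfs.filePrefix = %{origName}

# close files that receive no events for 60 seconds, which renames
# them without the .tmp in-use suffix (60 is an assumed value)
agent.sinks.flumeHDFS.hdfs.idleTimeout = 60
```

With hdfs.maxOpenFiles = 1, as in the question, the sink should also close the current file once events carrying a different prefix arrive, so each source file would end up in its own HDFS file.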