I am configuring a Flume agent that reads from an FTP server and sends files to an HDFS sink. My big problem is that I want to store the files in HDFS with their original filenames. I tried the spooldir source, and it works fine and stores files in HDFS under their basenames, but the Flume agent crashes:
1) If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
2) If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
So the spooldir source is not suitable for my use case. Is there a way to make the FTP source keep the original filename, so that the HDFS sink stores each file separately under its own name?
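For reference, this is roughly the spooldir setup that preserved basenames (the spool directory path is a placeholder; the spooldir source sets a basename header that the HDFS sink's filePrefix escape picks up):

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /DATA/flume_spool
agent.sources.r1.basenameHeader = true
agent.sources.r1.basenameHeaderKey = basename
agent.sinks.k.hdfs.filePrefix = %{basename}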
This is my current agent with the FTP source:
agent.sources = r1
agent.channels = c1
agent.sinks = k
#configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = pwd
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/flume_ftp_source
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
agent.sources.r1.flushlines = true
#configure hdfs sink k
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path = hdfs://hostname:8020/user/admin/DATA/import_flume/agents/agent1/%Y/%m/%d/%H
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchSize = 1000000
agent.sinks.k.hdfs.fileType = DataStream
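#configure channel c1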
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1000000
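#bind source and sink to channel c1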
agent.sources.r1.channels = c1
agent.sinks.k.channel = c1
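For completeness, this is roughly how I start the agent (the config file path and agent name are placeholders for my actual setup):

flume-ng agent --conf conf --conf-file conf/agent.conf --name agent -Dflume.root.logger=INFO,console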