1
votes

I configure a flume agent, which read from an FTP server and send files to hdfs sink. My big problem is, I wanna store files in hdfs with their original filename. I tried with Spooldir source and it works fine and able to store files in hdfs with their basename, but flume agent crush:

1) If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.

2) If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

In fact, spooldir-source is not suitable for my use case. So, is there an idea how to make ftp source keep file name, subsequently, the hdfs stores files seperatly according to their names.

This is my agent:

agent.sources = r1
agent.channels = c1
agent.sinks = k

#configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = pwd
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/flume_ftp_source
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
agent.sources.r1.flushlines = true

#configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path =  hdfs://hostname:8020/user/admin/DATA/import_flume/agents/agent1/%Y/%m/%d/%H
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize =    1000000
agent.sinks.k.hdfs.fileType = DataStream

agent.channels.c1.type = memory
agent.channels.c1.capacity =  1000000
agent.channels.c1.transactionCapacity =   1000000

agent.sources.r1.channels = c1
agent.sinks.k.channel = c1
2

2 Answers

0
votes

I just pushed a solution to the flume ftp github project :

KR, Philippe

Is there a trick on how to fix the fact that the property %{basename} is missing ?

0
votes

As I said, according the following code updating: Fix for Flume FTP source, here's how I used the %{basename} variable :

##############
# COMPONENTS #
##############
myPrj.sources = source_01
myPrj.channels = channel_01
myPrj.sinks = sink_01

############
# BINDINGS #
############
myPrj.sources.source_01.channels = channel_01
myPrj.sinks.sink_01.channel = channel_01

###########
# CHANNEL #
###########
myPrj.channels.channel_01.type = memory
myPrj.channels.channel_01.capacity = 10000
myPrj.channels.channel_01.transactionCapacity = 10000

##########
# SOURCE #
##########
myPrj.sources.source_01.type = org.keedio.flume.source.ftp.source.Source

myPrj.sources.source_01.client.source = ftp
myPrj.sources.source_01.name.server = 127.0.0.1
myPrj.sources.source_01.user = myPrj
myPrj.sources.source_01.password = myPrj
myPrj.sources.source_01.port = 21

#myPrj.sources.source_01.security.enabled = true
#myPrj.sources.source_01.security.cipher = TLS
#myPrj.sources.source_01.security.certificate.enabled = true
#myPrj.sources.source_01.path.keystore = /paht/to/keystore
#myPrj.sources.source_01.store.pass = the_keyStore_password 

myPrj.sources.source_01.run.discover.delay = 5000
myPrj.sources.source_01.flushlines = false
myPrj.sources.source_01.chunk.size = 33554432

myPrj.sources.source_01.folder = /home/foo/app-flume-ftp-hdfs
myPrj.sources.source_01.file.name = flume-ftp-hdfs.ser

myPrj.sources.source_01.fileHeader = true
myPrj.sources.source_01.basenameHeader = true

# Deserializer
myPrj.sources.source_01.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
myPrj.sources.source_01.deserializer.maxBlobLength = 33554432

########
# SINK #
########
#myPrj.sinks.sink_01.type = logger
myPrj.sinks.sink_01.type = hdfs
myPrj.sinks.sink_01.hdfs.path = /user/foo/ftp_source/%Y/%m/%d/%H/%M/%{basename}
myPrj.sinks.sink_01.hdfs.filePrefix = FTP_SOURCE
myPrj.sinks.sink_01.hdfs.useLocalTimeStamp = true
myPrj.sinks.sink_01.hdfs.rollCount = 0
myPrj.sinks.sink_01.hdfs.rollInterval = 0
myPrj.sinks.sink_01.hdfs.batchSize = 100

# Data compressed
#myPrj.sinks.sink_01.hdfs.rollSize = 33554432
#myPrj.sinks.sink_01.hdfs.codeC = gzip
#myPrj.sinks.sink_01.hdfs.fileType = CompressedStream

# Data no compressed
myPrj.sinks.sink_01.hdfs.rollSize = 33554432
myPrj.sinks.sink_01.hdfs.fileType = DataStream