
I'm new to Flume. I have a large CSV text file with records in it, each about 50 characters long, with CR-LF terminating the lines. I'd like to use Flume to ingest this data into HDFS, but only one of the lines of the file gets written to HDFS (it's the second line of the file, if that clue helps).

I'm not seeing any errors in the output. Details below. Thanks.

Here's my execution command:

flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

And my config:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = cat /Users/david/flume/testdata.txt
a1.sources.r1.interceptors = a
a1.sources.r1.interceptors.a.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Describe the sinks
a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
# a1.sinks.k2.hdfs.path = /flume
a1.sinks.k2.hdfs.path = /flume/trades/%y-%m-%d/%H%M/%S

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

And the output:

13/07/03 22:15:34 INFO lifecycle.LifecycleSupervisor: Starting lifecycle supervisor 1
13/07/03 22:15:34 INFO node.FlumeNode: Flume node starting - a1
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Node manager starting
13/07/03 22:15:34 INFO lifecycle.LifecycleSupervisor: Starting lifecycle supervisor 9
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: Configuration provider starting
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: Reloading configuration file:example.conf
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k1
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k1
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Added sinks: k1 k2 Agent: a1
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Processing:k2
13/07/03 22:15:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration  for agents: [a1]
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: Creating channels
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: c1, registered successfully.
13/07/03 22:15:34 INFO properties.PropertiesFileConfigurationProvider: created channel c1
13/07/03 22:15:34 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
13/07/03 22:15:34 INFO sink.DefaultSinkFactory: Creating instance of sink: k2, type: hdfs
2013-07-03 22:15:34.777 java[5903:5703] Unable to load realm info from SCDynamicStore
13/07/03 22:15:34 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SINK, name: k2, registered successfully.
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@23dae5f1 counterGroup:{ name:null counters:{} } }, k2=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@782e439a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel c1
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink k1
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink k2
13/07/03 22:15:34 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started
13/07/03 22:15:34 INFO nodemanager.DefaultLogicalNodeManager: Starting Source r1
13/07/03 22:15:34 INFO source.ExecSource: Exec source starting with command:cat /Users/david/flume/testdata.txt
13/07/03 22:15:34 INFO source.ExecSource: Command [cat /Users/david/flume/testdata.txt] exited with 0
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 30 38 2F 30 35 2F 32 002785A9,08/05/2 }
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 30 38 2F 30 34 2F 32 002785A9,08/04/2 }
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 37 2F 32 002785A9,10/07/2 }
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 34 2F 32 002785A9,10/04/2 }
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 35 2F 32 002785A9,10/05/2 }
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 38 35 41 39 2C 31 30 2F 30 36 2F 32 002785A9,10/06/2 }
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 32 37 37 38 38 36 2C 31 30 2F 30 34 2F 32 00277886,10/04/2 }
13/07/03 22:15:34 INFO sink.LoggerSink: Event: { headers:{timestamp=1372904134964} body: 30 30 30 30 35 44 42 33 2C 30 39 2F 31 39 2F 32 00005DB3,09/19/2 }
13/07/03 22:15:35 INFO hdfs.BucketWriter: Creating /flume/trades/13-07-03/2215/34/FlumeData.1372904134974.tmp
13/07/03 22:16:05 INFO hdfs.BucketWriter: Renaming /flume/trades/13-07-03/2215/34/FlumeData.1372904134974.tmp to /flume/trades/13-07-03/2215/34/FlumeData.1372904134974
You may get a more reliable result using the spooling directory source. It is designed to monitor a directory for new files and forward them via Flume. Note that the files must never change once they are placed in the directory; the source complains bitterly if they do. - Sarge
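For reference, here is a minimal sketch of what that suggestion might look like: the exec source above is swapped for a spooling directory source. The spool directory path is an assumption, not from the original question; point it at any directory the agent can read and drop finished files into it.

# Hypothetical spooling directory source (replaces the exec source r1 above).
# /Users/david/flume/spool is an assumed path.
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /Users/david/flume/spool
a1.sources.r1.channels = c1
# Flume renames fully ingested files with this suffix so they are not re-read.
a1.sources.r1.fileSuffix = .COMPLETED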

3 Answers

0 votes

I figured this out. It turns out I needed to be more specific in my configuration file, in particular adding explicit batch and roll settings to the HDFS sink. It now looks like this:

# Example command to run:
# flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

# Name the components on this agent
a1.sources = r1
a1.sinks =  k2
a1.channels = c1

# Describe/configure the source
# a1.sources.r1.type = netcat
# a1.sources.r1.bind = localhost
# a1.sources.r1.port = 44444
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = cat /Users/david/flume/testdata.txt
a1.sources.r1.interceptors = a
a1.sources.r1.interceptors.a.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Describe the sinks
a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.batchSize = 2000
a1.sinks.k2.hdfs.rollCount = 5000
a1.sinks.k2.hdfs.rollSize = 1000000
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.writeFormat = Text
# a1.sinks.k2.hdfs.path = /flume
a1.sinks.k2.hdfs.path = /flume/trades/%y-%m-%d/%H%M/%S

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
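As a quick sanity check that every record made it across, the line counts on both sides can be compared; the glob below assumes the bucketed directory layout produced by the hdfs.path setting above.

# Count lines written to HDFS (glob assumes the %y-%m-%d/%H%M/%S layout above)
hdfs dfs -cat /flume/trades/*/*/*/FlumeData.* | wc -l
# Count lines in the source file for comparison
wc -l /Users/david/flume/testdata.txt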
0 votes

I also ran into the same problem. I think that once the source fills the channel and the events are sunk to HDFS, the channel somehow does not become free again, so the rest of the data never shows up in HDFS.
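If the channel really is filling up, one way to confirm it is to start the agent with Flume's built-in JSON monitoring and watch the channel metrics; the port below is an arbitrary choice.

# Start the agent with HTTP monitoring enabled (port 34545 is arbitrary)
flume-ng agent --conf conf --conf-file example.conf --name a1 \
  -Dflume.root.logger=INFO,console \
  -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

# In another terminal, inspect ChannelFillPercentage and the event counters for c1
curl http://localhost:34545/metrics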

-1 votes

If you want to get streamed data from a cat'ed file, the file must be in the flume-ng conf directory. Only then will you be able to execute it.