I am testing Flume to load data into HBase, and I am considering parallel data loading using Flume's channel selector and interceptor, because of the speed gap between the source and the sink.

So, what I want to do with Flume, within a single source-channel-sink flow, is:

  1. create an Event header with an interceptor of type regex_extractor

  2. multiplex Events by that header to two or more channels with a selector of type multiplexing

I tried the configuration below.


    agent.sources = tailsrc
    agent.channels = mem1 mem2
    agent.sinks = std1 std2
    agent.sources.tailsrc.type = exec
    agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
    agent.sources.tailsrc.batchSize = 1
    
    agent.sources.tailsrc.interceptors = i1
    agent.sources.tailsrc.interceptors.i1.type = regex_extractor
    agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
    agent.sources.tailsrc.interceptors.i1.serializers = t1
    agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type
    
    agent.sources.tailsrc.selector.type = multiplexing
    agent.sources.tailsrc.selector.header = type
    agent.sources.tailsrc.selector.mapping.1 = mem1
    agent.sources.tailsrc.selector.mapping.2 = mem2
    
    agent.sinks.std1.type = file_roll
    agent.sinks.std1.channel = mem1
    agent.sinks.std1.batchSize = 1
    agent.sinks.std1.sink.directory = /var/log/flumeout/1
    agent.sinks.std1.rollInterval = 0
    
    agent.sinks.std2.type = file_roll
    agent.sinks.std2.channel = mem2
    agent.sinks.std2.batchSize = 1
    agent.sinks.std2.sink.directory = /var/log/flumeout/2
    agent.sinks.std2.rollInterval = 0
    
    agent.channels.mem1.type = memory
    agent.channels.mem1.capacity = 100
    
    agent.channels.mem2.type = memory
    agent.channels.mem2.capacity = 100

But it doesn't work!

When the selector part is removed, there are some interceptor debugging messages in Flume's log, but when the selector and interceptor are used together, there is nothing.

Is there anything wrong in the configuration, or something I missed?

Thanks for reading. :)

1 Answer

I found it.

In the Flume log, there is a warning message as below:


    2013-10-10 16:34:20,514 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed tailsrc due to Failed to configure component!

The source had no channels property, so Flume failed to configure it and removed it. So I added the line below:


    agent.sources.tailsrc.channels = mem1 mem2

And then it works!
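
For reference, here is a sketch of how the source section might look with the fix in place, assuming the rest of the configuration from the question stays unchanged. The commented-out selector.default line is not part of the original setup; it is an optional assumption that would route events whose type header matches no mapping to mem1.

    agent.sources.tailsrc.type = exec
    agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
    agent.sources.tailsrc.batchSize = 1
    # The fix: the source must list every channel it can write to
    agent.sources.tailsrc.channels = mem1 mem2

    # Interceptor: copy the leading digit of each line into the "type" header
    agent.sources.tailsrc.interceptors = i1
    agent.sources.tailsrc.interceptors.i1.type = regex_extractor
    agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
    agent.sources.tailsrc.interceptors.i1.serializers = t1
    agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type

    # Selector: route on the "type" header; mapped channels must appear in the list above
    agent.sources.tailsrc.selector.type = multiplexing
    agent.sources.tailsrc.selector.header = type
    agent.sources.tailsrc.selector.mapping.1 = mem1
    agent.sources.tailsrc.selector.mapping.2 = mem2
    # Optional (assumption, not in the original): fallback channel for unmatched events
    # agent.sources.tailsrc.selector.default = mem1

With this in place, a line such as "1 foo" appended to in.txt should end up under /var/log/flumeout/1, and a line starting with 2 under /var/log/flumeout/2.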