0 votes

We are planning to use the Kafka flume-ng integration (Flafka), where Flume is the consumer for Kafka queues. Flume agents will receive files listing commands and their output, as shown below:

root@host> [Command1]

[Output1]

root@host> [Command2]

[Output2]

The file may contain multiple commands, and a command's output may be huge. We need to intercept the event (which is the file data) and split it into multiple events, one per command. The source will then fan out the flow to multiple channels, sending each sub-event to a channel (using multiplexing), and each sink will store the command info in its respective Hive table. Is it possible to use a fan-out flow to split an event into multiple events? Or, to put it another way, can we split an event into multiple events in an interceptor?

I have read about the regex extractor interceptor and its serializers, but I am not sure whether they can be of any help in this scenario.
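For reference, the fan-out part on its own can be expressed with a multiplexing channel selector in the agent configuration; a rough sketch (the agent name, channel names, header name and mappings below are only placeholders) would be:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = command
a1.sources.r1.selector.mapping.command1 = c1
a1.sources.r1.selector.mapping.command2 = c2
a1.sources.r1.selector.default = c3

The open question is the step before this: producing the sub-events (and the header the selector would route on) from the single incoming file event.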


3 Answers

1 vote

If I've understood correctly, you need the original event taken from a Kafka queue to be split into several, let's say, sub-events, and you want to know which piece of Flume could do that.

I think interceptors are not suitable for that purpose, since interceptors are "placed" between the source and the channel and are designed to add, delete or modify the headers of a Flume event before putting it into the channel; at most, they can drop the entire event. But they are not able to generate several events based on an existing one.

I think you are looking for something like a handler attached to the source, able to interpret the events taken from Kafka and generate several Flume events at the source output. This concept is similar to the handlers you can attach to an HTTPSource (more details here). If such a thing is possible with your source, most probably you will have to develop your own custom handler, since the functionality you require is very specific.
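Just to illustrate the idea: a handler for the HTTP source implements the org.apache.flume.source.http.HTTPSourceHandler interface and may return any number of events per request. A minimal sketch (the class name and the naive per-line splitting are only placeholders) could be:

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import javax.servlet.http.HttpServletRequest;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPSourceHandler;

// Hypothetical handler: turns one HTTP request body into one event per line.
public class CommandSplittingHandler implements HTTPSourceHandler {

    @Override
    public List<Event> getEvents(HttpServletRequest request) throws Exception {
        List<Event> events = new ArrayList<Event>();
        BufferedReader reader = request.getReader();
        String line;
        while ((line = reader.readLine()) != null) {
            // One Flume event per line; real logic would group each command with its output.
            events.add(EventBuilder.withBody(line.getBytes(StandardCharsets.UTF_8)));
        }
        return events;
    }

    @Override
    public void configure(Context context) {
        // No configuration needed for this sketch.
    }
}

The Kafka source does not expose such a pluggable handler, which is why a custom development would be needed there.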

1 vote

Thanks for the reply, frb.

I want to split the incoming event at a Flume source into multiple sub-events and send them to their respective channels. So the first Flume node in the topology will route each sub-event (using multiplexing) to a specific hop which can handle that kind of information.

As per your reply, I understand that it cannot be done using an interceptor. Can you please share any example or documentation of such handlers?

0 votes

Indeed, Flume cannot split one event into multiple events on its own. Here is my alternative solution for this, taking the Kafka source as an example.

First, implement a source class that extends KafkaSource and replaces the default ChannelProcessor object with a proxy:

import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.source.kafka.KafkaSource;

// Kafka source whose ChannelProcessor is wrapped in a splitting proxy.
public class XXXSplitSource extends KafkaSource {

    @Override
    public synchronized ChannelProcessor getChannelProcessor() {
        return new XXXYourChannelProcessorProxy(super.getChannelProcessor());
    }
}
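In the agent configuration, the source would then be referenced by the fully qualified class name of this custom class, for example a1.sources.r1.type = com.example.XXXSplitSource (the package name is just an assumption here).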

Then, in the ChannelProcessor proxy implementation, you can split the events with your custom function:

import java.util.Collections;
import java.util.List;

import org.apache.flume.ChannelSelector;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;

// Proxy that splits incoming events before delegating to the real ChannelProcessor.
public class XXXYourChannelProcessorProxy extends ChannelProcessor {

    private ChannelProcessor m_downstreamChannelProcessor = null;

    public XXXYourChannelProcessorProxy(ChannelSelector selector) {
        super(selector);
    }

    public XXXYourChannelProcessorProxy(ChannelProcessor processor) {
        super(null);  // The selector is unused: every call is delegated downstream.
        m_downstreamChannelProcessor = processor;
    }

    @Override
    public void processEvent(Event event) {
        // Route single events through the same splitting path as batches.
        processEventBatch(Collections.singletonList(event));
    }

    @Override
    public void processEventBatch(List<Event> events) {
        List<Event> generatedEvents = YOUR_SPLIT_FUNCTION_HERE(events);
        m_downstreamChannelProcessor.processEventBatch(generatedEvents);
    }
}
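YOUR_SPLIT_FUNCTION_HERE is left for you to implement. As a rough sketch for the command/output file format from the question (the prompt regex, the helper class name and the "command" header name are assumptions; the header is what a multiplexing selector could route on), it might look like this:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// Hypothetical splitter: one event per "root@host>" command, with the command
// line copied into a "command" header so a multiplexing selector can route it.
public final class CommandSplitter {

    // Assumed prompt format from the question; adjust the regex to the real hosts.
    private static final Pattern PROMPT = Pattern.compile("(?m)^root@host> ");

    public static List<Event> split(List<Event> events) {
        List<Event> result = new ArrayList<Event>();
        for (Event event : events) {
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            for (String block : PROMPT.split(body)) {
                if (block.trim().isEmpty()) {
                    continue;  // Skip the empty chunk before the first prompt.
                }
                // First line of the block is the command, the rest is its output.
                String command = block.split("\n", 2)[0].trim();
                Map<String, String> headers = new HashMap<String, String>(event.getHeaders());
                headers.put("command", command);
                result.add(EventBuilder.withBody(block, StandardCharsets.UTF_8, headers));
            }
        }
        return result;
    }
}

With a helper like this, the placeholder call in processEventBatch above would simply become CommandSplitter.split(events).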