
I am relatively new to the Flume interceptor concept and am facing an issue: before I applied the interceptor, the file written by the HDFS sink was a normal text file, but after applying the interceptor the output turns into unreadable, binary-looking content.

My interceptor code is below:

package com.flume;

import org.apache.flume.*;
import org.apache.flume.interceptor.*;

import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class CustomHostInterceptor implements Interceptor {

    private String hostValue;
    private String hostHeader;

    public CustomHostInterceptor(String hostHeader){
        this.hostHeader = hostHeader;
    }

    @Override
    public void initialize() {
        // At interceptor start up
        try {
            hostValue =
                    InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new FlumeException("Cannot get Hostname", e);
        }
    }

    @Override
    public Event intercept(Event event) {

        // This is the event's body; decode it explicitly as UTF-8
        // instead of relying on the platform default charset
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        if (body.toLowerCase().contains("text")) {
            // Replace the whole body with "hadoop"; passing a Charset
            // object means there is no checked exception to handle
            event.setBody("hadoop".getBytes(StandardCharsets.UTF_8));
        }
        // These are the event's headers
        Map<String, String> headers = event.getHeaders();

        // Enrich header with hostname
        headers.put(hostHeader, hostValue);

        // Let the enriched event go
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        List<Event> interceptedEvents =
                new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Intercept any event
            Event interceptedEvent = intercept(event);
            interceptedEvents.add(interceptedEvent);
        }

        return interceptedEvents;
    }

    @Override
    public void close() {
        // At interceptor shutdown
    }

    public static class Builder
            implements Interceptor.Builder {

        private String hostHeader;

        @Override
        public void configure(Context context) {
            // Retrieve property from flume conf, falling back to "hostname"
            // (an assumed default) so the header key is never null
            hostHeader = context.getString("hostHeader", "hostname");
        }

        @Override
        public Interceptor build() {
            return new CustomHostInterceptor(hostHeader);
        }
    }
}
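One way to check that the interceptor itself does not produce binary output is to exercise its body and header rules outside Flume. The sketch below is a hypothetical stand-in, not Flume code: it copies the rules from intercept(Event) onto a plain byte[] body and a Map of headers (the class and method names are made up for illustration), so it runs with only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flume-free copy of the rules in CustomHostInterceptor.intercept(Event):
// the body is a byte[] and the headers a Map, so it runs with only the JDK.
public class InterceptorLogicSketch {

    // Body rule: a body containing "text" (case-insensitive) is replaced by "hadoop";
    // anything else is returned unchanged.
    public static byte[] rewriteBody(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        if (text.toLowerCase().contains("text")) {
            return "hadoop".getBytes(StandardCharsets.UTF_8);
        }
        return body;
    }

    // Header rule: put the host name under the configured header key
    // (returns a new map rather than mutating the argument).
    public static Map<String, String> enrichHeaders(Map<String, String> headers,
                                                    String hostHeader, String hostValue) {
        Map<String, String> result = new HashMap<>(headers);
        result.put(hostHeader, hostValue);
        return result;
    }

    public static void main(String[] args) {
        String[] lines = {"some text", "hello world", "anything"};
        for (String line : lines) {
            byte[] out = rewriteBody(line.getBytes(StandardCharsets.UTF_8));
            // Every output body is still plain UTF-8 text
            System.out.println(line + " -> " + new String(out, StandardCharsets.UTF_8));
        }
        System.out.println(enrichHeaders(new HashMap<>(), "hostname", "example-host"));
    }
}
```

Every body that comes out of these rules is still ordinary UTF-8 text, which suggests the binary-looking bytes are introduced somewhere downstream of the interceptor rather than by this logic.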

My Flume configuration is:

agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1

agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/app.log
agent.sources.exec-source.interceptors = i1
agent.sources.exec-source.interceptors.i1.type = com.flume.CustomHostInterceptor$Builder
agent.sources.exec-source.interceptors.i1.hostHeader = hostname

agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path= hdfs://localhost:8020/bosch/flume/applogs
agent.sinks.hdfs-sink.hdfs.filePrefix=logs
agent.sinks.hdfs-sink.hdfs.rollInterval=60
agent.sinks.hdfs-sink.hdfs.rollSize=0

agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000

agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1

When I cat the file created in HDFS, I get:

SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable���*q�CJv�/ESmP�ź
some textP�żc
some more textP���K
textP��ߌangels and deamonsP��%�
text bla blaP��1�angels and deamonsP��1�
testP��1�hmmmP��1�anything

Any suggestions?

Thanks


1 Answer


It looks like nothing is wrong with the interceptor.

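The problem is in your Flume agent configuration: you are not specifying the hdfs.fileType property, so the HDFS sink falls back to its default, SequenceFile. A SequenceFile is a binary Hadoop container format, which is why cat shows the SEQ header, the org.apache.hadoop.io.LongWritable and org.apache.hadoop.io.BytesWritable key/value class names, and unreadable bytes between your log lines.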
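To see where the start of that cat output comes from, the header can be decoded by hand. The sketch below is a simplified reader for the first fields of a SequenceFile header, not Hadoop's own SequenceFile.Reader: it checks the SEQ magic, reads the version byte, then reads the key and value class names, each prefixed by a length. It assumes each name is shorter than 128 bytes so the length fits in a single byte, and the class and method names are hypothetical. Under this layout, the ! and " in your output are simply the lengths 33 and 34 of the two class names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Simplified reader for the first fields of a Hadoop SequenceFile header:
// "SEQ" magic, a version byte, then the key and value class names, each
// prefixed by a Hadoop VInt length. Assumption: both names are shorter than
// 128 bytes, so each length fits in a single byte.
public class SeqHeaderSketch {

    public static final class Header {
        public final int version;
        public final String keyClass;
        public final String valueClass;
        public final boolean compressed;

        Header(int version, String keyClass, String valueClass, boolean compressed) {
            this.version = version;
            this.keyClass = keyClass;
            this.valueClass = valueClass;
            this.compressed = compressed;
        }
    }

    public static Header readHeader(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] magic = new byte[3];
        data.readFully(magic);
        if (!"SEQ".equals(new String(magic, StandardCharsets.US_ASCII))) {
            throw new IOException("not a SequenceFile: missing SEQ magic");
        }
        int version = data.readUnsignedByte();
        String keyClass = readShortString(data);
        String valueClass = readShortString(data);
        boolean compressed = data.readBoolean();   // stop here; metadata and sync marker follow
        return new Header(version, keyClass, valueClass, compressed);
    }

    // Single-byte VInt length followed by that many UTF-8 bytes.
    private static String readShortString(DataInputStream data) throws IOException {
        int len = data.readByte();
        if (len < 0) {
            throw new IOException("multi-byte VInt length is not handled in this sketch");
        }
        byte[] buf = new byte[len];
        data.readFully(buf);
        return new String(buf, StandardCharsets.UTF_8);
    }

    // Builds the same leading bytes that appear at the top of the cat output.
    public static byte[] sampleHeader() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.write(new byte[] {'S', 'E', 'Q', 6});
        for (String name : new String[] {"org.apache.hadoop.io.LongWritable",
                                          "org.apache.hadoop.io.BytesWritable"}) {
            byte[] b = name.getBytes(StandardCharsets.UTF_8);
            out.writeByte(b.length);   // 33 and 34, which cat shows as '!' and '"'
            out.write(b);
        }
        out.writeBoolean(false);       // not compressed
        out.writeBoolean(false);       // not block-compressed
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        try (InputStream in = args.length > 0
                ? new FileInputStream(args[0])                 // a local copy of the HDFS file
                : new ByteArrayInputStream(sampleHeader())) {  // built-in sample
            Header h = readHeader(in);
            System.out.println("version=" + h.version + " key=" + h.keyClass
                    + " value=" + h.valueClass + " compressed=" + h.compressed);
        }
    }
}
```

Run without arguments, it decodes the built-in sample; given the path of a local copy of the HDFS file, it reads that file's header instead, and a plain text file is rejected because it lacks the SEQ magic.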

Try adding this line to your HDFS sink so it writes a plain data stream instead, and let me know if it works:

agent.sinks.hdfs-sink.hdfs.fileType=DataStream
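Since the fix hinges on whether hdfs.fileType is set, a quick way to check a config file is to load it the way Flume reads agent configurations, as Java properties, and look the key up with SequenceFile as the fallback. This is a hypothetical helper, not part of Flume; the key it checks assumes the agent name "agent" and the sink name "hdfs-sink" from the question.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: reports which file type the HDFS sink in the question's
// agent would use. The key assumes agent "agent" and sink "hdfs-sink".
public class FileTypeCheck {

    public static final String FILE_TYPE_KEY = "agent.sinks.hdfs-sink.hdfs.fileType";

    // Falls back to "SequenceFile", the HDFS sink's default when hdfs.fileType is unset.
    public static String effectiveFileType(Properties conf) {
        return conf.getProperty(FILE_TYPE_KEY, "SequenceFile");
    }

    public static Properties load(Reader reader) throws IOException {
        Properties conf = new Properties();
        conf.load(reader);
        return conf;
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0) {
            // Check a real config file, e.g. the agent's .conf file
            try (Reader reader = new FileReader(args[0])) {
                System.out.println(effectiveFileType(load(reader)));
            }
            return;
        }
        // Without an argument, compare a sink config before and after adding the line
        String before = "agent.sinks.hdfs-sink.type=hdfs\n";
        String after = before + FILE_TYPE_KEY + "=DataStream\n";
        System.out.println("before: " + effectiveFileType(load(new StringReader(before))));
        System.out.println("after:  " + effectiveFileType(load(new StringReader(after))));
    }
}
```

Because java.util.Properties skips whitespace around the = separator, both the "key=value" and "key = value" styles used in the question's config resolve to the same value.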