1
votes

I have a spooling directory where all json files are present,Incoming files will be added to this directory for every second, And i have to Deserialize incoming json files and fetch the requires fields and append it into HDFS directory.

What I did was I created a flume conf file where in it takes the files from spooling directory as a source and placed the json file directly into HDFS using 1 Sink.

I have to make this json into structure format before Sink and place it into HDFS. Most important thing that,it is not a twitter data. And I have to implement purely Flume.

I used the below flume configuration to get the job done:

agent_slave_1.channels.fileChannel1_1.type = file 
agent_slave_1.channels.fileChannel1_1.capacity = 200000
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000
agent_slave_1.sources.source1_1.type = spooldir

agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
agent_slave_1.sources.source1_1.fileHeader = false
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text

agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream
agent_slave_1.sources.source1_1.channels = fileChannel1_1
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1

agent_slave_1.sinks =  hdfs-sink1_1
agent_slave_1.sources = source1_1
agent_slave_1.channels = fileChannel1_1

But I don't know how to use deserializer.

Can someone help me with an idea how to deserialize the Incomming Json files? If I need to write any code in java please help me, what Interface I need to use? If possible give some hints.

1
did you find any answers for this.Please share.sp_user123
No. I am also looking for the same thing.user3782364

1 Answers

1
votes

The best guess is to write a custom interceptor which converts your JSON to the desired HDFS format. It also has the benefit of populating headers which can be used in your hdfs path.

Here's how to configure an interceptor:

agent_slave_1.sources.source1_1.interceptors = my_intercptor
agent_slave_1.sources.source1_1.interceptors.my_intercptor.type = com.mycompany.MyInteceptor

the class would look like this:

public class MyInteceptor implements Interceptor, Interceptor.Builder {

    private MyInteceptor interceptor;

    @Override
    public void initialize() {


    }

    @Override
    public Event intercept(Event event) {
        String bjson = event.getBody()));
        // decode your json, e.g. Jackson
        MyDecodedJsonObject record; // pseudo class
        event.getHeaders().put("timestamp", record.getTimestamp().toString());
        String newBody = record.getA() + "\t" + record.getB();
        event.setBody(newBody.getBytes())
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Iterator<Event> iterator = events.iterator(); iterator.hasNext();) {
            Event next = intercept(iterator.next());
            if (next == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {


    }

    @Override
    public Interceptor build() {
        return interceptor;
    }

    @Override
    public void configure(Context context) {

        interceptor = new MyInteceptor();
    }

}

Don't forget to package this class in a jar and drop it into flume's lib directory.