4
votes

I need some help with Flink Streaming. I have produced a simple Hello-world type of code below. This streams Avro messages from RabbitMQ and persists it to HDFS. I hope someone can review the code, and maybe it can help others.

Most examples I've found for Flink streaming sends results to std-out. I actually wanted to save the data to Hadoop. I read that, in theory, you can stream with Flink to wherever you like. I haven't found any example saving data to HDFS actually. But, based on the examples I did find, and trials and errors, I have come with the below code.

The source of the data, here, is RabbitMQ. I use a client app to send "MyAvroObjects" to RabbitMQ. MyAvroObject.java - not included - is generated from avro IDL... Can be any avro message.

The code below, consumes the RabbitMQ messages, and saves this to HDFS, as avro files... Well, that's what I hope.

package com.johanw.flink.stackoverflow;

import java.io.IOException;

import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMQToHadoop {
    public class MyDeserializationSchema implements DeserializationSchema<MyAvroObject> {
        private static final long serialVersionUID = 1L;

        @Override
        public TypeInformation<MyAvroObject> getProducedType() {
             return TypeExtractor.getForClass(MyAvroObject.class);
        }

        @Override
        public MyAvroObject deserialize(byte[] array) throws IOException {
            SpecificDatumReader<MyAvroObject> reader = new SpecificDatumReader<MyAvroObject>(MyAvroObject.getClassSchema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(array, null);
            MyAvroObject MyAvroObject = reader.read(null, decoder);
            return MyAvroObject;
        }

        @Override
        public boolean isEndOfStream(MyAvroObject arg0) {
            return false;
        }
    }

    private String hostName;
    private String queueName;

    public final static String path = "/hdfsroot";

    private static Logger logger = LoggerFactory.getLogger(RMQToHadoop.class);

    public RMQToHadoop(String hostName, String queueName) {
        super();
        this.hostName = hostName;
        this.queueName = queueName;
    }

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public void run() {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        logger.info("Running " + RMQToHadoop.class.getName());
        DataStream<MyAvroObject> socketStockStream = env.addSource(new RMQSource<>(hostName, queueName, new MyDeserializationSchema()));
        Job job;
        try {
            job = Job.getInstance();
            AvroJob.setInputKeySchema(job, MyAvroObject.getClassSchema());
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        try {
            JobConf jobConf = new JobConf(Job.getInstance().getConfiguration());
            jobConf.set("avro.output.schema", MyAvroObject.getClassSchema().toString());
            org.apache.avro.mapred.AvroOutputFormat<MyAvroObject> akof = new AvroOutputFormat<MyAvroObject>();
            HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable> hof = new HadoopOutputFormat<AvroWrapper<MyAvroObject>, NullWritable>(akof, jobConf);
            FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>> fileSinkFunctionByMillis = new FileSinkFunctionByMillis<Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>(hof, 10000l);
            org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(path));

            socketStockStream.map(new MapFunction<MyAvroObject, Tuple2<AvroWrapper<MyAvroObject>, NullWritable>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<AvroWrapper<MyAvroObject>, NullWritable> map(MyAvroObject envelope) throws Exception {
                    logger.info("map");
                    AvroKey<MyAvroObject> key = new AvroKey<MyAvroObject>(envelope);
                    Tuple2<AvroWrapper<MyAvroObject>, NullWritable> tupple = new Tuple2<AvroWrapper<MyAvroObject>, NullWritable>(key, NullWritable.get());
                    return tupple;
                }
            }).addSink(fileSinkFunctionByMillis);
            try {
                env.execute();
            } catch (Exception e) {
                logger.error("Error while running " + RMQToHadoop.class + ".", e);
            }
        } catch (IOException e) {
            logger.error("Error while running " + RMQToHadoop.class + ".", e);
        }
    }

    public static void main(String[] args) throws IOException {
        RMQToHadoop toHadoop = new RMQToHadoop("localhost", "rabbitTestQueue");
        toHadoop.run();
    }
}

If you prefer another source, other than RabbitMQ, then it works fine using another source instead. E.g. using a Kafka consumer:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;

...

DataStreamSource<MyAvroObject> socketStockStream = env.addSource(new FlinkKafkaConsumer082<MyAvroObject>(topic, new MyDeserializationSchema(), sourceProperties));

Questions:

  1. Please review. Is this good practice for saving data to HDFS?

  2. What if the process of streaming is causing an issue, say during serialisation. It generates and exception, and the code just exits. Spark streaming depends on Yarn automatically restarting the app. Is this also good practice when using Flink?

  3. I'm using the FileSinkFunctionByMillis. I was actually hoping to use something like a HdfsSinkFunction, but that doesn't exist. So the FileSinkFunctionByMillis was the closest to this, which made sense to me. Again the documentation that I found lacks any explanation what to do, so I'm only guessing.

  4. When I run this locally, then a I find a directory structure like "C:\hdfsroot_temporary\0_temporary\attempt__0000_r_000001_0", which is... basare. Any ideas here?

By the way, when you want to save the data to Kafka back, I was able to do so using...

Properties destProperties = new Properties();
destProperties.setProperty("bootstrap.servers", bootstrapServers);
FlinkKafkaProducer<MyAvroObject> kafkaProducer = new FlinkKafkaProducer<L3Result>("MyKafkaTopic", new MySerializationSchema(), destProperties);

Many thanks in advance!!!!

1

1 Answers

0
votes

I think FileSinkFunctionByMillis can be used but this would mean that your streaming program is not fault-tolerant. Meaning that if your sources or machines or writing fail then your program will crash without being able to recover.

I suggest you look at using the RollingSink (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#hadoop-filesystem). This can be used to create Flum-like pipelines to ingest data into HDFS (or other file systems). The rolling sink is a recoverable sink, meaning that your program would be fault-tolerant since the Kafka consumer is also fault-tolerant. Also you can specify a custom Writer to write the data in any format you want, for example Avro.