I am testing the Spark Streaming API. The application is deployed on an Amazon EMR cluster running Spark 1.4.0. I am sorting data and saving the resulting files to S3.

The code of the pipeline (except the sort algorithm) is detailed below:

public KinesisPreProcessPipeline(JavaStreamingContext jssc, final KinesisPreProcessModuleConfiguration moduleConfiguration) {
    // Receive raw records from the Kinesis stream, checkpointing Kinesis sequence numbers every 5 seconds.
    JavaReceiverInputDStream<byte[]> inputDStream = KinesisUtils.createStream(jssc, moduleConfiguration.getAppName(), moduleConfiguration.getStreamName(),
            "kinesis." + moduleConfiguration.getRegion() + ".amazonaws.com", moduleConfiguration.getRegion(), InitialPositionInStream.LATEST,
            Durations.seconds(5), StorageLevel.MEMORY_AND_DISK_SER());

    // Deserialize each record from JSON into a StreamingMessage bean.
    JavaDStream<StreamingMessage> messageJavaDStream = inputDStream.map(new Function<byte[], StreamingMessage>() {
        @Override
        public StreamingMessage call(byte[] bytes) throws Exception {
            return jsonParser.fromJson(new String(bytes), StreamingMessage.class);
        }
    });

    final String destinationFolder = moduleConfiguration.getDestinationFolder();

    // Run the pre-processing pipeline, which sorts the messages.
    StreamingPreProcessPipeline pipeline = new StreamingPreProcessPipeline().withInputDStream(messageJavaDStream)
            .withPreProcessStep(new SortPreProcess());

    JavaDStream<StreamingMessage> output = pipeline.execute();

    output.checkpoint(Durations.seconds(moduleConfiguration.getBatchInterval() * 2));

    // Serialize the sorted messages back to JSON strings.
    JavaDStream<String> messagesAsJson = output.map(new Function<StreamingMessage, String>() {
        @Override
        public String call(StreamingMessage message) throws Exception {
            return jsonParser.toJson(message);
        }
    });

    // Write each batch to a timestamped folder under the destination folder in S3.
    messagesAsJson.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
            rdd.saveAsTextFile(destinationFolder + "/" + dateFormat.print(new DateTime()) + "-" + rdd.id());
            return null;
        }
    });
}

When the application is run on the cluster, it fails quickly with the following error:

15/07/17 13:17:36 ERROR executor.Executor: Exception in task 0.1 in stage 8.0 (TID 90)
java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:776)
    at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:507)
    at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:435)
    at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:307)
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:135)
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
    at org.apache.spark.util.collection.PartitionedPairBuffer.partitionedDestructiveSortedIterator(PartitionedPairBuffer.scala:70)
    at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:690)
    at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:708)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The error happens on the foreachRDD step, but I am still trying to work out why it fails.
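
For context, this exception is raised by TimSort when a comparator violates its contract, for example when compare(a, b) and compare(b, a) can both be positive, or when transitivity fails. The same error can be reproduced outside Spark; the class below is an illustrative sketch, not part of this job:

import java.util.Arrays;
import java.util.Random;

// Illustrative standalone reproduction (not from the original job):
// Arrays.sort(T[], Comparator) uses the same TimSort algorithm and raises the
// same IllegalArgumentException when a comparator is inconsistent.
public class BrokenComparatorDemo {
    public static void main(String[] args) {
        Random random = new Random(42);
        Integer[] data = new Integer[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = random.nextInt();
        }
        // BROKEN: the subtraction idiom overflows for values far apart,
        // making the ordering inconsistent. On large random input this
        // typically fails with "Comparison method violates its general
        // contract!". The overflow-safe form is Integer.compare(a, b).
        Arrays.sort(data, (a, b) -> a - b);
    }
}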

What is StreamingMessage? – Justin Pihony
It is a Java bean containing the message fields: basically a bunch of String or Number fields and Maps of String key/value pairs. – Fabien COMTE
Could you post the code for StreamingMessage? Probably its equals and/or hashCode are incorrectly implemented. – maasg
You were right: equals and compareTo were inconsistent in the class used for sorting. In one special case, compareTo returned 0 but equals returned false. After fixing this bug, the job works as expected. – Fabien COMTE

1 Answer


The class used for sorting had a bug in its compareTo implementation. The Javadoc for Comparable recommends implementing compareTo consistently with equals(); here, in one special case, compareTo returned 0 while equals returned false. After fixing this bug, the Spark job works as expected.
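
To illustrate, a minimal sketch of what the inconsistency and its fix can look like; the StreamingMessage shown here is hypothetical, and its timestamp and id fields are assumptions, not the actual class:

import java.util.Objects;

// Hypothetical simplified StreamingMessage; the fields are illustrative only.
public class StreamingMessage implements Comparable<StreamingMessage> {

    private final long timestamp;
    private final String id;

    public StreamingMessage(long timestamp, String id) {
        this.timestamp = timestamp;
        this.id = id;
    }

    // BROKEN variant of the kind described above: two messages with the same
    // timestamp but different ids compare as 0 while equals() returns false,
    // and the (int) cast of a long difference can overflow, violating the
    // ordering contract itself.
    //
    // @Override
    // public int compareTo(StreamingMessage other) {
    //     return (int) (this.timestamp - other.timestamp);
    // }

    // FIXED: overflow-safe comparison plus a tie-breaker on id, so that
    // compareTo() returns 0 exactly when equals() returns true.
    @Override
    public int compareTo(StreamingMessage other) {
        int byTime = Long.compare(this.timestamp, other.timestamp);
        return byTime != 0 ? byTime : this.id.compareTo(other.id);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof StreamingMessage)) return false;
        StreamingMessage that = (StreamingMessage) o;
        return timestamp == that.timestamp && Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, id);
    }
}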