My Kafka topics have millions of messages in them, and unfortunately Spark Streaming seems to queue all of the messages no matter what time limit I set. My standalone server has 16 cores and 64 GB of RAM, and I've given the driver 12 GB and the executor 12 GB of memory.
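From what I understand, the direct stream reads everything between the last consumed offset and the latest offset on each batch, so with a large backlog the first batch tries to take it all unless intake is rate-limited. I've seen spark.streaming.kafka.maxRatePerPartition and spark.streaming.backpressure.enabled suggested for capping that; a minimal sketch of what I believe that looks like (the app name, rate, and batch interval below are placeholder guesses, and the 12 GB driver/executor memory is passed on spark-submit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("KafkaCleaner")
  // Cap how many records the direct stream reads per Kafka partition
  // per second, so a single batch can't swallow the whole backlog.
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
  // Let Spark adjust the ingestion rate from observed batch durations.
  .set("spark.streaming.backpressure.enabled", "true")
val ssc = new StreamingContext(conf, Seconds(10)) // 10s batch interval, placeholder

Even with that in place, I still hit the errors below.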
16/09/14 19:27:45 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 7, 10.206.41.172): java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.twitter.chill.Tuple4Serializer.read(TupleSerializers.scala:68)
at com.twitter.chill.Tuple4Serializer.read(TupleSerializers.scala:59)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:159)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:515)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:535)
at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:1004)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.org$apache$spark$util$collection$ExternalAppendOnlyMap$ExternalIterator$$readNextHashCode(ExternalAppendOnlyMap.scala:332)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator$$anonfun$5.apply(ExternalAppendOnlyMap.scala:316)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator$$anonfun$5.apply(ExternalAppendOnlyMap.scala:314)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.<init>(ExternalAppendOnlyMap.scala:314)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.iterator(ExternalAppendOnlyMap.scala:288)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:43)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:695)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:691)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:691)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:145)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
org.apache.spark.shuffle.FetchFailedException: /tmp/spark-e4238a07-bf89-4a7d-9de3-176cba0a076d/executor-93a11b25-1cb1-4e13-b1b6-b1d64d3a9602/blockmgr-c11cd046-7c37-429c-9137-936d391d3cbc/30/shuffle_0_0_0.index (No such file or directory)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /tmp/spark-e4238a07-bf89-4a7d-9de3-176cba0a076d/executor-93a11b25-1cb1-4e13-b1b6-b1d64d3a9602/blockmgr-c11cd046-7c37-429c-9137-936d391d3cbc/30/shuffle_0_0_0.index (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:192)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:278)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:258)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:292)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
... 9 more
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Any recommendations on what I can do to handle this? Is the solution just going to be feeding it more memory? I'm pointing at a shuffle issue because one executor has:
Shuffle Read Size / Records: 815.5 MB / 27445419
Shuffle Spill (Memory): 13 GB
Shuffle Spill (Disk): 697.4 MB
I have no idea what it might be shuffling.
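Doing rough math on those numbers: 815.5 MB over 27,445,419 records is only about 30 bytes per record serialized, but 13 GB of in-memory spill over the same records is roughly 500 bytes per record, so whatever is being shuffled balloons considerably once deserialized. Here is the streaming code: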
val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
  ssc, getKafkaBrokers(), getKafkaTopics("raw"),
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.partition, mmd.offset, mmd.message))

// First step is to take our RDD of messages and group them by topic (._1) and partition (._2).
messageStream.foreachRDD(rdd => rdd.groupBy(msg => (msg._1, msg._2)).foreach(group => {
  // Populate parameters based on the grouping and get the instruction set to run on each message.
  val rawTopic: String = group._1._1
  val partitionID: Int = group._1._2
  val sourceSystemName: String = rawTopic.split("_")(0)
  val cleanTopic = sourceSystemName + "_clean"
  val topicID = getTopicID(rawTopic)
  val schemaID = getPartitionByTopic(topicID).filter(p => p._1 == partitionID)(0)._2
  val classpath = getClasspath(schemaID.toInt)
  val cleanerClass = bcClassMap.value.get(classpath)
  val cleanerObj = cleanerClass.newInstance()
  val cleanMethod = cleanerClass.getMethod("clean", Class.forName("java.lang.String"))
  // For each message, run the instruction set we've gotten above.
  group._2.foreach(kafkaMessage => {
    val offset: Long = kafkaMessage._3
    val rawRecord: String = kafkaMessage._4
    val cleanRecord = cleanMethod.invoke(cleanerObj, rawRecord).asInstanceOf[String]
    if (cleanRecord != null) {
      sendKafka(cleanRecord, cleanTopic, partitionID.toString)
    }
    offsetUpdate(schemaID, offset.toInt, topicID)
  })
}))
groupBy smells bad. Is there anything that really justifies it? Since it looks like you process individual messages separately anyway, what is the point of that? - zero323
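For reference, a shuffle-free version along the lines of that comment might look like the sketch below. It is untested: it assumes the helpers from the snippet above (getTopicID, getPartitionByTopic, getClasspath, bcClassMap, sendKafka, offsetUpdate) work from executor-side code just as they do in the groupBy version, and it relies on the direct stream's 1:1 mapping between Spark partitions and Kafka topic-partitions, so every message in a partition already shares the same topic and partition ID.

messageStream.foreachRDD(rdd => rdd.foreachPartition(iter => {
  if (iter.hasNext) {
    // Each Spark partition of a direct stream maps 1:1 to a Kafka
    // topic-partition, so peeking at the first message identifies it.
    val buffered = iter.buffered
    val (rawTopic, partitionID, _, _) = buffered.head
    val cleanTopic = rawTopic.split("_")(0) + "_clean"
    val topicID = getTopicID(rawTopic)
    val schemaID = getPartitionByTopic(topicID).filter(p => p._1 == partitionID)(0)._2
    val cleanerClass = bcClassMap.value.get(getClasspath(schemaID.toInt))
    val cleanerObj = cleanerClass.newInstance()
    val cleanMethod = cleanerClass.getMethod("clean", classOf[String])
    // Per-partition setup is done once; messages then stream through
    // without any data ever moving between executors.
    buffered.foreach { case (_, _, offset, rawRecord) =>
      val cleanRecord = cleanMethod.invoke(cleanerObj, rawRecord).asInstanceOf[String]
      if (cleanRecord != null) {
        sendKafka(cleanRecord, cleanTopic, partitionID.toString)
      }
      offsetUpdate(schemaID, offset.toInt, topicID)
    }
  }
}))

Because nothing is regrouped, this path should produce no shuffle write or spill at all.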