I'm currently writing a Scala application made of a Producer and a Consumer. The Producer gets some data from an external source and writes it into Kafka. The Consumer reads from Kafka and writes to Elasticsearch.
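For context, the Producer side boils down to something like the sketch below (it uses the Kafka 0.8 producer API; the broker address, serializer settings, key and payload values are placeholders, not my exact code):

import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")          // placeholder broker
props.put("serializer.class", "kafka.serializer.DefaultEncoder")   // values are Avro byte arrays
props.put("key.serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[String, Array[Byte]](new ProducerConfig(props))

val key: String = "some-key"                 // hypothetical key
val avroBytes: Array[Byte] = Array[Byte]()   // Avro-serialized payload from the external source
// roughly 6 messages like this one are sent every 15 seconds
producer.send(new KeyedMessage[String, Array[Byte]]("wasp.raw", key, avroBytes))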
The Consumer is based on Spark Streaming: every 5 seconds it fetches new messages from Kafka and writes them to Elasticsearch. The problem is that I'm not able to write to ES because I get a lot of errors like the one below:
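The streaming context is created with a 5-second batch interval; the setup looks roughly like this (app name and ES host/port below are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// elasticsearch-hadoop connection settings (es.nodes / es.port) are set on the SparkConf
val sparkConf = new SparkConf()
  .setAppName("wasp-consumer")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")
val ssc = new StreamingContext(sparkConf, Seconds(5))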
[ERROR] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: Error in TaskCompletionListener
org.elasticsearch.hadoop.EsHadoopException: Could not write all entries [3/26560] (maybe ES was overloaded?). Bailing out...
    at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
    at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na]
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Consider that the producer is writing only 6 messages every 15 seconds, so I really don't understand how this "overload" can possibly happen (I even cleaned the topic and flushed all old messages; I thought it was related to an offset issue). The task executed by Spark Streaming every 5 seconds can be summarized by the following code:
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark._   // provides saveToEs on RDDs

val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
val convertedResult = result.map(k => (k._1, AvroToJsonUtil.avroToJson(k._2)))

// TODO: remove the hardcoded resource ("yahoo/yahoo") parameter
log.info(s"*** EXECUTING SPARK STREAMING TASK ${java.lang.System.currentTimeMillis()} ***")

convertedResult.foreachRDD(rdd => {
  rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
})
If I print the messages instead of sending them to ES, everything is fine and I actually see only the 6 expected messages. Why can't I write to ES?
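By "printing" I mean something as simple as this trivial variant of the loop above:

// printing instead of writing to ES works fine and shows the expected 6 messages
convertedResult.foreachRDD(rdd => rdd.collect().foreach(println))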
For the sake of completeness, I'm using the elasticsearch-spark_2.10 library (the latest beta version, 2.1.0.Beta3, as shown in the stack trace) to write to ES.
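In build.sbt terms, that dependency is the following (the version is the one reported in the stack trace above):

libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0.Beta3"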