3
votes

I'm currently writing a Scala application made of a Producer and a Consumer. The Producer gets some data from an external source and writes it to Kafka. The Consumer reads from Kafka and writes to Elasticsearch.

The Consumer is based on Spark Streaming and every 5 seconds it fetches new messages from Kafka and writes them to Elasticsearch. The problem is that I'm not able to write to ES because I get a lot of errors like the one below:

  [ERROR] [2015-04-24 11:21:14,734] [org.apache.spark.TaskContextImpl]: Error in TaskCompletionListener
  org.elasticsearch.hadoop.EsHadoopException: Could not write all entries [3/26560] (maybe ES was overloaded?). Bailing out...
      at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:225) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:125) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33) ~[elasticsearch-spark_2.10-2.1.0.Beta3.jar:2.1.0.Beta3]
      at org.apache.spark.TaskContextImpl$$anon$2.onTaskCompletion(TaskContextImpl.scala:57) ~[spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) [spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [na:na]
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) [na:na]
      at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) [spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.scheduler.Task.run(Task.scala:58) [spark-core_2.10-1.2.1.jar:1.2.1]
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) [spark-core_2.10-1.2.1.jar:1.2.1]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_65]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_65]
      at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]

Consider that the producer is writing 6 messages every 15 seconds, so I really don't understand how this "overload" can possibly happen (I even cleaned the topic and flushed all old messages; I thought it was related to an offset issue). The task executed by Spark Streaming every 5 seconds can be summarized by the following code:

  // Read Avro messages from the "wasp.raw" Kafka topic and convert them to JSON strings
  val result = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams, Map("wasp.raw" -> 1), StorageLevel.MEMORY_ONLY_SER_2)
  val convertedResult = result.map(k => (k._1, AvroToJsonUtil.avroToJson(k._2)))

  // TO-DO: remove the hardcoded resource ("yahoo/yahoo") parameter
  log.info(s"*** EXECUTING SPARK STREAMING TASK + ${java.lang.System.currentTimeMillis()} ***")

  // Write the JSON payload of each message to Elasticsearch
  convertedResult.foreachRDD { rdd =>
    rdd.map(data => data._2).saveToEs("yahoo/yahoo", Map("es.input.json" -> "true"))
  }

If I try to print the messages instead of sending them to ES, everything is fine and I actually see only 6 messages. Why can't I write to ES?

For the sake of completeness, I'm using this library to write to ES: elasticsearch-spark_2.10 with the latest beta version.

6
I was getting the same error trying to write back a large table to ES from a Spark dataframe (not streaming, though). My default setup was using 100 executors, so basically 100 concurrent connections to our smallish ES cluster. The solution that worked for me was to repartition the dataframe into a small number of partitions (10 in my case) to limit the maximum number of concurrent connections Spark could make. – patricksurry
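
A minimal sketch of that repartitioning approach (the dataframe, index name, and partition count are hypothetical placeholders, not from the question):

  import org.apache.spark.sql.DataFrame
  import org.elasticsearch.spark.sql._   // adds saveToEs to DataFrames

  // Cap the number of concurrent bulk connections by shrinking the partition count
  // before the write; "myindex/mytype" and 10 are example values
  def writeWithFewerConnections(df: DataFrame): Unit =
    df.repartition(10).saveToEs("myindex/mytype")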

6 Answers

3
votes

I found, after many retries, a way to write to Elasticsearch without getting any error. Basically, passing the parameter "es.batch.size.entries" -> "1" to the saveToEs method solved the problem. I don't understand why the default (or any other) batch size leads to the aforementioned error, considering that I would expect an error message if I were trying to write more than the allowed maximum batch size, not less.
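
For reference, the change boils down to something like this (reusing the convertedResult stream and the yahoo/yahoo resource from the question):

  import org.elasticsearch.spark._   // adds saveToEs to RDDs

  convertedResult.foreachRDD { rdd =>
    rdd.map(_._2).saveToEs(
      "yahoo/yahoo",
      Map("es.input.json" -> "true",
          "es.batch.size.entries" -> "1"))   // flush one document per bulk request
  }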

Moreover, I've noticed that I actually was writing to ES, just not all of my messages: I was losing between 1 and 3 messages per batch.

2
votes

When I pushed a dataframe to ES from Spark, I had the same error message. Even with the "es.batch.size.entries" -> "1" configuration, I had the same error. Once I increased the bulk thread pool in ES, I got past this issue.

For example:

Bulk pool:

  threadpool.bulk.type: fixed
  threadpool.bulk.size: 600
  threadpool.bulk.queue_size: 30000

2
votes

As was already mentioned here, this is a document write conflict.

Your convertedResult data stream contains multiple records with the same id. When these are written to Elasticsearch as part of the same batch, they produce the error above.

Possible solutions:

  1. Generate a unique id for each record. Depending on your use case, this can be done in a few different ways. For example, one common solution is to create a new field by combining the id and lastModifiedDate fields and use that field as the id when writing to Elasticsearch.
  2. Perform de-duplication of records based on id: select only one record with a particular id and discard the other duplicates. Depending on your use case, this could be the most recent record (based on a timestamp field), the most complete one (most of the fields contain data), etc.

The #1 solution will store all records that you receive in the stream.

The #2 solution will store only the unique records for a specific id, based on your de-duplication logic. The result would be the same as setting "es.batch.size.entries" -> "1", except you will not limit performance by writing one record at a time.
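
A rough sketch of both options, assuming a hypothetical recordStream of case-class records with id and lastModifiedDate fields (the field names and the yahoo/yahoo resource from the question are placeholders):

  import org.elasticsearch.spark._                      // adds saveToEs to RDDs
  import org.apache.spark.streaming.StreamingContext._  // pair DStream implicits on older Spark versions

  case class Record(id: String, lastModifiedDate: Long, payload: String)

  // #1: build a composite, unique document id and tell es-hadoop to use it as _id
  val withCompositeId = recordStream.map(r => r.copy(id = s"${r.id}_${r.lastModifiedDate}"))
  withCompositeId.foreachRDD { rdd =>
    rdd.saveToEs("yahoo/yahoo", Map("es.mapping.id" -> "id"))
  }

  // #2: keep only the most recent record per id within each batch,
  // then write the result with saveToEs as in the question
  val deduplicated = recordStream
    .map(r => (r.id, r))
    .reduceByKey((a, b) => if (a.lastModifiedDate >= b.lastModifiedDate) a else b)
    .map(_._2)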

1
votes

One possibility is that the cluster/shard status is RED. Please address this issue, which may be due to unassigned replicas. Once the status turned GREEN, the API call succeeded just fine.

0
votes

This is a document write conflict.

For example:

  • Multiple documents specify the same _id for Elasticsearch to use.
  • These documents are located in different partitions.
  • Spark writes multiple partitions to ES simultaneously.

The result is Elasticsearch receiving multiple updates for a single document at once: from multiple sources / through multiple nodes / containing different data.

"I was losing between 1 and 3 messages per batch."

  • Fluctuating number of failures when the batch size is > 1
  • Success when the batch write size is "1"
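
One quick way to confirm this is to count repeated ids in a batch before writing. A hypothetical diagnostic helper (the id-extraction function is whatever fits your documents):

  import org.apache.spark.rdd.RDD

  // Returns the ids that occur more than once in the batch (the counts are collected
  // to the driver, so use this only as a diagnostic on reasonably small batches)
  def duplicateIds[T](rdd: RDD[T])(extractId: T => String): Map[String, Long] =
    rdd.map(extractId).countByValue().filter(_._2 > 1).toMap
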
0
votes

Just adding another potential reason for this error; hopefully it helps someone. If your Elasticsearch index has child documents, then:

  1. If you are using a custom routing field (not _id), then according to the documentation the uniqueness of the documents is not guaranteed. This might cause issues while updating from Spark.
  2. If you are using the standard _id, uniqueness will be preserved; however, you need to make sure the following options are provided while writing from Spark to Elasticsearch (see the sketch after this list):
    • es.mapping.join
    • es.mapping.routing
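
A minimal sketch of passing those options, with hypothetical index, join, and routing field names (a DataFrame write is shown, but the same configuration Map works for RDD writes):

  import org.apache.spark.sql.DataFrame
  import org.elasticsearch.spark.sql._   // adds saveToEs to DataFrames

  // "my_join_field" and "my_routing_field" are placeholders for the fields in your mapping
  def writeChildDocs(childDf: DataFrame): Unit =
    childDf.saveToEs(
      "myindex",
      Map(
        "es.mapping.join"    -> "my_join_field",
        "es.mapping.routing" -> "my_routing_field"))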