
I am using Spark SQL (I mention Spark in case it affects the SQL syntax; I am not familiar enough with it to be sure yet) and I have a table that I am trying to restructure. I have an approach that works locally, but when I run the same command on an AWS EC2 instance I get an error reporting an 'unresolved operator'.

Basically I have data that looks like:

userId    someString      varA   
   1      "example1"     [0,2,5] 
   2      "example2"     [1,20,5] 

and I use an 'explode' command via sqlContext on varA. When I run this locally everything returns as expected, but on AWS it fails.

I can reproduce this with the following commands:

// Build a small DataFrame with an array column to reproduce the issue
val data = List(
  ("1", "example1", Array(0,2,5)), ("2", "example2", Array(1,20,5)))
val distData = sc.parallelize(data)
val distTable = distData.toDF("userId", "someString", "varA")
distTable.registerTempTable("distTable_tmp")

// Selecting the array column as-is works everywhere; exploding it is what fails on AWS
val temp1 = sqlContext.sql("select userId, someString, varA from distTable_tmp")
val temp2 = sqlContext.sql(
  "select userId, someString, explode(varA) as varA from distTable_tmp")

Locally, temp1.show() and temp2.show() return what I'd expect, namely:

scala> temp1.show()
+------+----------+----------+
|userId|someString|      varA|
+------+----------+----------+
|     1|  example1| [0, 2, 5]|
|     2|  example2|[1, 20, 5]|
+------+----------+----------+

scala> temp2.show()
+------+----------+----+
|userId|someString|varA|
+------+----------+----+
|     1|  example1|   0|
|     1|  example1|   2|
|     1|  example1|   5|
|     2|  example2|   1|
|     2|  example2|  20|
|     2|  example2|   5|
+------+----------+----+

but on AWS, the temp1 command works fine while temp2 fails with the message:

scala> val temp2 = sqlContext.sql("select userId, someString, explode(varA) as varA from distTable_tmp")
15/11/05 22:46:49 INFO parse.ParseDriver: Parsing command: select userId, someString, explode(varA) as varA from distTable_tmp
15/11/05 22:46:49 INFO parse.ParseDriver: Parse Completed
org.apache.spark.sql.AnalysisException: unresolved operator 'Project [userId#3,someString#4,HiveGenericUdtf#org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode(varA#5) AS varA#6];
...

Many thanks.

Can you show how you create sqlContext? Also what version of Spark do you use? - zero323
@zero323 - I was just opening the Spark Scala shell via './bin/spark-shell' locally and 'MASTER=yarn-client /home/hadoop/spark/bin/spark-shell' on AWS - I didn't think to use anything except the default sqlContext that seems to be loaded for me. The Spark versions are 1.5.1 (local) and 1.3.1 (AWS). - anthr
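
(As an aside, if you ever need to confirm which version a given shell is actually running, sc.version on the already-loaded SparkContext returns it; for example, on the AWS machine this should print the 1.3.1 mentioned above:)

scala> sc.version
res0: String = 1.3.1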

1 Answer


The source of the problem is the Spark version you are using on EC2. The explode function was introduced in Spark 1.4, so it cannot work on 1.3.1. As a workaround you can drop down to the RDD API and use flatMap like this:

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

// Flatten each row into one row per element of the array column (index 2),
// appending the element value to the original row's fields
val rows: RDD[Row] = distTable.rdd.flatMap(
  row => row.getAs[Seq[Int]](2).map(v => Row.fromSeq(row.toSeq :+ v)))

// Extend the original schema with a nullable IntegerType column for the exploded values
val newSchema = StructType(
  distTable.schema.fields :+ StructField("varA_exploded", IntegerType, true))

sqlContext.createDataFrame(rows, newSchema).show

// userId someString varA                 varA_exploded
// 1      example1   ArrayBuffer(0, 2, 5) 0            
// 1      example1   ArrayBuffer(0, 2, 5) 2            
// 1      example1   ArrayBuffer(0, 2, 5) 5            
// 2      example2   ArrayBuffer(1, 20... 1            
// 2      example2   ArrayBuffer(1, 20... 20           
// 2      example2   ArrayBuffer(1, 20... 5      

but I doubt it is worth all the fuss.
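
For completeness: once the cluster is on Spark 1.4 or later, the original SQL query should just work, and the same result can also be had through the DataFrame API. A minimal sketch, assuming the spark-shell's auto-imported implicits for the $ column syntax:

import org.apache.spark.sql.functions.explode

// explode is available as a DataFrame column function from Spark 1.4 onwards;
// this mirrors the original SQL query without going through the HiveQL parser
distTable.select($"userId", $"someString", explode($"varA").as("varA")).show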