2
votes

When trying to use Spark 2.3 on HDP 3.1 to write to a Hive table directly in Hive's schema, without the warehouse connector, using:

spark-shell --driver-memory 16g --master local[3] --conf spark.hadoop.metastore.catalog.default=hive
val df = Seq(1,2,3,4).toDF
spark.sql("create database foo")
df.write.saveAsTable("foo.my_table_01")

fails with:

Table foo.my_table_01 failed strict managed table checks due to the following reason: Table is marked as a managed table but is not transactional

However, a partitioned ORC write succeeds:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
val df = Seq(1,2,3,4).toDF.withColumn("part", col("value"))
df.write.partitionBy("part").option("compression", "zlib").mode(SaveMode.Overwrite).format("orc").saveAsTable("foo.my_table_02")

Reading the table back in Spark with spark.sql("select * from foo.my_table_02").show works just fine. Now going to Hive / beeline:

0: jdbc:hive2://hostname:2181/> select * from my_table_02;
Error: java.io.IOException: java.lang.IllegalArgumentException: bucketId out of range: -1 (state=,code=0)

A

 describe extended my_table_02;

returns

+-----------------------------+----------------------------------------------------+----------+
|          col_name           |                     data_type                      | comment  |
+-----------------------------+----------------------------------------------------+----------+
| value                       | int                                                |          |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| # Partition Information     | NULL                                               | NULL     |
| # col_name                  | data_type                                          | comment  |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| Detailed Table Information  | Table(tableName:my_table_02, dbName:foo, owner:hive/[email protected], createTime:1571201905, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:value, type:int, comment:null), FieldSchema(name:part, type:int, comment:null)], location:hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{path=hdfs://bd-sandbox.t-mobile.at:8020/warehouse/tablespace/external/hive/foo.db/my_table_02, compression=zlib, serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:part, type:int, comment:null)], parameters:{numRows=0, rawDataSize=0, spark.sql.sources.schema.partCol.0=part, transient_lastDdlTime=1571201906, bucketing_version=2, spark.sql.create.version=2.3.2.3.1.0.0-78, totalSize=740, spark.sql.sources.schema.numPartCols=1, spark.sql.sources.schema.part.0={\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}, numFiles=4, numPartitions=4, spark.sql.partitionProvider=catalog, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=orc, transactional=true}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, rewriteEnabled:false, catName:hive, ownerType:USER, writeId:-1) |

How can I use Spark to write to Hive without the warehouse connector, while still writing to the same metastore so that the table can later be read by Hive? To the best of my knowledge, external tables should work (they are not managed, not ACID, not transactional), but I am not sure how to tell saveAsTable to create one.

edit

related issues:

A workaround might be something like https://github.com/qubole/spark-acid or the Hive Warehouse Connector (https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html), but I do not like the idea of using more duct tape for which I have not yet seen any large-scale performance tests. Also, this means changing all existing Spark jobs.

In fact, the question "Can't save table to hive metastore, HDP 3.0" reports issues with large data frames and the warehouse connector.

edit

I just found https://community.cloudera.com/t5/Support-Questions/Spark-hive-warehouse-connector-not-loading-data-when-using/td-p/243613

And:

execute() vs executeQuery()

ExecuteQuery() will always use the Hiveserver2-interactive/LLAP as it uses the fast ARROW protocol. Using it when the jdbc URL points to the non-LLAP Hiveserver2 will yield an error.

Execute() uses JDBC and does not have this dependency on LLAP, but has a built-in restriction to only return 1,000 records max. But for most queries (INSERT INTO ... SELECT, count, sum, average) that is not a problem.

But doesn't this kill any high-performance interoperability between hive and spark? Especially if there are not enough LLAP nodes available for large scale ETL.

In fact, this is true. The record limit can be configured at https://github.com/hortonworks-spark/spark-llap/blob/26d164e62b45cfa1420d5d43cdef13d1d29bb877/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java#L39, though I am not sure of the performance impact of increasing this value.

Did you try explicitly setting the table storage format to something non-default (i.e. non-ORC) that is not supported by Hive ACID, and hence should not interfere with the new ACID-by-default settings? Like Parquet, AVRO, CSV, whatever? - Samson Scharfrichter
IMHO the best way to deal with that is to disable the new "ACID-by-default" setting in Ambari. If and when you need ACID, make it explicit in the CREATE TABLE in Hive -- the way it was in HDP 2.x - Samson Scharfrichter
That sounds very sensible. Do you know where to change it / the key of this property? - Georg Heiler

4 Answers

0
votes

Did you try

    data.write \
        .mode("append") \
        .insertInto("tableName")

0
votes

In Ambari, simply disabling the option to create transactional tables by default solved my problem.

Set the following property to false in both places (Tez, LLAP):

hive.strict.managed.tables = false

and enable transactions manually in the table properties where desired (to use a transactional table).
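
Opting in per table could look roughly like the following. This is only a minimal sketch: the object name, the helper and the table and column names are hypothetical, and it merely builds the Hive DDL string (ORC with the 'transactional' table property) rather than running it against a cluster.

```scala
// Sketch only: builds Hive DDL for a table that opts in to ACID explicitly.
// TransactionalDdl and all table/column names used with it are hypothetical.
object TransactionalDdl {
  /** Returns a CREATE TABLE statement that sets 'transactional'='true' for this table only. */
  def createTransactionalTable(table: String, columns: Seq[(String, String)]): String = {
    // Render (name, Hive type) pairs as "name type, name type, ..."
    val cols = columns.map { case (name, hiveType) => s"$name $hiveType" }.mkString(", ")
    s"""CREATE TABLE $table ($cols)
       |STORED AS ORC
       |TBLPROPERTIES ('transactional'='true')""".stripMargin
  }
}
```

For example, TransactionalDdl.createTransactionalTable("foo.my_acid_table", Seq("value" -> "int")) produces a statement that can be pasted into beeline. Tables created without the property should then stay non-transactional, assuming no other ACID-by-default setting is active.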

0
votes

Creating an external table (as a workaround) seems to be the best option for me. This still involves HWC to register the column metadata or update the partition information.

Something along these lines:

import org.apache.spark.sql.{DataFrame, SaveMode}
import com.hortonworks.hwc.HiveWarehouseSession

val df: DataFrame = ...
val externalPath = "/warehouse/tablespace/external/hive/my_db.db/my_table"
val hive = HiveWarehouseSession.session(spark).build()
// write the ORC files directly to the external location
df.write.partitionBy("part_col").option("compression", "zlib").mode(SaveMode.Overwrite).orc(externalPath)
// column list for the DDL, without the partition column
val columns = df.drop("part_col").schema.fields.map(field => s"${field.name} ${field.dataType.simpleString}").mkString(", ")
val ddl =
      s"""
         |CREATE EXTERNAL TABLE my_db.my_table ($columns)
         |PARTITIONED BY (part_col string)
         |STORED AS ORC
         |Location '$externalPath'
       """.stripMargin

hive.execute(ddl)
hive.execute("MSCK REPAIR TABLE my_db.my_table SYNC PARTITIONS")

Unfortunately, this throws a:

java.sql.SQLException: The query did not generate a result set!

from HWC
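
The exception presumably arises because execute() expects a result set, which DDL statements do not produce. One way to keep the two concerns apart is to build the statements separately from running them. The following is a minimal sketch under that assumption; the object name, the helper and all table names and paths are hypothetical, and nothing here talks to Hive.

```scala
// Sketch only: builds the statements that register already-written ORC files
// as an external Hive table. ExternalTableDdl and its inputs are hypothetical.
object ExternalTableDdl {
  /** Renders (name, Hive type) pairs as a column list such as "a int, b string". */
  private def render(cols: Seq[(String, String)]): String =
    cols.map { case (name, hiveType) => s"$name $hiveType" }.mkString(", ")

  /** Returns the CREATE EXTERNAL TABLE and partition-repair statements, in that order. */
  def statements(
      table: String,
      dataCols: Seq[(String, String)],
      partitionCols: Seq[(String, String)],
      location: String): Seq[String] = {
    val create =
      s"""CREATE EXTERNAL TABLE IF NOT EXISTS $table (${render(dataCols)})
         |PARTITIONED BY (${render(partitionCols)})
         |STORED AS ORC
         |LOCATION '$location'""".stripMargin
    val repair = s"MSCK REPAIR TABLE $table SYNC PARTITIONS"
    Seq(create, repair)
  }
}
```

With the hive session from above, the statements could then be sent one by one, for example via ExternalTableDdl.statements(...).foreach(sql => hive.executeUpdate(sql)). As far as I know HWC offers executeUpdate for statements without a result set, but whether it avoids the exception on a given HWC version is untested here.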

0
votes

"How can I use spark to write to hive without using the warehouse connector but still writing to the same metastore which can later on be read by hive?"

We are working with the same setup (HDP 3.1 with Spark 2.3). Using the code below, we got the same error message as you: "bucketId out of range: -1". The solution was to run set hive.fetch.task.conversion=none; in the Hive shell before querying the table.

The code to write data into Hive without the HWC:

  import java.io.File
  import org.apache.spark.sql.{SaveMode, SparkSession}

  val warehouseLocation = new File("spark-warehouse").getAbsolutePath

  case class Record(key: Int, value: String)

  val spark = SparkSession.builder()
    .master("yarn")
    .appName("SparkHiveExample")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate()

  spark.sql("USE databaseName")
  val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
  recordsDF.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("sparkhive_records")

[Example from https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html]