2 votes

I am trying to access a table (ORC format) stored on a remote cluster using Spark's JDBC:

val jdbcDF = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "metrics")
      .option("user", user)
      .option("password", password)
      .load()

However, no matter what I do, I keep getting this error:

Caused by: java.sql.SQLException: Cannot convert column 2 to long: java.lang.NumberFormatException: For input string: "metrics.t"
    at org.apache.hive.jdbc.HiveBaseResultSet.getLong(HiveBaseResultSet.java:372)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:365)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:364)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:286)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:268)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "metrics.t"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hive.jdbc.HiveBaseResultSet.getLong(HiveBaseResultSet.java:368)
    ... 22 more

The input string "metrics.t" is the table name combined with the name of the 2nd column, "t", which holds timestamps as longs.

How do I skip past the headers with the JDBC format?

The CSV option .option("header", true) has no effect in my case.

PS: Spark version 2.1.0

Comments:

Jacek Laskowski: What's the url? Are you reading from Hive perhaps?
Jeanine Harb: @JacekLaskowski Yes, I am reading from Hive. This is the url: val url = "jdbc:hive2://localhost:10000/default"

3 Answers

1 vote

The code does not throw any exceptions with the following implementation:

val jdbcUrl = s"jdbc:hive2://$jdbcHostname:$jdbcPort/$jdbcDatabase"

val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", jdbcUsername)
connectionProperties.setProperty("password", jdbcPassword)

val jdbcDF = spark.read.jdbc(jdbcUrl, "metrics", Array(), connectionProperties)

Oddly, though, if I remove the empty predicates array Array(), the exception comes back.

1 vote

This happens because Spark's default JdbcDialect uses a double quotation mark as its quoteIdentifier, and Spark does not ship a dialect for Hive (unlike, e.g., MySQL).

Spark therefore sends SQL like select "some_column_name" from table to Hive over JDBC, and Hive treats "some_column_name" as a string literal rather than a column name. That is why your query comes back with a string like "metrics.t" where a long was expected, producing the NumberFormatException.

With val jdbcDF = spark.read.jdbc(jdbcUrl, "metrics", Array(), connectionProperties), you are telling Spark to build a JDBC DataFrame without any partitions. No data-fetching SQL is ever sent to Hive, so Spark simply gives you an empty DataFrame; that is why the snippet above appears to work.
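A quick way to confirm this, assuming the jdbcDF built with the empty predicates array above:

jdbcDF.rdd.getNumPartitions  // 0: one partition per predicate, so none at all
jdbcDF.count()               // 0: no query ever reaches Hive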

The only correct way is to implement and register a corresponding dialect: How to specify sql dialect when creating spark dataframe from JDBC?
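A minimal sketch of such a dialect, assuming Spark 2.1's JdbcDialect developer API; Hive expects identifiers quoted with backticks rather than double quotes:

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Sketch of a Hive dialect: quote identifiers with backticks so Hive
// sees column names instead of string literals.
object HiveDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String =
    s"`$colName`"
}

// Register it before creating the DataFrame so Spark picks it up
// for any jdbc:hive2 URL.
JdbcDialects.registerDialect(HiveDialect)

With this registered, the original spark.read.format("jdbc") snippet should generate backtick-quoted column names and return real data.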

1 vote

Enabling Hive support when initializing the SparkSession worked for me:

SparkSession spark = new SparkSession.Builder()
                .master("local")
                .appName("test")
                .enableHiveSupport()
                .getOrCreate();
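
With Hive support enabled, the table can also be read through the metastore rather than over JDBC, which bypasses the Hive JDBC driver entirely. A sketch in Scala, assuming hive-site.xml points at the remote metastore and the metrics table is registered there:

// Reads the ORC-backed Hive table directly; no JDBC involved.
val metricsDF = spark.table("metrics")
metricsDF.show()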