4 votes

I am trying to write a Spark job that should put its output into HBase. As far as I can tell, the right way to do this is to use the method saveAsHadoopDataset on org.apache.spark.rdd.PairRDDFunctions, which requires that my RDD consist of pairs.

The method saveAsHadoopDataset requires a JobConf, and this is what I am trying to construct. According to this link, one thing I have to set on my JobConf is the output format (in fact it doesn't work without it), like

jobConfig.setOutputFormat(classOf[TableOutputFormat])

The problem is that this apparently does not compile, because TableOutputFormat is generic, even though it ignores its type parameter. So I have tried various combinations, such as

jobConfig.setOutputFormat(classOf[TableOutputFormat[Unit]])
jobConfig.setOutputFormat(classOf[TableOutputFormat[_]])

but in each case I get an error

required: Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]]

Now, as far as I can tell, Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]] translates to Class[T] forSome { type T <: org.apache.hadoop.mapred.OutputFormat[_, _] }. Here is where I think I have a problem, because:

  • Class is invariant
  • TableOutputFormat[T] <: OutputFormat[T, Mutation], but
  • I am not sure how existential types interact with subtyping in the requirement T <: OutputFormat[_, _] (see the toy sketch below)
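
A toy sketch of the third point (Box and SubBox are made-up stand-ins, not real classes) suggests that existential subtyping by itself is not the obstacle:

class Box[A]
class SubBox[A] extends Box[A]

def takes(c: Class[_ <: Box[_]]) = c
takes(classOf[SubBox[_]]) // compiles: SubBox[_] <: Box[_] holds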

Is there a way to obtain a subtype of OutputFormat[_, _] from TableOutputFormat? It seems the problem arises from the differences between generics in Java and in Scala - what can I do about this?

edit:

It turns out this is even subtler. I have tried defining a method myself in the REPL

def foo(x: Class[_ <: OutputFormat[_, _]]) = x

and I can actually invoke it with

foo(classOf[TableOutputFormat[Unit]])

or even

foo(classOf[TableOutputFormat[_]])

for that matter. But I cannot call

jobConf.setOutputFormat(classOf[TableOutputFormat[_]])

The original signature of setOutputFormat in Java is void setOutputFormat(Class<? extends OutputFormat> theClass). How can I call it from Scala?


2 Answers

7 votes

That's very strange; are you 100% sure you have your imports correct (EDIT: yes, this was the problem, see the comments) and the correct versions of the artefacts in your build file? Maybe it will help if I provide a code snippet from my working project:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat

val conf = HBaseConfiguration.create()

val jobConfig: JobConf = new JobConf(conf, this.getClass)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
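
The import really is the crux here: HBase ships two classes named TableOutputFormat, one per Hadoop API, and only the old mapred one is non-generic and compatible with JobConf. A quick sketch of the distinction (the renames are only for illustration):

// Old "mapred" API: non-generic, implements org.apache.hadoop.mapred.OutputFormat,
// so classOf[TableOutputFormat] satisfies JobConf.setOutputFormat.
import org.apache.hadoop.hbase.mapred.{TableOutputFormat => MapredTOF}

// New "mapreduce" API: generic in the key type, extends
// org.apache.hadoop.mapreduce.OutputFormat, which is unrelated to the mapred
// interface, so no choice of type argument can make it fit setOutputFormat.
import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => MapreduceTOF}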

and some deps I have:

"org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",
"org.apache.hbase" % "hbase-client" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-common" % "0.96.1.1-cdh5.0.0", 

"org.apache.hbase" % "hbase-hadoop-compat" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-it" % "0.96.1.1-cdh5.0.0", /
"org.apache.hbase" % "hbase-hadoop2-compat" % "0.96.1.1-cdh5.0.0",

"org.apache.hbase" % "hbase-prefix-tree" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-protocol" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-server" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-shell" % "0.96.1.1-cdh5.0.0", 

"org.apache.hbase" % "hbase-testing-util" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-thrift" % "0.96.1.1-cdh5.0.0",
3 votes

Since org.apache.hadoop.hbase.mapred.TableOutputFormat is deprecated, you can use the following code as a draft:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
...
val hConf = HBaseConfiguration.create()

// Wrap the configuration in a new-API Job so the output format class can be set.
val job = Job.getInstance(hConf)
val jobConf = job.getConfiguration
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
...
rdd.saveAsNewAPIHadoopDataset(jobConf)
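
To make this runnable end to end, here is a hedged sketch of how the rdd of pairs could be built; sc, tableName, and the "cf"/"col" column names are assumptions, not part of the original snippet:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// The new-API TableOutputFormat expects (key, Mutation) pairs; it writes the
// Put and uses the key only for its type.
val rdd = sc.parallelize(Seq("row1" -> "value1")).map { case (row, value) =>
  val put = new Put(Bytes.toBytes(row))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
  (new ImmutableBytesWritable(Bytes.toBytes(row)), put)
}
rdd.saveAsNewAPIHadoopDataset(jobConf)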