I am trying to write a Spark job that should put its output into HBase. As far as I can tell, the right way to do this is to use the method saveAsHadoopDataset on org.apache.spark.rdd.PairRDDFunctions - this requires that my RDD is composed of pairs.

The method saveAsHadoopDataset requires a JobConf, and this is what I am trying to construct. According to this link, one thing I have to set on my JobConf is the output format (in fact it doesn't work without it), like

jobConfig.setOutputFormat(classOf[TableOutputFormat])
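
For context, the overall setup I have in mind is roughly the following sketch (the table name and the pair RDD are placeholders, and I am assuming the generic TableOutputFormat from org.apache.hadoop.hbase.mapreduce):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD

// "mytable" is a placeholder table name; myPairRDD stands for whatever pair RDD I want to save
val myPairRDD: RDD[(ImmutableBytesWritable, Put)] = ???

val jobConfig = new JobConf(HBaseConfiguration.create())
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "mytable")
jobConfig.setOutputFormat(classOf[TableOutputFormat])  // <- the call that does not compile
myPairRDD.saveAsHadoopDataset(jobConfig)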
The problem is that apparently this does not compile, because TableOutputFormat is generic, even though it ignores its type parameter. So I have tried various combinations, such as

jobConfig.setOutputFormat(classOf[TableOutputFormat[Unit]])
jobConfig.setOutputFormat(classOf[TableOutputFormat[_]])

but in either case I get an error

required: Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]]
Now, as far as I can tell, Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]] translates to Class[T] forSome { type T <: org.apache.hadoop.mapred.OutputFormat[_, _] }. Here is where I think I have a problem, because:

- Class is invariant
- TableOutputFormat[T] <: OutputFormat[T, Mutation], but
- I am not sure how existential types interact with subtyping in the requirement T <: OutputFormat[_, _]
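
For instance (a toy example, nothing to do with HBase), this is my understanding of how invariance and an existential upper bound behave:

// Class is invariant, so this does not compile:
// val a: Class[CharSequence] = classOf[String]      // type mismatch

// but an existential upper bound accepts the Class of any subtype:
val b: Class[_ <: CharSequence] = classOf[String]     // fine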
Is there a way to obtain a subtype of OutputFormat[_, _] from TableOutputFormat? It seems the problem arises from the differences between generics in Java and in Scala - what can I do about this?
edit:

It turns out this is even subtler. I have tried to define a method myself in the REPL

def foo(x: Class[_ <: OutputFormat[_, _]]) = x

and I can actually invoke it with

foo(classOf[TableOutputFormat[Unit]])

or even

foo(classOf[TableOutputFormat[_]])

for that matter. But I cannot call

jobConf.setOutputFormat(classOf[TableOutputFormat[_]])
The original signature of setOutputFormat in Java is void setOutputFormat(Class<? extends OutputFormat> theClass). How can I call it from Scala?
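
If nothing cleaner is possible, I suppose I could force the call with an explicit cast along these lines (untested), but that just sidesteps the type checker rather than satisfying it, so I would like to know whether there is a proper way:

// untested workaround sketch: erase the type argument with a cast
jobConf.setOutputFormat(
  classOf[TableOutputFormat[Unit]]
    .asInstanceOf[Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]]])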