3
votes

I would like to not use null value for field of a class used in dataset. I try to use scala Option and java Optional but it failed:

    @AllArgsConstructor // lombok
    @NoArgsConstructor  // mutable type is required in java :(
    @Data               // see https://stackoverflow.com/q/59609933/1206998
    public static class TestClass {
        String id;
        Option<Integer> optionalInt;
    }

    @Test
    public void testDatasetWithOptionField(){
        Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
                new TestClass("item 1", Option.apply(1)),
                new TestClass("item .", Option.empty())
        ), Encoders.bean(TestClass.class));

        ds.collectAsList().forEach(x -> System.out.println("Found " + x));
    }

Fails, at runtime, with message File 'generated.java', Line 77, Column 47: Cannot instantiate abstract "scala.Option"


Question: Is there a way to encode optional fields without null in a dataset, using java?

Subsidiary question: btw, I didn't use much dataset in scala either, can you validate that it is actually possible in scala to encode a case class containing Option fields?


Note: This is used in an intermediate dataset, i.e something that isn't read nor write (but for spark internal serialization)

2

2 Answers

2
votes

This is fairly simple to do in Scala.

Scala Implementation

import org.apache.spark.sql.{Encoders, SparkSession}

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Stack-scala")
      .master("local[2]")
      .getOrCreate()

    val ds = spark.createDataset(Seq(
      TestClass("Item 1", Some(1)),
      TestClass("Item 2", None)
    ))( Encoders.product[TestClass])

    ds.collectAsList().forEach(println)

    spark.stop()
  }

  case class TestClass(
    id: String,
    optionalInt: Option[Int] )
}

Java

There are various Option classes available in Java. However, none of them work out-of-the-box.

  1. java.util.Optional : Not serializable
  2. scala.Option -> Serializable but abstract, so when CodeGenerator generates the following code, it fails!
/* 081 */         // initializejavabean(newInstance(class scala.Option))
/* 082 */         final scala.Option value_9 = false ?
/* 083 */         null : new scala.Option();  // ---> Such initialization is not possible for abstract classes
/* 084 */         scala.Option javaBean_1 = value_9;
  1. org.apache.spark.api.java.Optional -> Spark's implementation of Optional which is serializable but has private constructors. So, it fails with error : No applicable constructor/method found for zero actual parameters. Since this is a final class, it's not possible to extend this.
/* 081 */         // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
/* 082 */         final org.apache.spark.api.java.Optional value_9 = false ?
/* 083 */         null : new org.apache.spark.api.java.Optional();
/* 084 */         org.apache.spark.api.java.Optional javaBean_1 = value_9;
/* 085 */         if (!false) {
1
votes

One option is to use normal Java Optionals in the data class and then use Kryo as serializer.

Encoder en = Encoders.kryo(TestClass.class);

Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
        new TestClass("item 1", Optional.of(1)),
        new TestClass("item .", Optional.empty())
), en);

ds.collectAsList().forEach(x -> System.out.println("Found " + x));

Output:

Found TestClass(id=item 1, optionalInt=Optional[1])
Found TestClass(id=item ., optionalInt=Optional.empty)

There is a downside when using Kryo: this encoder encodes in a binary format:

ds.printSchema();
ds.show(false);

prints

root
 |-- value: binary (nullable = true)

+-------------------------------------------------------------------------------------------------------+
|value                                                                                                  |
+-------------------------------------------------------------------------------------------------------+
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 B1 01 02 02]|
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 AE 01 00]   |
+-------------------------------------------------------------------------------------------------------+

An udf-based solution to get the normal output columns of a dataset encoded with Kryo describes this answer.


Maybe a bit off-topic but probably a start to find a long-term solution is to look at the code of JavaTypeInference. The methods serializerFor and deserializerFor are used by ExpressionEncoder.javaBean to create the serializer and deserializer part of the encoder for Java beans.

In this pattern matching block

typeToken.getRawType match {
   case c if c == classOf[String] => createSerializerForString(inputObject)
   case c if c == classOf[java.time.Instant] => createSerializerForJavaInstant(inputObject)
   case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
   case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
   case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
   [...]

there is the handling for java.util.Optional missing. It could probably be added here as well as in the corresponding deserialize method. This would allow Java beans to have properties of type Optional.