3
votes

I am writing a Flink application in Kotlin and data classes (as well as other Kotlin classes) are not identified as valid POJO types.

The Flink documentation states that a data type is recognized as a POJO type (and allows "by-name" field referencing) if the following conditions are fulfilled:

  • The class is public and standalone
  • The class has a public no-argument constructor
  • All non-static, non-transient fields in the class are either public (and non-final) or have public getter and setter methods that follow Java beans naming conventions.
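
To make the three conditions concrete, here is a minimal sketch that approximates them with plain reflection. It is not Flink's actual check (TypeExtractor does more, for example validating field types), and the helper name `looksLikePojo` and the two sample classes are hypothetical, introduced only for illustration.

```kotlin
import java.lang.reflect.Modifier

// Rough approximation of the documented rules; "standalone" is not checked here.
fun looksLikePojo(clazz: Class<*>): Boolean {
    // Rule 1 (partial): the class must be public.
    if (!Modifier.isPublic(clazz.modifiers)) return false
    // Rule 2: a public no-argument constructor must exist.
    if (clazz.constructors.none { it.parameterCount == 0 }) return false
    // Rule 3: each non-static, non-transient field is public and non-final,
    // or has a public getter and setter following bean naming.
    return clazz.declaredFields
        .filter { !Modifier.isStatic(it.modifiers) && !Modifier.isTransient(it.modifiers) }
        .all { field ->
            val publicMutable =
                Modifier.isPublic(field.modifiers) && !Modifier.isFinal(field.modifiers)
            val suffix = field.name.replaceFirstChar { it.uppercase() }
            val hasGetter = clazz.methods.any { it.name == "get$suffix" && it.parameterCount == 0 }
            val hasSetter = clazz.methods.any { it.name == "set$suffix" && it.parameterCount == 1 }
            publicMutable || (hasGetter && hasSetter)
        }
}

// Hypothetical sample classes: one read-only property, one mutable property.
data class ReadOnly(val topic: String = "")
data class Mutable(var topic: String = "")

fun main() {
    println(looksLikePojo(ReadOnly::class.java)) // false: getTopic() exists, setTopic() does not
    println(looksLikePojo(Mutable::class.java))  // true
}
```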

I receive the following when implementing a Kotlin data class, which should meet the aforementioned conditions to be recognized as a POJO:

[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - 
Class class <Class> cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" 
for details of the effect on performance.

Investigating further, I reviewed Flink's TypeExtractor.isValidPojoField method at https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java

In a separate project, I applied the same field checks with java.lang.reflect.Modifier to a simple Kotlin data class in an attempt to narrow down the issue.

data class SomeDataClass(
    val topic: String = "",
    val message: String = ""
)

While Kotlin properties have public visibility by default, Modifier.isPublic reports their backing fields as private. Additionally, Modifier.isFinal reports the fields as final.

import java.lang.reflect.Modifier

// Print the JVM-level modifiers of each backing field
val clazz = SomeDataClass::class.java
val fields = clazz.declaredFields
fields.forEach {
    println("field: $it")
    println(it.genericType)
    println("public? " + Modifier.isPublic(it.modifiers))
    println("final? " + Modifier.isFinal(it.modifiers))
    println("transient? " + Modifier.isTransient(it.modifiers))
    println("static? " + Modifier.isStatic(it.modifiers))
}

>
field: private final java.lang.String SomeDataClass.topic
class java.lang.String
public? false
final? true
transient? false
static? false

However, public getter and setter methods are created for these fields, so this object should still meet the POJO criteria.

println(clazz.declaredMethods.toList())

>
[public boolean SomeDataClass.equals(java.lang.Object), 
public java.lang.String SomeDataClass.toString(), 
public int SomeDataClass.hashCode(), 
**public final java.lang.String SomeDataClass.getMessage(),** 
public final SomeDataClass SomeDataClass.copy(java.lang.String,java.lang.String), 
**public final java.lang.String SomeDataClass.getTopic(),** 
public final java.lang.String SomeDataClass.component1(), 
public final java.lang.String SomeDataClass.component2(), 
public static SomeDataClass SomeDataClass.copy$default(SomeDataClass,java.lang.String,java.lang.String,int,java.lang.Object)]

The getter and setter methods, however, are final, which leads me to believe this is the issue.

I am relatively new to JVM development, so any help would be greatly appreciated. I have reviewed the Flink Jira, Stack Overflow, and Flink mailing list and have not found a similar issue reported.

1
If the problem turns out to be only the final modifiers on the data classes, you can use the Kotlin "all-open" compiler plugin to remove them from the compiled code. - Avi Cherry

1 Answer

2
votes

I see two POJO rules worth checking against the provided data class.

1) The class has a public no-argument constructor

By default, Kotlin does not generate overloads for functions with default parameter values (https://kotlinlang.org/docs/reference/java-to-kotlin-interop.html#overloads-generation).

Constructors have one exception: on the JVM, if every primary-constructor parameter has a default value, the compiler also generates a public no-argument constructor, so the class as posted already satisfies this rule. If any parameter lacked a default, only the full-parameter constructor would exist. It is harmless, though optional here, to request the overloads explicitly with the @JvmOverloads annotation. The annotation is applied to the constructor, so the constructor keyword must also be added:

data class SomeDataClass @JvmOverloads constructor

2) All non-static, non-transient fields in the class are either public (and non-final) or have public getter and setter methods that follow Java beans naming conventions.

Because the properties are declared with val, the generated fields are final and no setters are generated for them. Changing val to var makes the fields non-final and generates proper getters and setters. (Alternatively, the @JvmField annotation suppresses getter and setter generation and exposes the property as a public field: https://kotlinlang.org/docs/reference/java-to-kotlin-interop.html#instance-fields)
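
As a rough sketch of the field-based alternative, assuming @JvmField on var properties is acceptable for your use case: the class name `SomeFieldClass` is hypothetical, and the loop only prints what reflection reports for each field.

```kotlin
import java.lang.reflect.Modifier

// Hypothetical variant of the class from the question: @JvmField exposes each
// property as a public, non-final field and suppresses getter/setter generation.
data class SomeFieldClass(
    @JvmField var topic: String = "",
    @JvmField var message: String = ""
)

fun main() {
    for (field in SomeFieldClass::class.java.declaredFields) {
        val isPublic = Modifier.isPublic(field.modifiers)
        val isFinal = Modifier.isFinal(field.modifiers)
        println("${field.name}: public=$isPublic, final=$isFinal")
    }
}
```

Each field should be reported as public and non-final, which satisfies the field rule without relying on accessors.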

So the final code should look like this:

data class SomeDataClass @JvmOverloads constructor(
    var topic: String = "",
    var message: String = ""
)
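
A quick way to confirm what the compiler emitted is to inspect the class with reflection. This is only a sanity check under the assumption that the class is compiled for the JVM as shown; it does not call Flink, so the TypeExtractor log message remains the final word.

```kotlin
data class SomeDataClass @JvmOverloads constructor(
    var topic: String = "",
    var message: String = ""
)

fun main() {
    val clazz = SomeDataClass::class.java
    // getConstructor() with no arguments throws NoSuchMethodException
    // if the class has no public no-argument constructor.
    println(clazz.getConstructor())
    // Setters are generated only for var properties.
    val setters = clazz.methods.map { it.name }.filter { it.startsWith("set") }.sorted()
    println(setters) // [setMessage, setTopic]
}
```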