I have a Flink job that reads events from a Kafka queue and then calls another service if certain conditions are met.
I wanted to use Retrofit2 to call the REST endpoint of that service, but I get a "not serializable" exception. I have several FlatMaps connected in series, and the call to the service happens in the last FlatMap. The exception I get:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
...
Caused by: java.io.NotSerializableException: retrofit2.Retrofit$1
...

This is how I initialize Retrofit:

RetrofitClient.getClient(BASE_URL).create(NotificationService.class);

The NotificationService interface:

public interface NotificationService {

    @PUT("/test")
    Call<String> putNotification(@Body Notification notification);
}

The RetrofitClient class:

public class RetrofitClient {

    private static Retrofit retrofit = null;

    public static Retrofit getClient(String baseUrl) {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder().baseUrl(baseUrl).addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}

1 Answer


Please post your Notification class code so we can see more details, but it looks like this answer should help: java.io.NotSerializableException with "$1" after class
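To illustrate what is probably going on: the "$1" suffix points at an anonymous inner class, here one inside Retrofit, which suggests the object returned by create(...) is stored in a field of the function that Flink tries to serialize. Below is a minimal plain-Java sketch of the usual workaround, with no Flink or Retrofit on the classpath. FakeClient, EagerFunction and LazyFunction are hypothetical stand-ins I made up for the Retrofit service and the RichFlatMapFunction, so treat it as a demonstration of the idea rather than your actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientClientDemo {

    // Hypothetical stand-in for the Retrofit-created service: NOT Serializable.
    static class FakeClient {
        String put(String body) {
            return "sent:" + body;
        }
    }

    // Mirrors the failing setup: the client is built eagerly and kept in a
    // regular field, so Java serialization tries to write it and fails.
    static class EagerFunction implements Serializable {
        private final FakeClient client = new FakeClient();

        String apply(String in) {
            return client.put(in);
        }
    }

    // Workaround: mark the field transient and build the client on first use.
    // In Flink the natural place for this would be RichFlatMapFunction.open().
    static class LazyFunction implements Serializable {
        private transient FakeClient client;

        String apply(String in) {
            if (client == null) {
                client = new FakeClient();
            }
            return client.put(in);
        }
    }

    // Returns true if the object survives Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException is an IOException
        }
    }

    // Serializes and deserializes the object, roughly like shipping it to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyOf(T o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new EagerFunction())); // false
        System.out.println(isSerializable(new LazyFunction()));  // true
        System.out.println(copyOf(new LazyFunction()).apply("hello")); // sent:hello
    }
}
```

If this matches your case, keeping the NotificationService in a transient field and calling RetrofitClient.getClient(BASE_URL).create(NotificationService.class) inside open() should avoid the exception, since the service is then created on the worker instead of being shipped with the function.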