I am using Flink v1.4.0 and have set up two distinct jobs. The first is a pipeline that consumes data from a Kafka topic and stores it in a Queryable State (QS), keyed by date. The second submits a query to the QS job and processes the returned data.

Both jobs worked fine with Flink v1.3.2, but since the upgrade everything has broken. Here is part of the code for the first job:

private void runPipeline() throws Exception {
    StreamExecutionEnvironment env = configurationEnvironment();

    QueryableStateStream<String, DataBucket> dataByDate = env.addSource(sourceDataFromKafka())
        .map(NewDataClass::new)
        .keyBy(data -> data.date)   // KeySelector: key each record by its date string
        .asQueryableState("QSName", reduceIntoSingleDataBucket());
}

And here is the code on the client side:

QueryableStateClient client = new QueryableStateClient("localhost", 6123);

// the state descriptor of the state to be fetched.
ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
    "QSName",
    TypeInformation.of(new TypeHint<DataBucket>() {}));

JobID jobId = JobID.fromHexString("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");   // getKvState expects a JobID, not a String
String key = "2017-01-06";

CompletableFuture<ValueState<DataBucket>> resultFuture = client.getKvState(
    jobId, 
    "QSName", 
    key, 
    BasicTypeInfo.STRING_TYPE_INFO, 
    descriptor);

try {

    ValueState<DataBucket> valueState = resultFuture.get();
    DataBucket bucket = valueState.value();
    System.out.println(bucket.getLabel());

} catch (IOException | InterruptedException | ExecutionException e) {
    throw new RuntimeException("Unable to query bucket key: " + key, e);
}

I followed the instructions at https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html and enabled queryable state on my Flink cluster by copying flink-queryable-state-runtime_2.11-1.4.0.jar from the opt/ folder of the Flink distribution to the lib/ folder. I also checked that it runs in the task manager.

I keep getting the following error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:84)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
    at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:210)
    at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:174)
    at com.company.dept.query.QuerySubmitter.main(QuerySubmitter.java:37)

Any idea what is happening? I suspect my requests don't reach the QS at all, but I really don't know whether or how I should change anything. Thanks.


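So, as it turned out, two things were causing this error. The first was using the wrong constructor for the descriptor on the client side. Rather than the one that takes only a name for the QS and a TypeInformation (built from a TypeHint), I had to use one that takes a value serializer, created up front from an explicit ExecutionConfig, along with a default value. Because the descriptor then already holds a serializer, the client no longer has to create one itself, and that creation (GenericTypeInfo.createSerializer in the stack trace) is what threw the NullPointerException: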

ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
    "QSName",
    TypeInformation.of(new TypeHint<DataBucket>() {}).createSerializer(new ExecutionConfig()),
    DataBucket.emptyBucket());    // or anything that can be used as a default value
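To see why supplying the serializer up front helps, here is a deliberately simplified, hypothetical model in plain Java. These are NOT Flink's classes: Config, Descriptor and createGenericSerializer are illustrative stand-ins, and initializeSerializerUnlessSet only mirrors the method name seen in the stack trace. The model assumes, as the stack trace suggests, that the client ends up building the serializer without a usable ExecutionConfig.

```java
import java.util.function.Function;

// Simplified, hypothetical model of lazy serializer creation; these are NOT Flink's classes.
public class LazySerializerModel {

    // Stand-in for ExecutionConfig, reduced to the one flag the serializer factory reads.
    static final class Config {
        boolean genericTypesDisabled = false;
    }

    // Stand-in for a state descriptor that either knows how to build a serializer or already has one.
    static final class Descriptor {
        private final Function<Config, String> serializerFactory; // plays the role of TypeInformation
        private String serializer;                                // plays the role of TypeSerializer

        Descriptor(Function<Config, String> serializerFactory) {
            this.serializerFactory = serializerFactory;
        }

        Descriptor(String serializer) {
            this.serializerFactory = null;
            this.serializer = serializer;
        }

        // The factory runs only when no serializer was supplied up front.
        void initializeSerializerUnlessSet(Config config) {
            if (serializer == null) {
                serializer = serializerFactory.apply(config);
            }
        }

        String serializer() {
            return serializer;
        }
    }

    // Factory that reads the config, so a null config raises a NullPointerException.
    static String createGenericSerializer(Config config) {
        if (config.genericTypesDisabled) {
            throw new UnsupportedOperationException("generic types are disabled");
        }
        return "KryoSerializer";
    }

    public static void main(String[] args) {
        Config missingConfig = null; // assumed situation on the client side

        Descriptor fromTypeInfo = new Descriptor(LazySerializerModel::createGenericSerializer);
        try {
            fromTypeInfo.initializeSerializerUnlessSet(missingConfig);
        } catch (NullPointerException e) {
            System.out.println("type-info descriptor: NullPointerException");
        }

        Descriptor withSerializer = new Descriptor("KryoSerializer");
        withSerializer.initializeSerializerUnlessSet(missingConfig);
        System.out.println("serializer descriptor: " + withSerializer.serializer());
    }
}
```

Running main prints the NullPointerException line for the descriptor built from type information, then "serializer descriptor: KryoSerializer" for the one that was handed a serializer, because the factory is never invoked in the second case.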

The second concerned the host and port. In 1.4 the client connects to the Queryable State Proxy that runs on each task manager rather than to the JobManager, so the port is no longer the one used with v1.3.2: it now defaults to 9069. In my case the host was also not localhost. You can verify both by checking the logs of any task manager for the line: Started the Queryable State Proxy Server @ ....
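If you want to script that check, the host and port can be pulled out of the log line. This is a hypothetical helper, not part of Flink, and it assumes the address after "@ " is printed the way InetSocketAddress.toString() prints it ("hostname/ip:port" or "/ip:port") or as plain "host:port"; compare with the real line in your own task manager log before relying on it.

```java
import java.net.InetSocketAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: extracts the proxy host and port from a task manager log line.
public class ProxyAddressFromLog {

    // ASSUMPTION: the address looks like "hostname/ip:port", "/ip:port" or "host:port".
    // IPv6 literals are not handled.
    static final Pattern PROXY_LINE = Pattern.compile(
        "Started the Queryable State Proxy Server @ (?:[^/\\s]*/)?([^\\s:/]+):(\\d+)");

    // Returns an unresolved host/port pair, or null if the line is not the proxy start-up message.
    static InetSocketAddress parse(String logLine) {
        Matcher m = PROXY_LINE.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        // When both a hostname and an IP are printed, the part after the '/' (the IP) is kept.
        return InetSocketAddress.createUnresolved(m.group(1), Integer.parseInt(m.group(2)));
    }

    public static void main(String[] args) {
        // Made-up sample line; the real format must be checked against your own logs.
        String line = "INFO  Started the Queryable State Proxy Server @ /10.0.0.5:9069.";
        InetSocketAddress proxy = parse(line);
        System.out.println(proxy.getHostString() + " " + proxy.getPort());
    }
}
```

The result can then be passed to the client as new QueryableStateClient(proxy.getHostString(), proxy.getPort()). Keeping the IP rather than the hostname avoids depending on DNS resolution from the client machine; either should work if the name resolves.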

Finally, if you are here because you want to allow a port range for the queryable state client proxy, I suggest you follow the corresponding issue (FLINK-7788): https://issues.apache.org/jira/browse/FLINK-7788.