1
votes

I am new to hazelcast-jet and my use case is to read from Kafka source and filter after checking its value in hazelcastIMDG.

I am getting and loading IMDG map even before creating pipeline. See below

 IMap<String, Policy> policyMap =jet.getHazelcastInstance().getMap(POLICY_MAP_NAME);
            Utility.populatePoliciesMap(policyMap);

passing policyMap as param in the buildPipeline method.

i have created a pipeline as below

StreamStage<TimestampedEntry<String, Long>> streamStage = pipeline.drawFrom(KafkaSources.kafka(brokerConsumerProperties(), projectionFn, getIngestTopic()))
                .addTimestamps()
                .flatMap(ingestData -> traverseArray(ingestData.getMapRequestParameterTree().toArray(new String[ingestData.getMapRequestParameterTree().size()])))
                .filter(hash -> policyMap.get(hash)!=null)
                .window(sliding(MINUTES.toMillis(1), SECONDS.toMillis(10)))
                .groupingKey(wholeItem())
                .aggregate(counting())
               .map((TimestampedEntry<String, Long> e) -> entry(e.getKey(), createBlacklistObjectEvent(Utility.fetchPolicy(e.getKey()), e.getTimestamp(), e.getValue())));
        timestampedEntryStreamStage.drainTo(Sinks.map(BL_MAP_NAME));

but with this i am getting below exception

Exception in thread "main" java.lang.IllegalArgumentException: "filterFn" must be serializable at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:301) at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachFilter(ComputeStageImplBase.java:129) at com.hazelcast.jet.impl.pipeline.StreamStageImpl.filter(StreamStageImpl.java:71) at com.visa.rls.handler.HazelcastJetIngetstResultHandler.buildPipeline(HazelcastJetIngetstResultHandler.java:120) at com.visa.rls.handler.HazelcastJetIngetstResultHandler.run(HazelcastJetIngetstResultHandler.java:84) at com.visa.rls.handler.HazelcastJetIngetstResultHandler.main(HazelcastJetIngetstResultHandler.java:58) Caused by: java.io.NotSerializableException: com.hazelcast.map.impl.proxy.MapProxyImpl at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:299) ... 5 more

1

1 Answers

0
votes

You are using policyMap inside the filter function, but the IMap isn't serializable. It gets captured by the lambda expression. You have to get the IMap instance on each remote member, for which you can use filterUsingContext instead of filter:

.filterUsingContext(
    ContextFactory.withCreateFn(jetInstance -> jetInstance.getMap(POLICY_MAP_NAME)),
    (policyMap, hash) -> policyMap.get(hash) != null
)