
I'm just starting to learn Hazelcast Jet. My source is UDP datagrams. I want to process them in parallel on several Jet nodes and resend them to other addresses chosen by 'domain'. To get the 'domain' for a given 'source ip', I want to use a Hazelcast IMDG IMap with a loader.

DAG dag = new DAG();

Vertex source = dag.newVertex("datagram-source",
        UdpSocketP.supplier("0.0.0.0", 41813));
source.localParallelism(1);

Vertex mapper = dag.newVertex("map",
        map(new DomainMapper(instance.getMap("mysqlNas"))));

Vertex sink = dag.newVertex("sink",
        Sinks.writeFile("logs"));
sink.localParallelism(1);

But when I try to use the IMap inside a DistributedFunction, I get this exception:

Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
    at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
    at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator

DomainMapper code:

package org.eltex.softwlc.sorm.replicator;

import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;

import java.io.Serializable;
import java.net.DatagramPacket;

/**
 * Created by mickey on 21.07.17.
 */
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {

    private final IMap<String, NasValue> map;

    public DomainMapper(IMap<String, NasValue> map) {
        this.map = map;
    }

    @Override
    public IpData apply(DatagramPacket datagramPacket) {
        final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
        System.out.println(d);

        final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
        if (nasValue != null) {
            d.setDomain(nasValue.getDomain());
        }

        return d;
    }
}

What is my mistake? Or is Hazelcast Jet the wrong choice for my purpose?


1 Answer


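The problem is that you're trying to serialize the entire IMap inside the function. DomainMapper holds the IMap as a field, so when Jet checks that the vertex's supplier is serializable it also has to serialize the map object (the MapDecorator named in the stack trace), and that object is not serializable. A direct fix would be to write a custom processor that gets access to the Hazelcast Jet instance inside its init() method and looks up its IMap from that. Since the init() code is executed on the target member, after all deserialization, this would work.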
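To make the serialization point concrete, here is a minimal JDK-only sketch of the same pattern. It deliberately does not use the Jet API: LocalMap and LocalRegistry are hypothetical stand-ins for the IMap and the Jet instance, and the init() method here is an ordinary method that only mimics what a processor's init() would do. The idea it shows is that the function should carry just the map name and resolve the map after it has been deserialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LazyLookupSketch {

    // Hypothetical stand-in for an IMap proxy: intentionally NOT Serializable.
    static final class LocalMap {
        private final Map<String, String> data = new ConcurrentHashMap<>();
        String get(String key) { return data.get(key); }
        void put(String key, String value) { data.put(key, value); }
    }

    // Hypothetical stand-in for the member-local instance that hands out maps by name.
    static final class LocalRegistry {
        private static final Map<String, LocalMap> MAPS = new ConcurrentHashMap<>();
        static LocalMap getMap(String name) {
            return MAPS.computeIfAbsent(name, n -> new LocalMap());
        }
    }

    // Mirrors the question: the map itself is a field, so serialization fails.
    static final class EagerMapper implements Serializable {
        private final LocalMap map;
        EagerMapper(LocalMap map) { this.map = map; }
    }

    // Carries only the map name; the map is looked up after deserialization.
    static final class LazyMapper implements Serializable {
        private final String mapName;
        private transient LocalMap map; // never serialized

        LazyMapper(String mapName) { this.mapName = mapName; }

        // Plays the role of init(): runs on the "target" side.
        void init() { map = LocalRegistry.getMap(mapName); }

        String apply(String sourceIp) { return map.get(sourceIp); }
    }

    // Serializes and deserializes an object, as shipping it to another member would.
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    // Returns true if serializing the eager variant fails with NotSerializableException.
    public static boolean eagerFails() {
        try {
            roundTrip(new EagerMapper(LocalRegistry.getMap("mysqlNas")));
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Ships the lazy variant, initializes the copy, and performs one lookup.
    public static String lazyLookup(String sourceIp) {
        LocalRegistry.getMap("mysqlNas").put("10.0.0.1", "example-domain");
        try {
            LazyMapper copy = roundTrip(new LazyMapper("mysqlNas"));
            copy.init();
            return copy.apply(sourceIp);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager variant fails to serialize: " + eagerFails());
        System.out.println("lazy lookup result: " + lazyLookup("10.0.0.1"));
    }
}
```

In a real job the same split would apply: the map name travels with the processor, and the lookup of the IMap happens in the processor's init() on the member where it runs.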

However, on a more general level your goal seems to be of the "data enrichment" kind. The way we want to support this in Jet is via a "hash join" operation, which is currently not first-class; however, there is a code sample showing the approach. You can either funnel the entire IMap contents to a vertex that will turn them into a plain HashMap and distribute it to all enriching processors, or you can prepare a Hazelcast ReplicatedMap that will be used directly by the enriching processor.

The first approach means you work against a snapshot of the IMap; in the second one you can continue to update the ReplicatedMap as the job is running.
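The difference between the two approaches can be illustrated with plain JDK maps. This is only a sketch of the snapshot-versus-live behaviour, not of the Jet samples themselves: the ConcurrentHashMap stands in for a map that keeps receiving updates, the HashMap copy stands in for the snapshot handed to the enriching processors, and the IP addresses and domain names are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class EnrichmentSketch {

    // Enriches a source IP with its domain, or "unknown" if the table has no entry.
    public static String enrich(String sourceIp, Map<String, String> domainsByIp) {
        return sourceIp + " -> " + domainsByIp.getOrDefault(sourceIp, "unknown");
    }

    // Returns the enrichment result against the snapshot and against the live map.
    public static String[] demo() {
        // Stand-in for a map that is still being updated while the job runs.
        Map<String, String> live = new ConcurrentHashMap<>();
        live.put("10.0.0.1", "alpha");

        // Snapshot taken when the job starts: a plain copy of the contents.
        Map<String, String> snapshot = new HashMap<>(live);

        // An update that arrives after the snapshot was taken.
        live.put("10.0.0.2", "beta");

        return new String[] {
            enrich("10.0.0.2", snapshot), // the snapshot does not see the update
            enrich("10.0.0.2", live)      // the live map does
        };
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
        // prints:
        // 10.0.0.2 -> unknown
        // 10.0.0.2 -> beta
    }
}
```

If the lookup table rarely changes, the snapshot is simpler to reason about; if entries must become visible while the job is running, the live map is the one to use.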

The best place to start is the code samples: HashMapEnrichment and ReplicatedMapEnrichment.