I am trying to understand how Apache Ignite streaming works. I have a 2-node cluster set up (both nodes on localhost), and I start a client node that runs streaming code with StreamTransformer and an EntryProcessor. As a result, one of my nodes throws a "cannot deserialize" exception. My code is a simplified version of the WordCount example from the Ignite documentation:
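import java.io.IOException;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.stream.StreamTransformer;

public class StreamingExample {
    // Increments the counter stored under the key, starting at 1 for a new key.
    public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> {
        @Override
        public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException {
            Long val = e.getValue();
            e.setValue(val == null ? 1L : val + 1);
            return null;
        }
    }

    public static void main(String[] args) throws IgniteException, IOException {
        // Start this JVM as a client node.
        Ignition.setClientMode(true);
        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
            IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache");
            try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
                stmr.allowOverwrite(true);
                // Apply the entry processor to every streamed entry.
                stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor()));
                stmr.addData("word", 1L);
                System.out.println("Finished");
            }
        }
    }
}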
The exception I get on ONE of the two nodes is:
[23:38:23] Topology snapshot [ver=5, servers=2, clients=1, CPUs=4, heap=3.3GB]
Exception in thread "pub-#9%null%" class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1663)
    at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:298)
    at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal(BinaryMarshaller.java:109)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:50)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:80)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1238)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$1700(GridIoManager.java:106)
    at org.apache.ignite.internal.managers.communication.GridIoManager$5.run(GridIoManager.java:829)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find class with given class loader for unmarshalling (make sure same versions of all classes are available on all nodes or enable peer-class-loading): sun.misc.Launcher$AppClassLoader@4e857327
    at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:224)
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1592)
    ... 13 more
Caused by: java.lang.ClassNotFoundException: gridgaingames.StreamingExample$StreamingExampleCacheEntryProcessor
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350)
    at org.apache.ignite.internal.MarshallerContextAdapter.getClass(MarshallerContextAdapter.java:185)
    at org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)
    at org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
    at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218)
    ... 14 more
There are several things I do not understand.
1) How can I fix it?
2) Since this is not a "broadcast" or anything similar, I assumed Ignite runs the streaming code on the calling node only. It looks like I was wrong. So where is my streaming code executed?
3) After printing the "Finished" line, my program does not exit. Why? It looks like some non-daemon thread is still running. Is it the streaming code that prevents my client node from exiting?
P.S. Peer class loading is enabled. If I run an example that uses broadcast, which executes code on many nodes, it works fine.