0
votes

I have been trying to get my Ignite continuous query code to work without enabling peer class loading, but it fails. While debugging, I found that the call to cache.query(qry) fails with the error "Failed to marshal custom event". When I enable peer class loading, the code works as expected. Could someone provide guidance on how I can make this work without peer class loading? The following code snippet sets up the continuous query.

    public void subscribeEvent(IgniteCache<String, String> cache, String inKeyStr, ServerWebSocket websocket) {
        System.out.println("in thread " + Thread.currentThread().getId() + "-->" + "subscribe event");
        //ArrayList<String> inKeys = new ArrayList<String>(Arrays.asList(inKeyStr.split(",")));

        ContinuousQuery<String, String> qry = new ContinuousQuery<>();

        /****
         * Continuous Query Impl
         */

        // Wrap the key list in commas so that contains() only matches whole keys.
        final String inKeys = "," + inKeyStr + ",";
        qry.setInitialQuery(new ScanQuery<String, String>((k, v) -> inKeys.contains("," + k + ",")));
        qry.setTimeInterval(1000);
        qry.setPageSize(1);

        //  Factory<CacheEntryEventFilter<String, String>> rmtFilterFactory = new com.ccx.ignite.cqfilter.FilterFactory().init(inKeyStr);

        // Callback that is called locally when update notifications are received.
        qry.setLocalListener(new CacheEntryUpdatedListener<String, String>() {
            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends String>> evts) {
                for (CacheEntryEvent<? extends String, ? extends String> e : evts) {
                    System.out.println("websocket locallsnr data in thread " + Thread.currentThread().getId() + "-->" + "key=" + e.getKey() + ", val=" + e.getValue());
                    try {
                        websocket.writeTextMessage("key=" + e.getKey() + ", val=" + e.getValue());
                    }
                    catch (Exception e1) {
                        System.out.println("exception local listener " + e1.getMessage());
                        qry.setLocalListener(null);
                    }
                }
            }
        });

        qry.setRemoteFilterFactory(new com.ccx.ignite.cqfilter.FilterFactory().init(inKeys));

        try {
            // cur is declared elsewhere (not shown in this snippet).
            cur = cache.query(qry);
            for (Cache.Entry<String, String> e : cur) {
                System.out.println("websocket initialqry data in thread " + Thread.currentThread().getId() + "-->" + "key=" + e.getKey() + ", val=" + e.getValue());
                websocket.writeTextMessage("key=" + e.getKey() + ", val=" + e.getValue());
            }
        }
        catch (Exception e) {
            System.out.println("exception cache.query " + e.getMessage());
        }
    }

The following is the remote filter class, which I have packaged into a self-contained JAR and placed in the libs folder of Ignite so that it can be picked up by the server nodes:

    public class FilterFactory {

        public Factory<CacheEntryEventFilter<String, String>> init(String inKeyStr) {
            System.out.println("factory init called jun22 ");

            return new Factory<CacheEntryEventFilter<String, String>>() {
                private static final long serialVersionUID = 5906783589263492617L;

                @Override public CacheEntryEventFilter<String, String> create() {
                    return new CacheEntryEventFilter<String, String>() {
                        @Override public boolean evaluate(CacheEntryEvent<? extends String, ? extends String> e) {
                            //List inKeys = new ArrayList<String>(Arrays.asList(inKeyStr.split(",")));
                            System.out.println("inside remote filter factory ");
                            String inKeys = "," + inKeyStr + ",";

                            return inKeys.contains("," + e.getKey() + ",");
                        }
                    };
                }
            };
        }
    }

The overall logic I'm trying to implement is to have a websocket client subscribe to an event by specifying a cache name and the key(s) of interest. The subscribe event code creates a continuous query and registers a local listener callback for any update event on those key(s). The remote filter is expected to filter update events based on the key(s) passed to it as a string, and the local listener is invoked only if the event passes the filter. The local listener then writes the updated key and value to the websocket reference passed to the subscribe event code.

The version of Ignite I'm using is 1.8.0; however, the behaviour is the same in 2.0. Any help is greatly appreciated!

Here is the log snippet containing the relevant error:

factory init called jun22
exception cache.query class org.apache.ignite.spi.IgniteSpiException: Failed to marshal custom event: StartRoutineDiscoveryMessage [startReqData=StartRequestData [prjPred=org.apache.ignite.configuration.CacheConfiguration$IgniteAllNodesPredicate@269707de, clsName=null, depInfo=null, hnd=CacheContinuousQueryHandlerV2 [rmtFilterFactory=com.ccx.ignite.cqfilter.FilterFactory$1@5dc301ed, rmtFilterFactoryDep=null, types=0], bufSize=1, interval=1000, autoUnsubscribe=true], keepBinary=false, routineId=b40ada9f-552d-41eb-90b5-3384526eb7b9]

2
Do you use any specific custom events? Could you please share the full logs? - alexmagnus
I'm not using any specific custom events. I'm just listening to org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT. I will provide the logs as an edit to my original question. Thanks for your time. - Nithyananda Thiagarajan

2 Answers

1
votes

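FilterFactory.init() returns an instance of an anonymous class, which holds an implicit reference to the enclosing FilterFactory instance, and FilterFactory is not serializable. That is why marshalling fails (note rmtFilterFactory=com.ccx.ignite.cqfilter.FilterFactory$1 in the log). Replace the returned anonymous class, and the anonymous CacheEntryEventFilter it creates, with corresponding static nested classes.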
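
To illustrate why the anonymous class fails while a static nested class does not, here is a minimal, dependency-free sketch. It uses plain Java serialization as a stand-in for Ignite's marshaller, which may behave differently in detail, and every name in it (FilterSerializationDemo, KeyFilter, AnonymousFilterFactory, KeySetFilter, canSerialize) is hypothetical rather than part of Ignite or the JCache API. In real code, the static nested classes would implement Factory and CacheEntryEventFilter instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FilterSerializationDemo {

    /** Hypothetical stand-in for a remote filter that must be shipped to other nodes. */
    interface KeyFilter extends Serializable {
        boolean evaluate(String key);
    }

    /** Mirrors the shape of the question's FilterFactory: not Serializable itself. */
    static class AnonymousFilterFactory {
        private final String label = "outer";

        KeyFilter init(final String inKeyStr) {
            // An anonymous class created in an instance method carries a hidden
            // reference to the enclosing instance. 'label' is read here only so
            // that every compiler version keeps that reference.
            return new KeyFilter() {
                @Override public boolean evaluate(String key) {
                    return label != null && ("," + inKeyStr + ",").contains("," + key + ",");
                }
            };
        }
    }

    /** Static nested class: no hidden outer reference, only the String state it needs. */
    static final class KeySetFilter implements KeyFilter {
        private static final long serialVersionUID = 1L;
        private final String inKeys;

        KeySetFilter(String inKeyStr) {
            // Wrap in commas so that contains() only matches whole keys.
            this.inKeys = "," + inKeyStr + ",";
        }

        @Override public boolean evaluate(String key) {
            return inKeys.contains("," + key + ",");
        }
    }

    /** Returns true if the object graph can be written with Java serialization. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException (a subclass of IOException) lands here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous filter serializable: "
            + canSerialize(new AnonymousFilterFactory().init("a,b")));
        System.out.println("static nested filter serializable: "
            + canSerialize(new KeySetFilter("a,b")));
    }
}
```

The anonymous filter drags the non-serializable AnonymousFilterFactory into the object graph, whereas KeySetFilter carries only its own String field. The same reasoning applies to both anonymous classes in the question's FilterFactory.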

0
votes

You need to explicitly deploy your CQ classes (the remote filters specifically) on all nodes in the topology. Just create a JAR file containing them and put it into the libs folder before starting the nodes.