I'm trying to retrieve the cached value for every element in a JavaPairRDD. I'm using the LOCAL cache mode because I want to minimize shuffling of cached data. The Ignite nodes are started in embedded mode within a Spark job. The following code works fine when I run it on a single node. However, when I run it on a cluster of 5 machines, I get zero results.

My first attempt used the IgniteRDD sql method:

 dataRDD.sql("SELECT v.id,v.sub,v.obj FROM VPRow v JOIN table(id bigint = ?) i ON v.id = i.id",new Object[] {objKeyEntries.toArray()});

where objKeyEntries is a collected set of entries in an RDD. My second attempt used affinityRun:

JavaPairRDD<Long, VPRow> objEntries = objKeyEntries.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Long, Boolean>>, Long, VPRow>() {
    @Override
    public Iterator<Tuple2<Long, VPRow>> call(Iterator<Tuple2<Long, Boolean>> tuple2Iterator) throws Exception {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("ignite-rdd.xml");
        IgniteConfiguration igniteConfiguration = (IgniteConfiguration) ctx.getBean("ignite.cfg");
        Ignite ignite = Ignition.getOrStart(igniteConfiguration);
        IgniteCache<Long, VPRow> cache = ignite.getOrCreateCache("dataRDD");

        ArrayList<Tuple2<Long,VPRow>> lst = new ArrayList<>();
        while(tuple2Iterator.hasNext()) {
            Tuple2<Long, Boolean> val = tuple2Iterator.next();
            ignite.compute().affinityRun("dataRDD", val._1(),()->{
                lst.add(new Tuple2<>(val._1(),cache.get(val._1())));
            });
        }
        return lst.iterator();
    }
});

The following is the ignite-rdd.xml configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="memoryConfiguration">
            <bean class="org.apache.ignite.configuration.MemoryConfiguration">
                <property name="systemCacheInitialSize" value="#{100 * 1024 * 1024}"/>
                <property name="defaultMemoryPolicyName" value="default_mem_plc"/>
                <property name="memoryPolicies">
                    <list>
                        <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
                            <property name="name" value="default_mem_plc"/>
                            <property name="initialSize" value="#{5 * 1024 * 1024 * 1024}"/>
                        </bean>
                    </list>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <!-- Set a cache name. -->
                    <property name="name" value="dataRDD"/>
                    <!-- Set a cache mode. -->
                    <property name="cacheMode" value="LOCAL"/>
                    <!-- Index Integer pairs used in the example. -->
                    <property name="indexedTypes">
                        <list>
                            <value>java.lang.Long</value>
                            <value>edu.code.VPRow</value>
                        </list>
                    </property>
                    <property name="affinity">
                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
                            <property name="partitions" value="50"/>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>[IP5]</value>
                                <value>[IP4]</value>
                                <value>[IP3]</value>
                                <value>[IP2]</value>
                                <value>[IP1]</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
Check if you have any data in your cache: dataRDD.query(new ScanQuery()) - Evgenii Zhuravlev
@Evgenii : The cache seems to be empty when running the code on the cluster. However, using the same code, the cache contains elements in a single node setup. - alexandria
As stated here: apacheignite-fs.readme.io/docs/… IgniteRDD utilizes the partitioned nature of Ignite caches and provides partitioning information to the Spark executor. So you should use the PARTITIONED cache mode. - Evgenii Zhuravlev

1 Answer

Are you sure that you need to use LOCAL cache mode?

Most likely you filled the cache on only one node, and the local caches on the other nodes are still empty.

affinityRun doesn't work because you have a LOCAL cache, not a PARTITIONED one, so it's not possible to determine the owner node for a key with the AffinityFunction.
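
As a minimal sketch of that change, assuming the rest of ignite-rdd.xml from the question stays as it is, the cache bean could switch its cacheMode to PARTITIONED. Only the cacheMode value differs from the question's configuration; the added comment is illustrative.

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="dataRDD"/>
    <!-- PARTITIONED distributes entries across the cluster, so the
         affinity function can map each key to an owner node. -->
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="indexedTypes">
        <list>
            <value>java.lang.Long</value>
            <value>edu.code.VPRow</value>
        </list>
    </property>
    <property name="affinity">
        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
            <property name="partitions" value="50"/>
        </bean>
    </property>
</bean>
```

With a partitioned cache, data loaded on any node is visible cluster-wide, and affinity-based calls can be routed to the node that owns the key.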