2
votes

I have a Spark application (version 2.1) writing data to an Ignite server cache (version 2.2). This is the code I use to create the cache from an IgniteContext in the Spark job:

object Spark_Streaming_Processing {

 case class Custom_Class(
                  @(QuerySqlField @field)(index = true) a: String,
                  @(QuerySqlField @field)(index = true) b: String,
                  @(QuerySqlField @field)(index = true) c: String,
                  @(QuerySqlField @field)(index = true) d: String,
                  @(QuerySqlField @field)(index = true) e: String,
                  @(QuerySqlField @field)(index = true) f: String,
                  @(QuerySqlField @field)(index = true) g: String,
                  @(QuerySqlField @field)(index = true) h: String

                )

//START IGNITE CONTEXT

val addresses=new util.ArrayList[String]()
addresses.add("127.0.0.1:48500..48520")

val igniteContext:IgniteContext=new IgniteContext(sqlContext.sparkContext,()=>
new IgniteConfiguration().setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses))
  ).setCacheConfiguration(new CacheConfiguration[String,Custom_Class]()
  .setName("Spark_Ignite").setBackups(1).setIndexedTypes(classOf[String],classOf[Custom_Class]))
,true)


println(igniteContext.ignite().cacheNames())

val ignite_cache_rdd:IgniteRDD[String,Custom_Class] =igniteContext.fromCache[String,Custom_Class]("Spark_Ignite")

val processed_Pair:RDD[(String,Custom_Class)]=(...)// rdd with data, which as you can see has the correct datatypes as parameters

ignite_cache_rdd.savePairs(processed_PairRDD)

}
  }

Everything was working fine, but yesterday I decided to destroy the Spark_Ignite cache and restart the Ignite server. However, running the Spark application again, I am now getting the following error

javax.cache.CacheException: Failed to find data nodes for cache: Spark_Ignite
    at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.stableDataNodes(GridReduceQueryExecutor.java:447)
    at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.query(GridReduceQueryExecutor.java:591)
    at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$8.iterator(IgniteH2Indexing.java:1160)

If I go to the Ignite visor I can see that the cache was created on the Ignite server side, and I can see that the Ignite server detects the Spark application's Ignite client:

 [12:17:01] Topology snapshot [ver=4, servers=1, clients=1, CPUs=12, heap=1.9GB]

However, the Ignite client in the Spark application doesn't seem to detect the server node when it starts, even though the cache was created:

18/04/13 12:17:01 INFO GridDiscoveryManager: Topology snapshot [ver=4, servers=0, clients=1, CPUs=12, heap=0.89GB]
18/04/13 12:17:07 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
javax.cache.CacheException: Failed to find data nodes for cache: Spark_Ignite

I should add that the Ignite client won't start if the Ignite server is not up, it looks for it in the addresses given and just hangs. When I connect the server, the cache is created and only then do I get this error.

What might the issue be here? And how is it that it was working before?

Thank you.

UPDATE

Here is the xml config file that I use for my single Ignite server:

<?xml version="1.0" encoding="UTF-8"?>

 <beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:util="http://www.springframework.org/schema/util"
   xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
    http://www.springframework.org/schema/util
    http://www.springframework.org/schema/util/spring-util-2.0.xsd">

<bean id="ignite_cluster.cfg" 
 class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="igniteInstanceName" value="ignite_node1" />
    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
     <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    <!-- Initial local port to listen to. -->
                    <property name="localPort" value="48510"/>
                    <!-- Changing local port range. This is an optional action. -->
                    <property name="localPortRange" value="20"/>
                    <!-- Setting up IP finder for this cluster -->
                    <property name="ipFinder">
                            <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                                    <property name="addresses">
                                            <list>
                                                    <!-- Addresses and port range of the nodes from the first cluster.
          127.0.0.1 can be replaced with actual IP addresses or host names.
          Port range is optional. -->

           <value>127.0.0.1:48500..48520</value>
                                            </list>
                                    </property>
                            </bean>
                    </property>
            </bean>
    </property>
    <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                    <property name="localPort" value="48100"/>
            </bean>
    </property>
</bean>
 <bean class="org.apache.ignite.configuration.FileSystemConfiguration">
    <property name="secondaryFileSystem">
 <bean class="org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem">
  <property name="fileSystemFactory">
    <bean class="org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory">
      <property name="uri" value="hdfs://localhost:9000/"/>
      </bean>
     </property>
    </bean>
  </property>
</bean>
<bean class="org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory">
 <property name="uri" value="hdfs://localhost:9000/"/>
 <property name="configPaths">
    <list>
      <value>/etc/hadoop-2.9.0/etc/hadoop/core-site.xml</value>
    </list>
  </property>
 </bean>

 </beans>
1
as I see from Topology snapshot [ver=4, servers=0, clients=1], your client just can't connect to the server node, most possible it's a discovery problem. Can you please share full logs from all nodes? Have you tried to restart client node? - Evgenii Zhuravlev
@EvgeniiZhuravlev , I will add the xml conf file for the Ignite server I'm running (I'm running just one server locally) and full logs. And yes, everytime the Spark app is launched, the client node is recreated - manuel mourato
@EvgeniiZhuravlev, added the config file. You can see that I set the local port of my server to 48510, and then I discover it in my Spark app with the address range - manuel mourato
Weirdly after restarting my computer, everything seems to be working well....I checked if it was a port issue before, but everything was fine...o well, hopefully it wont happen again :) - manuel mourato

1 Answers

0
votes

I encountered a similar problem with the exact same error message:

javax.cache.CacheException: Failed to find data nodes for cache: <cache name>
    at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.stableDataNodes(GridReduceQueryExecutor.java:447)
    at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.query(GridReduceQueryExecutor.java:591)
    at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$8.iterator(IgniteH2Indexing.java:1160)

Did the following:

  1. Checked the ignite topology by: ./control.sh --baseline
  2. If there is any baseline node in offline state, either start that baseline node or you can remove it by ./control.sh --baseline remove <ConsistentID>

enter image description here

  1. Restart your client application