0
votes

I've been trying to integrate ignite and spark. The goal of my application is to write and read spark dataframes to/from ignite. However, I'm facing several issues with larger datasets (> 200 000 000 rows).

I have a 6-node Ignite cluster running on YARN. It has 160Gb of memory and 12 cores. I am trying to save the dataframe using spark (around 20Gb of raw text data) in an Ignite cache (partitioned 1 backup):

def main(args: Array[String]) {
    val ignite = setupIgnite

    closeAfter(ignite) { _ ⇒

      implicit val spark: SparkSession = SparkSession.builder
        .appName("Ignite Benchmark")
        .getOrCreate()

      val customer = readDF("csv", "|", Schemas.customerSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/customer")
      val part = readDF("csv", "|", Schemas.partSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/part")
      val supplier = readDF("csv", "|", Schemas.supplierSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/supplier")
      val dateDim = readDF("csv", "|", Schemas.dateDimSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/date_dim")
      val lineorder = readDF("csv", "|", Schemas.lineorderSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/lineorder")

      writeDF(customer, "customer", List("custkey"), TEMPLATES.REPLICATED)
      writeDF(part, "part", List("partkey"), TEMPLATES.REPLICATED)
      writeDF(supplier, "supplier", List("suppkey"), TEMPLATES.REPLICATED)
      writeDF(dateDim, "date_dim", List("datekey"), TEMPLATES.REPLICATED)
      writeDF(lineorder.limit(200000000), "lineorder", List("orderkey, linenumber"), TEMPLATES.NO_BACKUP)

    }
  }

At some point, the spark application retrieves this error:

    class org.apache.ignite.internal.mem.IgniteOutOfMemoryException: Out of memory in data region [name=default, initSize=256.0 MiB, maxSize=12.6 GiB, persistenceEnabled=false] Try the following:
  ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)
  ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)
  ^-- Enable eviction or expiration policies
        at org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl.allocatePage(PageMemoryNoStoreImpl.java:304)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.allocateDataPage(AbstractFreeList.java:463)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.insertDataRow(AbstractFreeList.java:501)
        at org.apache.ignite.internal.processors.cache.persistence.RowStore.addRow(RowStore.java:97)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.createRow(IgniteCacheOffheapManagerImpl.java:1302)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4426)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4371)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3083)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access$6200(BPlusTree.java:2977)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1726)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1610)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1249)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:352)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.storeValue(GridCacheMapEntry.java:3602)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.initialValue(GridCacheMapEntry.java:2774)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater.receive(DataStreamerImpl.java:2125)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:140)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:400)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:305)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:125)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1091)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:511)
        at java.lang.Thread.run(Thread.java:748)

I think the problem lies in the ignite server being initiated before the spark session, as in the official ignite examples. This server starts caching data that I am writing to the ignite cache and exceeds its default region size max (12Gb, which is different from the 20GB I defined for my yarn cluster). However, I don’t understand how the examples and documentation tells us to create an ignite server before the spark context (and session I assume). I understand that without this the application will hang once all the spark jobs are terminated, but I don’t understand the logic of having a server on the spark application that starts caching data. I’m very confused by this concept, and for now I have setup this ignite instance inside spark to be a client.

This is a strange behavior as all my ignite nodes (running on YARN) have 20GB defined for the default region (I changed it and verified it). This indicates me that the error must come from the ignite servers started on Spark (I think it is one on the driver and one per worker), as I did not changed the default region size in the ignite-config.xml of the spark application (defaults to 12GB as the error demonstrates). However, does this make sense? Should Spark throw out this error being its only goal to read and write data from/to ignite? Is Spark participating in caching any data and does this mean that I should set client mode in the ignite-config.xml of my application, despite the fact that the official examples are not using client mode?

Best regards, Carlos

1

1 Answers

2
votes

First, the Spark-Ignite connector already connects in client mode.

I'm going to assume that you have enough memory, but you can follow the example in the Capacity Planning guide to be sure.

However, I think the problem is that you're following the sample application a bit too closely(!). The sample -- so as to be self-contained -- includes both a server and a Spark client. If you already have an Ignite cluster, you don't need to start a server in your Spark client.

This is a slightly hacked down example from a real application (in Java, sorry):

    try (SparkSession spark = SparkSession
        .builder()
        .appName("AppName")
        .master(sparkMaster)
        .config("spark.executor.extraClassPath", igniteClassPath())
        .getOrCreate()) {

        // Get source DataFrame
        DataSet<Row> results = ....

        results.write()
            .outputMode("append")
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), igniteCfgFile)
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "Results")
            .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE(), true)
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "name")
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "backups=1")
            .write();
    }

I didn't test, but you should get the idea: you need to provide a URL to an Ignite configuration file; it creates the client to connect to that server behind the scenes.