5
votes

tldr; Can't use Kinesis Spark Streaming integration, because it receives no data.

  1. Testing stream is set up, nodejs app sends 1 simple record per second.
  2. Standard Spark 1.5.2 cluster is set up with master and worker nodes (4 cores) with docker-compose, AWS credentials in environment
  3. spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar is downloaded and added to classpath
  4. job.py or job.jar (just reads and prints) submitted.
  5. Everything seems to be okay, but no records what-so-ever are received.

From time to time the KCL Worker thread says "Sleeping ..." - it might be broken silently (I checked all the stderr I could find, but no hints). Maybe swallowed OutOfMemoryError... but I doubt that, because of the amount of 1 record per second.


    -------------------------------------------
    Time: 1448645109000 ms
    -------------------------------------------

    15/11/27 17:25:09 INFO JobScheduler: Finished job streaming job 1448645109000 ms.0 from job set of time 1448645109000 ms
    15/11/27 17:25:09 INFO KinesisBackedBlockRDD: Removing RDD 102 from persistence list
    15/11/27 17:25:09 INFO JobScheduler: Total delay: 0.002 s for time 1448645109000 ms (execution: 0.001 s)
    15/11/27 17:25:09 INFO BlockManager: Removing RDD 102
    15/11/27 17:25:09 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[102] at createStream at NewClass.java:25 of time 1448645109000 ms
    15/11/27 17:25:09 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645107000 ms)
    15/11/27 17:25:09 INFO InputInfoTracker: remove old batch metadata: 1448645107000 ms
    15/11/27 17:25:10 INFO JobScheduler: Added jobs for time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Starting job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    -------------------------------------------
    Time: 1448645110000 ms
    -------------------------------------------
          <----- Some data expected to show up here!
    15/11/27 17:25:10 INFO JobScheduler: Finished job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Total delay: 0.003 s for time 1448645110000 ms (execution: 0.001 s)
    15/11/27 17:25:10 INFO KinesisBackedBlockRDD: Removing RDD 103 from persistence list
    15/11/27 17:25:10 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[103] at createStream at NewClass.java:25 of time 1448645110000 ms
    15/11/27 17:25:10 INFO BlockManager: Removing RDD 103
    15/11/27 17:25:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645108000 ms)
    15/11/27 17:25:10 INFO InputInfoTracker: remove old batch metadata: 1448645108000 ms
    15/11/27 17:25:11 INFO JobScheduler: Added jobs for time 1448645111000 ms
    15/11/27 17:25:11 INFO JobScheduler: Starting job streaming job 1448645111000 ms.0 from job set of time 1448645111000 ms

Please let me know any hints, I'd really like to use Spark for real time analytics... everything but this small detail of not receiving data :) seems to be ok.

PS: I find strange that somehow Spark ignores my settings of Storage level (mem and disk 2) and Checkpoint interval (20,000 ms)


    15/11/27 17:23:26 INFO KinesisInputDStream: metadataCleanupDelay = -1
    15/11/27 17:23:26 INFO KinesisInputDStream: Slide time = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
    15/11/27 17:23:26 INFO KinesisInputDStream: Checkpoint interval = null
    15/11/27 17:23:26 INFO KinesisInputDStream: Remember duration = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Initialized and validated org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6

Source code (java):


    public class NewClass {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
            JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
                    ssc, "webassist-test", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1",
                    InitialPositionInStream.LATEST,
                    new Duration(20000),
                    StorageLevel.MEMORY_AND_DISK_2()
            );
            kinesisStream.print();
            ssc.start();
            ssc.awaitTermination();
        }
    }

Python code (tried both pprinting before and sending to MongoDB):


    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    from pyspark import SparkContext, StorageLevel
    from pyspark.streaming import StreamingContext
    from sys import argv

    sc = SparkContext(appName="webassist-test")
    ssc = StreamingContext(sc, 5)

    stream = KinesisUtils.createStream(ssc,
         "appname",
         "test",
         "https://kinesis.us-west-1.amazonaws.com",
         "us-west-1",
         InitialPositionInStream.LATEST,
         5,
         StorageLevel.MEMORY_AND_DISK_2)

    stream.pprint()
    ssc.start()
    ssc.awaitTermination()

Note: I also tried sending data to MongoDB with stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition)) but not pasting it here, since you'd need a MongoDB instance and it's not related to the problem - no records come in on the input already.

One more thing - the KCL never commits. The corresponding DynamoDB looks like this:

leaseKey  checkpoint  leaseCounter  leaseOwner  ownerSwitchesSinceCheckpoint
shardId-000000000000  LATEST  614  localhost:d92516...  8

The command used for submitting:

spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py

In the MasterUI I can see:

 Input Rate
   Receivers: 1 / 1 active
   Avg: 0.00 events/sec
 KinesisReceiver-0
   Avg: 0.00 events/sec
...
 Completed Batches (last 76 out of 76)

Thanks for any help!

2
Can you paste the source code for Stream context creation, DStream transformations and printing, please? We can take a look.MiguelPeralvo
Having the same issue as well.Richard Clayton

2 Answers

2
votes

I've had issues with no record activity being shown in Spark Streaming in the past when connecting with Kinesis.

I'd try these things to get more feedback/a different behaviour from Spark:

  1. Make sure that you force the evaluation of your DStream transformation operations with output operations like foreachRDD, print, saveas...

  2. Create a new KCL Application in DynamoDB using a new name for the "Kinesis app name" parameter when creating the stream or purge the existing one.

  3. Switch between TRIM_HORIZON and LATEST for initial position when creating the stream.

  4. Restart the context when you try these changes.

EDIT after code was added: Perhaps I'm missing something obvious, but I cannot spot anything wrong with your source code. Do you have n+1 cpus running this application (n is the number of Kinesis shards)?

If you run a KCL application (Java/Python/...) reading from the shards in your docker instance, does it work? Perhaps there's something wrong with your network configuration, but I'd expect some error messages pointing it out.

If this is important enough / you have a bit of time, you can quickly implement kcl reader in your docker instance and will allow you to compare with your Spark Application. Some urls:

Python

Java

Python example

Another option is to run your Spark Streaming application in a different cluster and to compare.

P.S.: I'm currently using Spark Streaming 1.5.2 with Kinesis in different clusters and it processes records / shows activity as expected.

0
votes

I was facing this issue when I used the suggested documentation and examples for the same, the following scala code works fine for me(you can always use java instead)--

val conf = ConfigFactory.load

val config = new SparkConf().setAppName(conf.getString("app.name"))

val ssc = new StreamingContext(config, Seconds(conf.getInt("app.aws.batchDuration")))

val stream = if (conf.hasPath("app.aws.key") && conf.hasPath("app.aws.secret")){
logger.info("Specifying AWS account using credentials.")
    KinesisUtils.createStream(
      ssc,
      conf.getString("app.name"),
      conf.getString("app.aws.stream"),
      conf.getString("app.aws.endpoint"),
      conf.getString("app.aws.region"),
      InitialPositionInStream.LATEST,
      Seconds(conf.getInt("app.aws.batchDuration")),
      StorageLevel.MEMORY_AND_DISK_2,
      conf.getString("app.aws.key"),
      conf.getString("app.aws.secret")
    )
  } else {
    logger.info("Specifying AWS account using EC2 profile.")
    KinesisUtils.createStream(
      ssc,
      conf.getString("app.name"),
      conf.getString("app.aws.stream"),
      conf.getString("app.aws.endpoint"),
      conf.getString("app.aws.region"),
      InitialPositionInStream.LATEST,
      Seconds(conf.getInt("app.aws.batchDuration")),
      StorageLevel.MEMORY_AND_DISK_2
    )
  }

stream.foreachRDD((rdd: RDD[Array[Byte]], time) => {
      val rddstr: RDD[String] = rdd
         .map(arrByte => new String(arrByte))
      rddstr.foreach(x => println(x))
}