4
votes

I'm writing a Spark job in Scala that reads in parquet files on S3, does some simple transforms, and then saves them to a DynamoDB instance. Each time it runs we need to create a new table in Dynamo, so I've written a Lambda function which is responsible for table creation. The first thing my Spark job does is generate a table name, invoke my Lambda function (passing the new table name to it), wait for the table to be created, and then proceed normally with the ETL steps.

However it looks as though my Lambda function is consistently being invoked twice. I cannot explain that. Here's a sample of the code:

def main(spark: SparkSession, pathToParquet: String) {

  // generate a unique table name
  val tableName = generateTableName()

  // call the lambda function
  val result = callLambdaFunction(tableName)

  // wait for the table to be created
  waitForTableCreation(tableName)

  // normal ETL pipeline
  var parquetRDD = spark.read.parquet(pathToParquet)
  val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
  transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
  spark.sparkContext.stop()
}

The code to wait for table creation is pretty straightforward, as you can see:

def waitForTableCreation(tableName: String) {
  val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
  val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
  try {
    waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
  } catch {
      case ex: WaiterTimedOutException =>
        LOGGER.error("Timed out waiting to create table: " + tableName)
        throw ex
      case t: Throwable => throw t
  }
}

And the lambda invocation is equally simple:

def callLambdaFunction(tableName: String) {
  val myLambda = LambdaInvokerFactory.builder()
    .lambdaClient(AWSLambdaClientBuilder.defaultClient)
    .lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
    .build(classOf[MyLambdaContract])
  myLambda.invoke(new MyLambdaInput(tableName))
}

Like I said, when I run spark-submit on this code, it definitely does hit the Lambda function. But I can't explain why it hits it twice. The result is that I get two tables provisioned in DynamoDB.

The waiting step also seems to fail within the context of running this as a Spark job. But when I unit-test my waiting code it seems to work fine on its own. It successfully blocks until the table is ready.

At first I theorized that perhaps spark-submit was sending this code to all of the worker nodes and they were independently running the whole thing. Initially I had a Spark cluster with 1 master and 2 workers. However I tested this out on another cluster with 1 master and 5 workers, and there again it hit the Lambda function exactly twice, and then apparently failed to wait for table creation because it dies shortly after invoking the Lambdas.

Does anyone have any clues as to what Spark might be doing? Am I missing something obvious?

UPDATE: Here are my spark-submit args, which are visible on the Steps tab of EMR.

spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar

And here's the code for my getConfiguration function:

def getConfiguration(tableName: String) : JobConf = {
  val conf = new Configuration()
  conf.set("dynamodb.servicename", "dynamodb")
  conf.set("dynamodb.input.tableName", tableName)
  conf.set("dynamodb.output.tableName", tableName)
  conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
  conf.set("dynamodb.regionid", "us-east-1")
  conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  new JobConf(conf)
}

Also here is a Gist containing some of the exception logs I see when I try to run this.

3
This should not happen - as long as that code is part of the driver, it gets executed only once. How is that main() function called? It could be a problem in the AWS SDK itself. You can use the Java remote debugging capability to debug code running in Spark client mode. - Hristo Iliev
Interesting case! Can you check what happens with your application in the UI? I suppose you're submitting the app with EMR, right? If so, you can check the "Application history" tab and verify whether the job was executed 1 or 2 times (look for the "Attempts" keyword). Also check whether you see any exceptions in the logs. Tip #2: check that your AWS clients aren't asynchronous. Maybe there are some concurrency issues. Tip #3: add assert(...checkIfMyTableExists(), "Table doesn't exist") after the Waiter's code to see what happens. Tip #4: can you share your Spark config and spark-submit args? - Bartosz Konieczny
Thanks guys. @bartosz25 that was a good thought, but under "Application history," I only see 1 execution per attempt, so it's not doubling up there. The way I've been invoking this has been to add a "Step" in the Steps tab of EMR with Step Type set to "Spark application," and I will edit my question to add the effective spark-submit args above. As far as Spark config, I can also add in the body of my getConfiguration function. Regarding synchronicity, I am NOT using the async version which involves Futures -- I'm just using the normal/synchronous version. - soapergem
What I think is happening here is that on the first call your application is not waiting for the table to be created, so it fires the insert, which fails and aborts the job. The command from your first run then creates the table in DynamoDB, and on your second run the insert works because the table already exists. However, your second run also creates another new table. What I would suggest is to have waitForTableCreation return a boolean and run your insert code only if the table has been created, just to check the validity of that function. - Aaron
UPDATE: It looks like my code might work if I run it in "client" deploy mode, instead of "cluster" deploy mode? Does that offer any hints to anyone here? That mode is less convenient to deploy because whereas cluster mode allows you to store your JAR files in S3, client mode requires that you copy the files onto the master disk beforehand as some sort of bootstrap operation. - soapergem
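
The boolean-returning wait suggested in the comments above could look roughly like the sketch below. It is only an illustration: TableWait, pollUntil, and its parameters are hypothetical names that are not part of the question's code, and the check function is assumed to wrap whatever "does the table exist yet?" call the job uses (for example a DescribeTable call that treats a missing table as false).

```scala
import scala.annotation.tailrec

object TableWait {
  /** Calls `check` up to `maxAttempts` times, sleeping `delayMillis` between
    * attempts. Returns true as soon as `check` succeeds, false if every
    * attempt fails. (Hypothetical helper, not from the original job.)
    */
  @tailrec
  def pollUntil(check: () => Boolean, maxAttempts: Int, delayMillis: Long): Boolean =
    if (maxAttempts <= 0) false
    else if (check()) true
    else {
      // Only sleep when another attempt will follow.
      if (maxAttempts > 1) Thread.sleep(delayMillis)
      pollUntil(check, maxAttempts - 1, delayMillis)
    }
}
```

With a helper like this, the ETL steps can be guarded explicitly, e.g. by throwing (or asserting, as in Tip #3) when pollUntil returns false, so a failed wait shows up as a clear error instead of a failed insert later on.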

3 Answers

3
votes

Thanks @soapergem for adding logging and options. I'm adding an answer (a tentative one) since it may be a little bit longer than a comment :)

To wrap up:

For your last question:

It looks like my code might work if I run it in "client" deploy mode, instead of "cluster" deploy mode? Does that offer any hints to anyone here?

For more information about the difference, please check https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html. In your case, it looks like the machine executing spark-submit in client mode has different IAM policies than the EMR jobflow. My guess is that your jobflow role is not allowed to call dynamodb:Describe*, and that's why you're getting the exception with the 400 status code (from your gist):

Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
    at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)

To confirm this hypothesis, you can execute the part that creates the table and waits for its creation locally (no Spark code here, just a simple java command running your main function) and:

  • for the first execution, ensure that you have all permissions. IMO it will be dynamodb:Describe* on Resources: * (if that's the reason, AFAIK you should use something like Resources: Test_Emr* in production, following the principle of least privilege)
  • for the 2nd execution, remove dynamodb:Describe* and check whether you're getting the same stack trace as in the gist
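
When comparing the two runs, it may help to look at the error code of the failed DescribeTable call and not only at the HTTP status. The sketch below is a minimal illustration under stated assumptions: DescribeFailure and classify are hypothetical names, and it assumes a missing permission is reported with the error code AccessDeniedException, which is worth verifying against the actual response. In the AWS SDK for Java v1 the code is available from AmazonServiceException.getErrorCode.

```scala
object DescribeFailure {
  sealed trait Cause
  case object TableMissing extends Cause      // the table does not exist (yet)
  case object PermissionDenied extends Cause  // the role may lack dynamodb:Describe*
  case object Other extends Cause

  /** Maps a service error code to a likely cause.
    * "AccessDeniedException" is an assumption about how a missing
    * permission is reported; "ResourceNotFoundException" is the code
    * shown in the stack trace above.
    */
  def classify(errorCode: String): Cause = errorCode match {
    case "ResourceNotFoundException" => TableMissing
    case "AccessDeniedException"     => PermissionDenied
    case _                           => Other
  }
}
```

Logging the classified cause in both executions makes it easier to see whether removing dynamodb:Describe* really reproduces the error from the gist or produces a different one.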
3
votes

I encountered the same problem in cluster mode too (v2.4.0). I worked around it by launching my apps programmatically using SparkLauncher instead of using spark-submit. You could move your lambda logic into the main method that starts your Spark app, like this:

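import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

def main(args: Array[String]): Unit = {
    // generate a unique table name
    val tableName = generateTableName()

    // call the lambda function
    val result = callLambdaFunction(tableName)

    // wait for the table to be created
    waitForTableCreation(tableName)

    // released once the launched application reaches a final state
    val latch = new CountDownLatch(1)

    // env: a java.util.Map[String, String] of environment variables
    // for the child process (defined elsewhere)
    val handle = new SparkLauncher(env)
        .setAppResource("/path/to/spark-app.jar")
        .setMainClass("com.company.SparkApp")
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setConf("spark.executor.instances", "2")
        .setConf("spark.executor.cores", "2")
        // other conf ...
        .setVerbose(true)
        .startApplication(new SparkAppHandle.Listener {
            override def stateChanged(sparkAppHandle: SparkAppHandle): Unit = {
                // stateChanged fires on every transition, so only
                // release the latch on a final state
                if (sparkAppHandle.getState.isFinal) {
                    latch.countDown()
                }
            }

            // no-op: application info updates are not needed here
            override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
        })

    println("app is launching...")
    latch.await()
    println("app exited")
}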
2
votes

Your Spark job starts before the table is actually created, because defining operations one after another doesn't mean each will wait until the previous one is finished.

You need to change the code so that the Spark-related block starts after the table is created. To achieve that, you have to either use a for-comprehension that ensures every step is finished, or put your Spark pipeline into the waiter's callback that is called after the table is created (if it has one; it's hard to tell).

You can also use andThen or a simple map.

The main point is that all the lines of code written in your main are executed one by one immediately, without waiting for the previous one to finish.
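
As a rough illustration of the for-comprehension idea, here is a minimal sketch that assumes each step returns a Future. That is an assumption: the comments on the question say the synchronous clients are in use, in which case plain statements already run in order. SequencedPipeline, createTable, waitForTable, and runEtl are hypothetical stand-ins, not the question's functions.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequencedPipeline {
  // Records the order in which the steps actually run (illustration only).
  val log = new ConcurrentLinkedQueue[String]()

  // Hypothetical asynchronous steps standing in for the real ones.
  def createTable(name: String): Future[String] = Future { log.add("create"); name }
  def waitForTable(name: String): Future[Boolean] = Future { log.add("wait"); true }
  def runEtl(name: String): Future[Int] = Future { log.add("etl"); 0 }

  // Each generator starts only after the previous Future has completed;
  // the guard fails the whole chain if the table never became ready.
  def run(name: String): Future[Int] =
    for {
      created <- createTable(name)
      ready   <- waitForTable(created) if ready
      rows    <- runEtl(created)
    } yield rows
}
```

A caller would then block once at the end, for example with Await.result(SequencedPipeline.run("some-table"), 5.seconds), instead of relying on statement order alone.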