
We are using Apache Spark with the mongo-spark library (for connecting to MongoDB) and the spark-redshift library (for connecting to the Amazon Redshift DWH), and we are experiencing very bad performance in our job.

So I am hoping to get some help in understanding whether we are doing anything wrong in our program, or whether this is simply what we can expect with the infrastructure we are using.

We are running our job with the Mesos resource manager on 4 AWS EC2 nodes, each with the following configuration:

RAM: 16 GB, CPU cores: 4, SSD: 200 GB (so 64 GB of RAM and 16 cores in total)

We have 3 tables in the Redshift cluster:

TABLE_NAME  SCHEMA                                    NUMBER_OF_ROWS
table1      (table1Id, table2FkId, table3FkId, ...)    50M
table2      (table2Id, phonenumber, email,...)         700M
table3      (table3Id, ...)                            2K

In MongoDB we have a collection with 35 million documents; a sample document is shown below (all emails and phone numbers here are unique, with no duplication):

{
  "_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c",
  "idKeys": {
    "email": [
      "[email protected]",
      "[email protected]"
    ],
    "phonenumber": [
      "1111111111",
      "2222222222"
    ]
  },
  "flag": false,
  ...
  ...
  ...
}

We filter and flatten this with the mongo-spark connector (see the mongo-spark aggregation pipeline in the code at the end) into the following format, since we need to JOIN the Redshift and Mongo data ON an email OR phonenumber match (the other available option, array_contains() in Spark SQL, is a bit slow):

  {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": "[email protected]", "phonenumber": null},
  {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": "[email protected]","phonenumber": null},
  {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "1111111111"},
  {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "22222222222"}

Spark computation steps (please refer to the code below to understand these steps better):

  1. First we load all the data from the 3 Redshift tables into table1Dataset, table2Dataset and table3Dataset respectively, using the spark-redshift connector.
  2. We join these 3 tables with Spark SQL, creating a new Dataset redshiftJoinedDataset. (This operation on its own finishes in 6 hours.)
  3. We load the MongoDB data into mongoDataset using the mongo-spark connector.
  4. We join mongoDataset and redshiftJoinedDataset. (Here is the bottleneck, as we need to join over 50 million rows from Redshift with over 100 million flattened rows from MongoDB; see the rewrite sketch after the join summary below.)
    Note: mongo-spark also seems to have some internal issue with its aggregation pipeline execution, which might be making it very slow.
  5. Then we do some aggregation and group the data on finalId.

Here is the code for the steps mentioned above:

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import java.util.Arrays;

public class SparkMongoRedshiftTest {

    private static SparkSession sparkSession;
    private static SparkContext sparkContext;
    private static SQLContext sqlContext;

    public static void main(String[] args) {

        sparkSession = SparkSession.builder().appName("redshift-spark-test").getOrCreate();
        sparkContext = sparkSession.sparkContext();
        sqlContext = new SQLContext(sparkContext);


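        // Load the three Redshift tables via spark-redshift; each query is
        // UNLOADed to the S3 tempdir and read back from there.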
        Dataset table1Dataset = executeRedshiftQuery("(SELECT table1Id,table2FkId,table3FkId FROM table1)");
        table1Dataset.createOrReplaceTempView("table1Dataset");

        Dataset table2Dataset = executeRedshiftQuery("(SELECT table2Id,phonenumber,email FROM table2)");
        table2Dataset.createOrReplaceTempView("table2Dataset");

        Dataset table3Dataset = executeRedshiftQuery("(SELECT table3Id FROM table3)");
        table3Dataset.createOrReplaceTempView("table3Dataset");


        Dataset redshiftJoinedDataset = sqlContext.sql(" SELECT a.*,b.*,c.*" +
                                                       " FROM table1Dataset a " +
                                                       " LEFT JOIN table2Dataset b ON a.table2FkId = b.table2Id" +
                                                       " LEFT JOIN table3Dataset c ON a.table3FkId = c.table3Id");
        redshiftJoinedDataset.createOrReplaceTempView("redshiftJoinedDataset");

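        // Load the Mongo collection, filtering and flattening it with an
        // aggregation pipeline so we get one row per email and per phonenumber.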
        JavaMongoRDD<Document> userIdentityRDD = MongoSpark.load(getJavaSparkContext());
        Dataset mongoDataset = userIdentityRDD.withPipeline(
                Arrays.asList(
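                        // Note: $ifNull on the non-existent "description" field is
                        // just a trick to emit an explicit null for the missing key.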
                        Document.parse("{$match: {flag: false}}"),
                        Document.parse("{ $unwind: { path: \"$idKeys.email\"  } }"),
                        Document.parse("{$group: {_id: \"$_id\",emailArr: {$push: {email: \"$idKeys.email\",phonenumber: {$ifNull: [\"$description\", null]}}},\"idKeys\": {$first: \"$idKeys\"}}}"),
                        Document.parse("{$unwind: \"$idKeys.phonenumber\"}"),
                        Document.parse("{$group: {_id: \"$_id\",phoneArr: {$push: {phonenumber: \"$idKeys.phonenumber\",email: {$ifNull: [\"$description\", null]}}},\"emailArr\": {$first: \"$emailArr\"}}}"),
                        Document.parse("{$project: {_id: 1,value: {$setUnion: [\"$emailArr\", \"$phoneArr\"]}}}"),
                        Document.parse("{$unwind: \"$value\"}"),
                        Document.parse("{$project: {email: \"$value.email\",phonenumber: \"$value.phonenumber\"}}")
                )).toDF();
        mongoDataset.createOrReplaceTempView("mongoDataset");

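        // The OR of two equality predicates gives Spark no equi-join keys, so
        // this is planned as a nested-loop/cartesian join (presumably why
        // spark.sql.crossJoin.enabled is set below).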
        Dataset joinRedshiftAndMongoDataset = sqlContext.sql(" SELECT a.* , b._id AS finalId " +
                                                             " FROM redshiftJoinedData AS a INNER JOIN mongoDataset AS b " +
                                                             " ON b.email = a.email OR b.phonenumber = a.phonenumber");

        //aggregating joinRedshiftAndMongoDataset
        //then storing to mysql
    }

    private static Dataset executeRedshiftQuery(String query) {
        return sqlContext.read()
                .format("com.databricks.spark.redshift")
                .option("url", "jdbc://...")
                .option("query", query)
                .option("aws_iam_role", "...")
                .option("tempdir", "s3a://...")
                .load();
    }

    public static JavaSparkContext getJavaSparkContext() {
        sparkContext.conf().set("spark.mongodb.input.uri", "");
        sparkContext.conf().set("spark.sql.crossJoin.enabled", "true");
        return new JavaSparkContext(sparkContext);
    }
}

The estimated time for this job to finish on the above-mentioned infrastructure is over 2 months.

So to summarize the joins quantitatively:

RedshiftDataWithMongoDataJoin => (RedshiftDataJoin)                INNER_JOIN (MongoData)
                              => (50M LEFT_JOIN 700M LEFT_JOIN 2K) INNER_JOIN (~100M)
                              => (50M)                             INNER_JOIN (~100M)

Any help with this will be appreciated.

How long does Stage 3, loading the data from MongoDB, take? The partitioner shouldn't be the bottleneck at all and probably isn't, but what are your figures? Have you checked whether it is quicker to do the data transformations in MongoDB via the Aggregation Framework or to do them natively in Spark? - Ross
Hi @Ross, I haven't tested the mongo aggregation pipeline separately on the entire data, but I have tested it on limited data (just 1000 documents), both from the Mongo console and through the spark-mongo connector. The loading seems fast, but the joining and internal shuffling may be making it slow. The job is creating around 16000 partitions. - Amit Valse
@Ross you say "The partitioner shouldn't be the bottleneck at all and probably isn't", but as you can see in my question, I am able to perform the LEFT_JOIN of (50M X 700M X 2K) on data loaded from Redshift in 6 hours, and it is these 50M rows that are then joined with the 100M mongo documents. It may be that the bottleneck is not the mongo-spark aggregation, but I am not able to find the problem anywhere else. I would really appreciate your help with this problem. - Amit Valse
Also, reading your comments on this spark-mongo aggregation pipeline issue, you said that the aggregation pipeline gets ignored. In that case, does the entire data get loaded into memory before the pipeline executes, OR does the pipeline get ignored completely? - Amit Valse
The partitioner ignores the pipeline in the current release, meaning you may get empty partitions; the processing cost of those will be small, and in the next release it won't happen. However, you will only load into Spark the data you need to act on. Shuffling of data could be an issue; what does the DAG look like? databricks.com/blog/2015/06/22/… - Ross

1 Answer


So after a lot of investigation we came to know that 90% of the data in table2 had either email or phonenumber null, and I had missed handling joins on null values in the query.

That was the root cause of this performance bottleneck.
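
For anyone hitting the same thing, the fix was along these lines (a sketch; the exact predicate depends on your schema): drop the rows whose join keys are all null before the final join, since they can never match but still get shuffled and compared:

// Sketch of the fix: rows where both email and phonenumber are null can
// never satisfy the INNER JOIN with the Mongo data, so filter them out
// before joining instead of dragging them through the shuffle.
Dataset redshiftJoinedNonNull = sqlContext.sql(
        " SELECT * FROM redshiftJoinedDataset" +
        " WHERE email IS NOT NULL OR phonenumber IS NOT NULL");
redshiftJoinedNonNull.createOrReplaceTempView("redshiftJoinedDataset");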

After fixing this problem the job now runs within 2 hours.

So there are no issues with spark-redshift or mongo-spark; both are performing exceptionally well :)