We are using Apache Spark with the mongo-spark library (for connecting to MongoDB) and the spark-redshift library (for connecting to the Amazon Redshift DWH), and we are seeing very poor performance from our job.
So I am hoping to get some help understanding whether we are doing anything wrong in our program, or whether this is simply what we can expect with the infrastructure we have used.
We are running our job with the Mesos resource manager on 4 AWS EC2 nodes, each with the following configuration:
RAM: 16GB, CPU cores: 4, SSD: 200GB
We have 3 tables in our Redshift cluster:
TABLE_NAME   SCHEMA                                     NUMBER_OF_ROWS
table1       (table1Id, table2FkId, table3FkId, ...)    50M
table2       (table2Id, phonenumber, email, ...)        700M
table3       (table3Id, ...)                            2K
In MongoDB we have a collection of 35 million documents, with a sample document as below (all emails and phone numbers here are unique; there is no duplication):
{
  "_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c",
  "idKeys": {
    "email": [
      "[email protected]",
      "[email protected]"
    ],
    "phonenumber": [
      "1111111111",
      "2222222222"
    ]
  },
  "flag": false,
  ...
  ...
  ...
}
We filter and flatten this collection with the mongo-spark connector (see the aggregation pipeline in the code at the end) into the following format, because we need to JOIN the Redshift and Mongo data ON an email OR phonenumber match (the other option, array_contains() in Spark SQL, is a bit slow):
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": "[email protected]", "phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": "[email protected]","phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "1111111111"},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "22222222222"}
Spark computation steps (please refer to the code below to understand these steps better):
- First we load all the data from the 3 Redshift tables into table1Dataset, table2Dataset and table3Dataset respectively, using the spark-redshift connector.
- We join these 3 tables with Spark SQL and create a new Dataset, redshiftJoinedDataset (on its own, this operation finishes in 6 hours).
- We load the MongoDB data into mongoDataset using the mongo-spark connector.
- We join mongoDataset and redshiftJoinedDataset (this is the bottleneck, as we need to join over 50 million rows from Redshift with over 100 million flattened rows from MongoDB).
- Then we do some aggregation and group the data on finalId.

Note: mongo-spark also seems to have some internal issue with its aggregation pipeline execution, which might be making it very slow.
Here is the code for the steps mentioned above:
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import java.util.Arrays;
public class SparkMongoRedshiftTest {

    private static SparkSession sparkSession;
    private static SparkContext sparkContext;
    private static SQLContext sqlContext;

    public static void main(String[] args) {
        sparkSession = SparkSession.builder().appName("redshift-spark-test").getOrCreate();
        sparkContext = sparkSession.sparkContext();
        sqlContext = new SQLContext(sparkContext);

        // Load the three Redshift tables via the spark-redshift connector.
        Dataset<Row> table1Dataset = executeRedshiftQuery("(SELECT table1Id,table2FkId,table3FkId FROM table1)");
        table1Dataset.createOrReplaceTempView("table1Dataset");
        Dataset<Row> table2Dataset = executeRedshiftQuery("(SELECT table2Id,phonenumber,email FROM table2)");
        table2Dataset.createOrReplaceTempView("table2Dataset");
        Dataset<Row> table3Dataset = executeRedshiftQuery("(SELECT table3Id FROM table3)");
        table3Dataset.createOrReplaceTempView("table3Dataset");

        // Join the three Redshift tables (on its own this finishes in about 6 hours).
        Dataset<Row> redshiftJoinedDataset = sqlContext.sql(" SELECT a.*,b.*,c.*" +
                " FROM table1Dataset a " +
                " LEFT JOIN table2Dataset b ON a.table2FkId = b.table2Id" +
                " LEFT JOIN table3Dataset c ON a.table3FkId = c.table3Id");
        redshiftJoinedDataset.createOrReplaceTempView("redshiftJoinedDataset");

        // Load and flatten the MongoDB collection with the aggregation pipeline described above.
        JavaMongoRDD<Document> userIdentityRDD = MongoSpark.load(getJavaSparkContext());
        Dataset<Row> mongoDataset = userIdentityRDD.withPipeline(
                Arrays.asList(
                        Document.parse("{$match: {flag: false}}"),
                        Document.parse("{ $unwind: { path: \"$idKeys.email\" } }"),
                        Document.parse("{$group: {_id: \"$_id\",emailArr: {$push: {email: \"$idKeys.email\",phonenumber: {$ifNull: [\"$description\", null]}}},\"idKeys\": {$first: \"$idKeys\"}}}"),
                        Document.parse("{$unwind: \"$idKeys.phonenumber\"}"),
                        Document.parse("{$group: {_id: \"$_id\",phoneArr: {$push: {phonenumber: \"$idKeys.phonenumber\",email: {$ifNull: [\"$description\", null]}}},\"emailArr\": {$first: \"$emailArr\"}}}"),
                        Document.parse("{$project: {_id: 1,value: {$setUnion: [\"$emailArr\", \"$phoneArr\"]}}}"),
                        Document.parse("{$unwind: \"$value\"}"),
                        Document.parse("{$project: {email: \"$value.email\",phonenumber: \"$value.phonenumber\"}}")
                )).toDF();
        mongoDataset.createOrReplaceTempView("mongoDataset");

        // Join the Redshift data with the flattened Mongo data on email OR phonenumber (the bottleneck).
        Dataset<Row> joinRedshiftAndMongoDataset = sqlContext.sql(" SELECT a.* , b._id AS finalId " +
                " FROM redshiftJoinedDataset AS a INNER JOIN mongoDataset AS b " +
                " ON b.email = a.email OR b.phonenumber = a.phonenumber");

        //aggregating joinRedshiftAndMongoDataset
        //then storing to mysql
    }

    private static Dataset<Row> executeRedshiftQuery(String query) {
        return sqlContext.read()
                .format("com.databricks.spark.redshift")
                .option("url", "jdbc://...")
                .option("query", query)
                .option("aws_iam_role", "...")
                .option("tempdir", "s3a://...")
                .load();
    }

    public static JavaSparkContext getJavaSparkContext() {
        // MongoDB connection URI omitted here.
        sparkContext.conf().set("spark.mongodb.input.uri", "");
        sparkContext.conf().set("spark.sql.crossJoin.enabled", "true");
        return new JavaSparkContext(sparkContext);
    }
}
The estimated time to finish this job on the infrastructure mentioned above is over 2 months.
So to summarize the joins quantitatively:
RedshiftDataWithMongoDataJoin => (RedshiftDataJoin) INNER_JOIN (MongoData)
=> (50M LEFT_JOIN 700M LEFT_JOIN 2K) INNER_JOIN (~100M)
=> (50M) INNER_JOIN (~100M)
Any help with this will be appreciated.