2
votes

I'm having trouble reading data from AWS Redshift into my Spark cluster. The read is timing out and causing the spark job to fail. I'm using the following to retrieve the data into a dataframe:

def retrieveFromDate(date: String): org.apache.spark.sql.DataFrame = {
    val query = "tstamp >= '2018-01-01' and tstamp < '2018-01-02'"
    val predicates = Array[String](query)

    val props = new Properties()
    props.put("user", "username")
    props.put("password", "password")

    spark.read
        .jdbc(url=jdbcURL, 
              table="myschema.mytable", 
              predicates=predicates,
              connectionProperties=props)
}

The following query directly in SQL quickly returns 24 million rows:

select * from myschema.mytable
WHERE tstamp >= '2018-08-01'
  AND tstamp < '2018-08-02';

In Spark, the job fails the moment I perform any action on the dataframe, including just count.

If I provide an additional predicate, e.g. specifying another WHERE clause so that the result set is very small, everything works perfectly. Why is this query so slow in spark when it works just fine directly in SQL? Is there anything I can do to load a result table this large from redshift into spark?

My development AWS EMR cluster contains an M4.xlarge master and 2 M3.xlarge workers. That's about 15GB ram and 8 cpu cores per worker.

1

1 Answers

2
votes
>> The read is timing out and causing the spark job to fail

The predicates parameter just has

val query = "tstamp >= '2018-01-01' and tstamp < '2018-01-02'"

so, the result dataframe is partition of '1' as the result of single task and it holds 24 million. There is no parallelism in it.

could you alter and provide the predicates that divides the 24M data into multiple chunks. So, read can be parallelized ?

something like this,

val query = Arry[String]("column >= value1 and column < value2", "column >= value2 and column < value3","column >= value3 and column < value4", .......)

or

if you dont want to feed all the predicates, change the jdbc method to below type and provide lowerBound,upperBound & numPartitions and it is again subjected to values in that partition column uniformly distributed across its range or not.

public Dataset<Row> jdbc(String url,
                String table,
                String columnName,
                long lowerBound,
                long upperBound,
                int numPartitions,
                java.util.Properties connectionProperties)

more details here