
I am trying to read a MySQL table in PySpark using a JDBC read. The tricky part here is that the table is considerably big, and therefore causes our Spark executors to crash when we do a non-partitioned, vanilla read of the table.

Hence, the objective is basically to do a partitioned read of the table. A couple of things that we have tried -

  1. We looked at the "numPartitions-partitionColumn-lowerBound-upperBound" combo. This does not work for us, since the indexing key of the original table is a string and this combo only works with integral types (a rough sketch of that approach is included after this list for reference).
  2. The other alternative suggested in the docs is the predicates option. This does not seem to work for us, in the sense that the number of partitions still seems to be 1, instead of the number of predicates that we are sending.
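
For reference, the numeric-bounds approach from point 1 would look roughly like this (the partition column "id", the bounds, and the table name are illustrative, not our actual schema; spark is the active SparkSession):

numeric_df = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("user", config.user)
    .option("password", config.password)
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "route_surge_details")
    # partitionColumn must be a numeric (or date/timestamp) column
    .option("partitionColumn", "id")
    .option("lowerBound", 1)
    .option("upperBound", 1000000)
    .option("numPartitions", 4)
    .load()
)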

The code snippet that we are using is as follows -

input_df = self._Flow__spark.read \
            .format("jdbc") \
            .option("url", url) \
            .option("user", config.user) \
            .option("password", config.password) \
            .option("driver", "com.mysql.cj.jdbc.Driver") \
            .option("dbtable", "({}) as query ".format(get_route_surge_details_query(start_date, end_date))) \
            .option("predicates", ["recommendation_date = '2020-11-14'",
                                   "recommendation_date = '2020-11-15'",
                                   "recommendation_date = '2020-11-16'",
                                   "recommendation_date = '2020-11-17'",
                                   ]) \
            .load()

It seems to be doing a full table scan (non-partitioned), whilst completely ignoring the passed predicates. Would be great to get some help on this.

I think predicates cannot be passed as an option. You have to use the jdbc function instead. github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/… - pPanda_beta

1 Answer


Try the following:

# Each entry in `predicates` becomes the WHERE clause of its own JDBC
# partition, so this read runs as 4 parallel queries.
df = spark_session \
  .read \
  .jdbc(url=url,
        table="({}) as query".format(get_route_surge_details_query(start_date, end_date)),
        predicates=["recommendation_date = '2020-11-14'",
                    "recommendation_date = '2020-11-15'",
                    "recommendation_date = '2020-11-16'",
                    "recommendation_date = '2020-11-17'"],
        properties={
            "user": config.user,
            "password": config.password,
            "driver": "com.mysql.cj.jdbc.Driver"
        })

Verify the number of partitions with:

df.rdd.getNumPartitions() # Should be 4

I found this after digging through the docs at https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
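
If the date range is long, the predicate list can also be generated rather than hard-coded. A minimal sketch, assuming start_date and end_date are datetime.date objects (build_date_predicates is a hypothetical helper, not part of the Spark API):

from datetime import date, timedelta

def build_date_predicates(start_date, end_date):
    # One predicate per day; Spark turns each predicate into the WHERE
    # clause of its own JDBC partition.
    days = (end_date - start_date).days
    return ["recommendation_date = '{}'".format(start_date + timedelta(days=i))
            for i in range(days + 1)]

predicates = build_date_predicates(date(2020, 11, 14), date(2020, 11, 17))
# ["recommendation_date = '2020-11-14'", ..., "recommendation_date = '2020-11-17'"]

Note that rows matching none of the predicates are not read at all, and rows matching more than one predicate are read more than once, so the predicates should cover the full range without overlapping.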