
I am working with PySpark connected to an AWS instance (r5d.xlarge, 4 vCPUs, 32 GiB) running a 25 GB database. When I query some tables I get the following error:

Py4JJavaError: An error occurred while calling o57.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

I tried to look into the error myself, but unfortunately there is not much information on this issue.

Code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').\
     config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44').\
     appName('test').getOrCreate()

df = spark.read.format('jdbc').\
        option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306').\
        option('driver', 'com.mysql.jdbc.Driver').\
        option('user', 'xxxxxxxxxxx').\
        option('password', 'xxxxxxxxxxxxxxxxxxxx').\
        option('dbtable', 'dbname.tablename').\
        load()

df.printSchema()

Here printSchema() works, but then:


df_1 = df.select(['col1', 'col2', 'col3', 'col4', 
                  'col4', 'col5', 'col6']).show()

Py4JJavaError: An error occurred while calling o57.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
  in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
  0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: GC
  overhead limit exceeded

Does anybody have an idea how I can solve this problem?

show() doesn't return an object that could be assigned. What values have you set for spark.executor.memory and spark.driver.memory? – cronoik
I did not set any of them, but I will try adding the following to the code: conf.set("spark.executor.memory", "4g"), conf.set("spark.driver.memory", "4g"), conf.set("spark.cores.max", "2"). Do you have any recommendation for the numbers I should use? – fachc
Unfortunately it does not work with the configuration above; the notebook keeps running until the server disconnects... any suggestions? – fachc
Is the error message still the same? Check the Environment tab of the Spark UI to see whether the values were applied correctly. – cronoik
Check your partitions with df.rdd.getNumPartitions() ... since you are reading from JDBC you will only get 1 partition, so you need to define a row boundary so the data can be split and distributed. Right now you are trying to process 25 GB on a single machine with no parallelism, hence the OOM error. – thePurplePython
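Regarding the memory settings discussed in the comments: with master('local') everything runs inside the driver JVM, so spark.driver.memory is the setting that matters (spark.executor.memory has no effect in local mode), and it must be supplied before the session is first created. A minimal sketch, assuming 'local[4]' and '16g' as starting points for an r5d.xlarge rather than values taken from the question:

from pyspark.sql import SparkSession

# Sketch only: '16g' and 'local[4]' are assumed values to tune, not settings
# from the question. spark.driver.memory must be set before the JVM starts,
# i.e. when the SparkSession is created in a fresh process.
spark = (SparkSession.builder
         .master('local[4]')                     # use all 4 vCPUs
         .config('spark.driver.memory', '16g')   # local mode: the driver does all the work
         .config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44')
         .appName('test')
         .getOrCreate())

# After reading, check how the data is partitioned; a single partition means
# the whole table is pulled through one task.
# print(df.rdd.getNumPartitions())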

1 Answer


Here is a method to parallelize a serial JDBC read across multiple Spark workers. You can use this as a guide and customize it to your source data; the main prerequisite is some kind of unique key column to split on.

Please refer to this documentation, specifically the parameters partitionColumn, lowerBound, upperBound, and numPartitions:

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Some code examples:

# find min and max for column used to split on
from pyspark.sql.functions import min, max

minDF = df.select(min("id")).first()[0] # replace 'id' with your key col
maxDF = df.select(max("id")).first()[0] # replace 'id' with your key col
numSplits = 125 # you will need to tailor this value to your dataset ... you mentioned your source as 25GB so try 25000 MB / 200 MB = 125 partitions

print("df min: {}\df max: {}".format(minDF, maxDF))

# your code, with a few more options added; note that inline comments after a
# backslash continuation are a SyntaxError, so the chain is wrapped in parentheses
df = (spark.read.format('jdbc')
      .option('url', 'jdbc:mysql://xx.xxx.xx.xxx:3306')
      .option('driver', 'com.mysql.jdbc.Driver')
      .option('user', 'xxxxxxxxxxx')
      .option('password', 'xxxxxxxxxxxxxxxxxxxx')
      .option('dbtable', 'dbname.tablename')
      .option('partitionColumn', 'id')      # column to split on
      .option('lowerBound', minDF)          # min value
      .option('upperBound', maxDF)          # max value
      .option('numPartitions', numSplits)   # number of splits (partitions) Spark will distribute across executor workers
      .load())

print(df.rdd.getNumPartitions())

Another example connection string: if you are using Spark 2.4, refer to this doc; it uses some cleaner code:

https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism

sourceDF = spark.read.jdbc(
  url=jdbcUrl, 
  table="dbname.tablename",
  column='"id"',
  lowerBound=minDF,
  upperBound=maxDF,
  numPartitions=125,
  properties=connectionProps
)
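The jdbcUrl and connectionProps used above are assumed to be defined beforehand; a minimal sketch, using the same placeholder host and credentials as the question:

# Assumed definitions for the spark.read.jdbc() call above -- host, user and
# password are placeholders, not working values.
jdbcUrl = "jdbc:mysql://xx.xxx.xx.xxx:3306/dbname"
connectionProps = {
    "user": "xxxxxxxxxxx",
    "password": "xxxxxxxxxxxxxxxxxxxx",
    "driver": "com.mysql.jdbc.Driver",
}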