2 votes

Looking for Spark understanding...

I am loading large amounts of data from MySQL into Spark, and it keeps dying :-(

org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)

Here is my code

val query =
  s"""
     |(
     |  select
     |    mod(act.AccountID, ${parts}) part,
     |    p.Value name,
     |    event.EventTime eventTime,
     |    act.AccountID accountID,
     |    act.UserGoal goalID,
     |    event.ActivityID activityID,
     |    id.CountryID countryID,
     |    arr.ConsumerID consumerID
     |  from DimIdentity as id
     |  join FactArrival as arr on arr.IdentityID=id.IdentityID
     |  join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID
     |  join DimAccount as act on act.AccountID=event.AccountID
     |  join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID
     |  join DimDateTime as d on event.DateTimeID=d.DateTimeID
     |  join DimProperty as p on p.PropertyID=event.EventTypeID
     |  where
     |    id.Botness=0 and
     |    d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and
     |    (role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2)
     |) a
   """.stripMargin

val events = sqlContext.read.format("jdbc").
  option("url", sqlURL).
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("zeroDateTimeBehavior", "round").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("dbtable", query).
  option("user", sqlUser).
  option("password", sqlPassword).
  option("partitionColumn", "part").
  option("lowerBound", "0").
  option("upperBound", s"${parts - 1}").
  option("numPartitions", s"${parts}").
  load().as[Activity].toDF

Note that I am using partitionColumn, lowerBound, upperBound, and numPartitions, as recommended in other answers.

I tried setting the number of partitions anywhere from 4 to 512, but it always dies. Reading the same amount of data from files or from Mongo is no problem. Is this an issue with the MySQL connector? Is there a solution?

Note that I found one answer that suggests I avoid Spark's JDBC read, dump the query result to a file on HDFS, and then load that file:

Multiple Partitions in Spark RDD

Is this really the best way?

Sorry, did not realize I was supposed to do that. I'll fix. – opus111
If a solution works for you, you should accept it. It is both a reward for the poster and a sign for other users that it is a valid solution. – user6022341

3 Answers

1 vote

Here is the answer I ended up with...

For me, the answer was to avoid the MySQL connector for Spark :-( I found it too difficult to avoid the crashes caused by partitioning. The MySQL connector requires hand-tuning of the partitions, and it does not yield any increase in speed. It was much easier to write non-Spark code that reads the data into large text files, and then call Spark on the text files. Spark is really good with most data sources, but not MySQL... at least not yet.
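
As a rough sketch of that workflow, assuming the query result has already been dumped by some non-Spark tool to a tab-separated file on HDFS (the path, delimiter, and ActivityRow fields below are placeholders, not the real schema), the Spark side looks something like this:

// Minimal sketch: load a pre-dumped TSV from HDFS instead of reading MySQL
// through the JDBC connector. Path and schema are hypothetical.
import sqlContext.implicits._

case class ActivityRow(accountID: Long, eventTime: String, name: String)

val events = sc.textFile("hdfs:///staging/events.tsv")
  .map(_.split("\t"))
  .map(f => ActivityRow(f(0).toLong, f(1), f(2)))
  .toDF()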

1 vote

You can try increasing the fetch size, without using dynamic partitioning for the read:

// Plain (unpartitioned) JDBC read; "fetchsize" is a driver hint (support varies by Spark version)
val props = new java.util.Properties()
props.setProperty("user", sqlUser)
props.setProperty("password", sqlPassword)
props.setProperty("fetchsize", "1000000")
val events = sqlContext.read.jdbc(sqlURL, query, props)

0 votes

You can read the data by changing the SQL query to page through it with LIMIT and OFFSET, then use a shell script with a for loop to automate the chunks. This worked for me.
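
For illustration, here is roughly the same LIMIT/OFFSET chunking idea sketched inside Spark instead of from a shell script (the table, chunk size, and total row count are placeholder values):

// Sketch: read the source in LIMIT/OFFSET chunks and union the pieces.
val props = new java.util.Properties()
props.setProperty("user", sqlUser)
props.setProperty("password", sqlPassword)

val chunkSize = 1000000L
val totalRows = 50000000L  // placeholder; get the real value with a count(*) first

val events = (0L until totalRows by chunkSize).map { offset =>
  // each chunk is an independent JDBC read; add an ORDER BY for stable paging
  sqlContext.read.jdbc(sqlURL,
    s"(select * from FactActivityEvent limit $chunkSize offset $offset) chunk", props)
}.reduce(_ unionAll _)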