The goal: transpose a set of rows collected from 30,000 tables in a large, high-latency datastore:

  • extract a handful of rows from each table with spark.sql() into a DataFrame
  • convert each column into a separate CSV document, containing an array of the column values
  • write this document to a file server

My approach to solving the problem, outlined below, only half-works.

  • for each table in the datastore, spark.sql() rows into a DataFrame
  • collect and extract each column with the following:
  def getColumn(df: DataFrame, columnName: String): Seq[Option[String]] = {
    // collect() pulls every value of the selected column back to the driver
    df.select(columnName).collect().map(row => Option(row.get(0)).map(_.toString)).toSeq
  }
  • convert the results of each column to CSV (a sketch of this step follows this list)
  • post the resulting document to the file server
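
A minimal sketch of what the CSV-conversion step could look like, assuming each column becomes a single comma-separated line with the column name as the first field; the escaping rules and the helper name columnToCsv are my own placeholders, not part of the original job:

  // Hypothetical helper: turn one extracted column into a CSV document body.
  // None values become empty fields; commas, quotes and newlines are escaped.
  def columnToCsv(columnName: String, values: Seq[Option[String]]): String = {
    def escape(v: String): String =
      if (v.exists(c => c == ',' || c == '"' || c == '\n'))
        "\"" + v.replace("\"", "\"\"") + "\""
      else v
    (columnName +: values.map(v => escape(v.getOrElse("")))).mkString(",")
  }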

Watching the job while it runs, the collect step is naturally a huge bottleneck, sometimes taking tens of minutes. As the job runs over several hours, its extraction rate always slows by a factor of 10 to 50. The entire sequence above runs in separate threads, with a thread pool sized at twice the number of processors on the host.

I'm looking for suggestions on how to improve the design of this job, or ideas for making the column extraction more efficient.

I wonder whether my problem is even a good fit for Spark. Wouldn't it be better to have the contents of each DataFrame on a single host before extracting the columns? Parallelising the rows only to gather the columns back together doesn't seem to add much value.


1 Answer


Some ideas:

  1. Don't fetch columns separately!
  2. Launch parallel jobs using multi-threading in the driver code, e.g. by looping over your tables with Scala's parallel collections
  3. Why collect the data at all? Use Spark's built-in support to write CSV to HDFS in a distributed way, then move the result to the file server once it's done
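
A rough sketch of how ideas 2 and 3 could fit together; the catalog-based table list, the LIMIT clause, the staging path and the app name are all assumptions, and the final move from HDFS to the file server is left as a separate step:

import org.apache.spark.sql.SparkSession
import scala.collection.parallel.CollectionConverters._  // needed for .par on Scala 2.13+

object TransposeTables {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("transpose-tables").getOrCreate()

    // Assumed: the 30,000 tables are visible in the catalog.
    val tableNames: Seq[String] = spark.catalog.listTables().collect().map(_.name).toSeq

    // Idea 2: drive several Spark jobs concurrently from the driver
    // (with the FAIR scheduler they share executors instead of queueing FIFO).
    tableNames.par.foreach { table =>
      val df = spark.sql(s"SELECT * FROM $table LIMIT 100")  // placeholder row selection

      // Idea 3: let the executors write the CSV straight to HDFS -- no collect() on the driver.
      df.write
        .option("header", "true")
        .mode("overwrite")
        .csv(s"hdfs:///staging/transpose/$table")  // placeholder staging directory
    }
    // Splitting each table's output into per-column documents (and pushing them to the
    // file server) can then run against the staged files, off the Spark critical path.
  }
}

This keeps one Spark job per table rather than one per column, in line with idea 1.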