3
votes

We are currently exploring how we can process a big amount of data store in a Google Cloud SQL database (MySQL) using Apache Beam/Google Dataflow.

The database stores about 200GB of data in a single table.

We successfully read rows from the database using JdbcIO, but so far this was only possible if we LIMIT the number of rows queried. Otherwise we will run into memory issue. I assume by default a SELECT query tries to load all resulting rows in-memory.

What is the idiomatic approach for this? Batching the SQL queries? Streaming the results?

We tried adjusting the fetch size of the statement executed, without much success.

This is what our JDBC read setup looks like:

JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  statementPreparator = statement => statement.setFetchSize(100),
  rowMapper = result => result.getString(1)
)

I haven't found any resources regarding stream from sql so far.

EDIT

I'm gonna list a view approaches I took, so others can learn something (for example how not to do it). To have a bit more context, the database table in question is really badly structured: It has a column containing a JSON string, and id column (primary key) plus a added and modified column (both TIMESTAMP types). At the time of the first approach it had no further indices. The table contains 25 mio rows. So this is probably more an database issue rather than a Apache Beam/JDBC issue. But nevertheless:

Approach 1 (above) - Query everything

Basically it looked like this:

val readOptions = JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  rowMapper = result => result.getString(1)
)

context
  .jdbcSelect(readOptions)
  .map(/*...*/)

This worked if I added a LIMIT to the query. But obviously was very slow.

Approach 2 - Keyset pagination

val queries = List(
  "SELECT data from raw_data LIMIT 5000 OFFSET 0",
  "SELECT data from raw_data LIMIT 5000 OFFSET 5000",
  "SELECT data from raw_data LIMIT 5000 OFFSET 10000"
  // ...
)

context
  .parallelize(queries)
  .map(query => {
      val connection = DriverManager.getConnection(/* */)
      val statement = connection.prepareStatement(query)
      val result = statement.executeQuery()

      makeIterable(result) // <-- creates a Iterator[String]
  })
  .flatten
  .map(/* processing */)

This worked somewhat better, though I quickly learned that a LIMIT _ OFFSET _ combination also starts scanning from the first row. So each subsequent query took longer, converging to way to long times.

Approach 2.5 - Keyset pagination with ordering

Like the above approach, but we created an index on the added column and updated the query to

SELECT data FROM raw_data ORDER BY added LIMIT 5000 OFFSET x

This sped things up, but eventually the query time grew to long.

Approach 3 - No Beam/Dataflow

val connection = DriverManager.getConnection(/* */)
val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)

val rs = statement.executeQuery("SELECT data FROM raw_data")

while(rs.next()) {
  writer writeLine rs.getString(1)
}

This streams the resultset back row-by-row and writes the rows into files. It ran about 2 hours for all 25 mio records. Finally. It would be great if someone could point out how this solution can be achieved with Beam.

BTW: Now that I have the raw data as CSV files processing with Beam is a breeze. It's about 80GB of raw data which can be transformed to another CSV format in about 5 min with autoscaling etc.

2
Oh I see. Pagination using LIMIT/OFFSET will not give you any parallelism because databases typically can't give you the 10th page of results without scanning and discarding the first 9 pages, so basically your queries are scanning the entire database N times. You need to partition it by values of the primary key, e.g. "select data from raw_data where added between X and Y", where the total range of [X, Y) pairs you supply covers all possible values of "added". - jkff
Also it seems you're not using JdbcIO.readAll(). I suppose scio doesn't explicitly support it, but it doesn't prevent you from using it either, right? (you can apply arbitrary Beam transforms to PCollections, not just those that scio has a wrapper for) - jkff
I probably could use readAll() with scio. I'm new to both scio and Beam/Dataflow, that's why I'm still not fully aware of the possibilities. Though looking back, I found the solution which worked in the end (w/o Beam) was the most obvious and elegant one for my use case. Thanks for all the hints & explanations though! - Scarysize

2 Answers

2
votes

It seems that the MySQL JDBC driver requires some special measures to make it not load the entire result set into memory; e.g. I was able to find this code solving the problem in a different project. JdbcIO will need to do the same, or at least be configurable enough to let a user do it. I filed issue https://issues.apache.org/jira/browse/BEAM-3714.

Meanwhile, as a workaround, you can use JdbcIO.readAll() to partition your query into many smaller queries, e.g. you might partition it by a range of IDs. Note that no transactional consistency will be enforced between them - they will be independent queries as far as MySQL is concerned.

1
votes

I think JDBCIO doesn't scale very well due to its inherent limitations (single SELECT). I'm not aware of streaming support coming from MySQL and BEAM.

You can probably dump your DB to something easier for data processing systems to process (e.g., csv). Does it work for you?