2
votes

We have our data in SQL Server at the moment and we are trying to move it to our S3 bucket as Parquet files. The intention is to analyse this S3 data on AWS EMR (mainly Spark, Hive & Presto). We don't want to store our data in HDFS.

  1. What are the choices here? As far as we know, we can use either Spark or Sqoop for this import. Although Sqoop is faster than Spark in this case thanks to its parallelism (parallel DB connections), writing Parquet files from Sqoop directly to S3 does not seem possible (Sqoop + S3 + Parquet results in a "Wrong FS" error). The workaround is to write to HDFS first and then copy to S3, but this seems inefficient. How about using Spark SQL to pull this data from SQL Server and write it to S3 as Parquet? (A rough sketch of what we have in mind is shown after question 2 below.)

  2. Once we have loaded this data as Parquet in the following layout:

    s3://mybucket/table_a/day_1/(parquet files 1 ... n).
    s3://mybucket/table_a/day_2/(parquet files 1 ... n).
    s3://mybucket/table_a/day_3/(parquet files 1 ... n).
    

How can I combine these together as a single table and query them using Hive? I understand that we can create a Hive external table pointing to S3, but can we point it to multiple files?
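
For question 1, this is roughly what we have in mind. It is only a sketch: the server name, credentials, database/table names and bucket paths are placeholders, and it assumes the SQL Server JDBC driver is available to Spark.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("sqlserver-to-s3-parquet")
      .getOrCreate()

    // Read one table from SQL Server over JDBC
    // (URL, table and credentials are placeholders).
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://my-sql-server:1433;databaseName=mydb")
      .option("dbtable", "dbo.table_a")
      .option("user", "user")
      .option("password", "password")
      .load()

    // Write it out as Parquet under the day's prefix.
    // On EMR, s3:// goes through EMRFS; outside EMR you may need s3a:// and credentials.
    df.write
      .mode("overwrite")
      .parquet("s3://mybucket/table_a/day_1/")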

Thanks.

EDIT: Adding this as requested.

    org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
        at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380)
        at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257)
        at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91)
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


4 Answers

0
votes

You can create the Hive external table as follows:

create external table table_a (
 siteid                    string,
 nodeid                    string,
 aggregation_type          string
 )
 PARTITIONED BY (day string)
 STORED AS PARQUET
 LOCATION 's3://mybucket/table_a';

Then you can run the following command to register the partition directories stored under each day's folder in the Hive metastore. (Note that MSCK REPAIR only discovers directories that follow Hive's key=value naming, e.g. day=day_1, so the S3 layout needs to match.)

 MSCK REPAIR TABLE table_a;

Now you can access your data through Hive queries. We have used this approach in our project and it works well. After the above command, you can run a query such as:

 select * from table_a where day='day_1';

Hope this helps.

-Ravi

3
votes

Though I am a little late, this is for future reference. In our project we are doing exactly this, and I would prefer Sqoop over Spark.

Reason: I used Glue to read data from MySQL into S3, and the reads are not parallel. (I had AWS Support look at it, and that is just how Glue, which uses PySpark, works; the write to S3 once the read is complete is parallel.) This is inefficient and slow: reading 100 GB of data and writing it to S3 takes 1.5 hours.

So I used Sqoop on EMR with the Glue Catalog turned on (so the Hive metastore is on AWS), and I am able to write to S3 directly from Sqoop, which is much faster: reading 100 GB of data takes 20 minutes.

You will have to set hive.metastore.warehouse.dir=s3:// and you should see your data being written to S3 if you do a hive-import or just a direct write.

2
votes

Spark's JDBC read pulls the data over multiple connections. Here is the relevant API (doc links below):

    def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame

Construct a DataFrame representing the database table accessible via JDBC URL url named table. Partitions of the table will be retrieved in parallel based on the parameters passed to this function.

Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

url
JDBC database url of the form jdbc:subprotocol:subname.

table
Name of the table in the external database.

columnName
the name of a column of integral type that will be used for partitioning.

lowerBound
the minimum value of columnName used to decide partition stride.

upperBound
the maximum value of columnName used to decide partition stride.

numPartitions
the number of partitions. This, along with lowerBound (inclusive), upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the column columnName evenly. When the input is less than 1, the number is set to 1.

connectionProperties
JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least a "user" and "password" property should be included. "fetchsize" can be used to control the number of rows per fetch.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
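
For example, a parallel read from SQL Server could look roughly like the sketch below. The JDBC URL, table name, partition column and bounds are placeholders, and the SQL Server JDBC driver is assumed to be on the classpath.

    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("parallel-jdbc-read").getOrCreate()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "user")
    connectionProperties.put("password", "password")
    connectionProperties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

    // Spark opens numPartitions connections and splits the range
    // [lowerBound, upperBound) of the partition column across them.
    val df = spark.read.jdbc(
      url = "jdbc:sqlserver://my-sql-server:1433;databaseName=mydb",
      table = "dbo.table_a",
      columnName = "id",      // an integral column used for partitioning
      lowerBound = 1L,
      upperBound = 10000000L,
      numPartitions = 8,
      connectionProperties = connectionProperties
    )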

Create a Hive table partitioned by a date column and specify the following S3 location:

-- `date` is back-quoted below because it is a reserved word in newer Hive versions
create external table table_name (
  id          int,
  dtDontQuery string,
  name        string
)
partitioned by (`date` string)
stored as parquet
location 's3://mybucket/table_name/';

Add a column called date to your data and populate it with the current date. You do not need to add the column if it is not required, since we can just populate the location, but it can also serve as an audit column for your analytics. Then write the DataFrame with Spark, partitioned by that column, as sketched below.
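
A minimal sketch of that write, reusing the DataFrame from the JDBC read above and treating the bucket path as a placeholder:

    import org.apache.spark.sql.functions.current_date

    // Add the partition column and populate it with today's date.
    val withDate = df.withColumn("date", current_date())

    // Write day-partitioned Parquet under the Hive table's S3 location,
    // producing s3://mybucket/table_name/date=YYYY-MM-DD/ directories.
    withDate.write
      .mode("append")
      .partitionBy("date")
      .parquet("s3://mybucket/table_name/")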

Perform an MSCK REPAIR on the Hive table daily so that the new partitions are added to the table.

To apply numPartitions to a non-numeric column, hash that column into the number of connections you want and use that hashed value as the partitioning column.
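
One way to do this, sketched under the assumption that the source is SQL Server and reusing the SparkSession and connectionProperties from above, is to pass explicit hash-based predicates to the jdbc overload that takes an Array of WHERE clauses; the column name is a placeholder.

    // Each predicate becomes the WHERE clause of one partition, i.e. one DB connection.
    // ABS(CHECKSUM(...)) is SQL Server syntax; use your database's hash function if it differs.
    val numConnections = 4
    val predicates = (0 until numConnections).map { i =>
      s"ABS(CHECKSUM(some_string_column)) % $numConnections = $i"
    }.toArray

    val dfByHash = spark.read.jdbc(
      "jdbc:sqlserver://my-sql-server:1433;databaseName=mydb",
      "dbo.table_a",
      predicates,
      connectionProperties
    )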

1
votes

Spark is a pretty good utility tool. You can easily connect to a JDBC data source, and you can write to S3 by specifying credentials and an S3 path (e.g. Pyspark Save dataframe to S3).

If you're using AWS, your best bet for Spark, Presto and Hive is to use the AWS Glue Data Catalog as the metastore. It is a data catalog that registers your S3 objects as tables within databases and provides an API for locating those objects.

The answer to your Q2 is yes, you can have a table that refers to multiple files. You'd normally want to do this if you have partitioned data.