
I'm writing a dataframe to an external Hive table from pyspark running on EMR. The work involves dropping/truncating data from an external Hive table, writing the contents of a dataframe into that table, then writing the data from Hive to DynamoDB. Eventually I'd like to write to an internal table on the EMR cluster, but for now I want the Hive data to be available to subsequent clusters. I could write to the Glue catalog directly and force it to be registered, but that is a step further than I need to go.

All components work fine individually on a given EMR cluster: I can create an external Hive table on EMR, either using a script or via ssh and the Hive shell. This table can be queried by Athena and read by pyspark. I can create a dataframe and INSERT OVERWRITE the data into the aforementioned table from pyspark. I can then use the Hive shell to copy the data from the Hive table into a DynamoDB table.
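For reference, the insert step that works looks roughly like this (a sketch; df and staging_view are placeholders for my actual dataframe and view names):

# Minimal sketch of the working INSERT OVERWRITE step, assuming an existing
# external table default.my_table and a dataframe df with a matching schema.
df.createOrReplaceTempView("staging_view")
sqlContext.sql("INSERT OVERWRITE TABLE default.my_table SELECT * FROM staging_view")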

I'd like to wrap all of this work into one pyspark script instead of having to submit multiple distinct steps. I am able to drop tables using sqlContext.sql("drop table if exists default.my_table").

When I try to create a table using sqlContext.sql("create table default.mytable(id string,val string) STORED AS ORC") I get the following error:

org.apache.hadoop.net.ConnectTimeoutException: Call From ip-xx-xxx-xx-xxx/xx.xxx.xx.xx to ip-xxx-xx-xx-xx:8020 failed on socket timeout exception: org.apache.hadoop.net.ConnectTimeoutException: 20000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=ip-xxx-xx-xx-xx:8020]; For more details see: http://wiki.apache.org/hadoop/SocketTimeout

I can't figure out why I can create an external Hive table in Glue using the Hive shell on the cluster, and drop that table using either the Hive shell or the pyspark sqlContext, but I can't create a table using sqlContext. I have checked around and the solutions offered (e.g. copying hive-site.xml) don't make sense in this context, since I can clearly write to the required addresses with no hassle, just not from pyspark. It is doubly strange that the drops work: the tables are definitely gone when I check in Athena.

Running on: emr-5.28.0, Hadoop distribution Amazon 2.8.5, Spark 2.4.4, Hive 2.3.6, Livy 0.6.0 (for notebooks, but my experimentation is via ssh and the pyspark shell).


2 Answers


It turns out I could create tables via a spark.sql() call as long as I provided a location for the tables. It seems the Hive shell doesn't require it, yet spark.sql() does. Unexpected, but not entirely surprising.
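For example, something along these lines worked for me (a sketch; the S3 path is a placeholder for your own location):

# Sketch: the explicit LOCATION clause is what made the create succeed.
# 's3://your-bucket/path/mytable/' is a placeholder path.
spark.sql("""
    create table default.mytable (id string, val string)
    stored as orc
    location 's3://your-bucket/path/mytable/'
""")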


Complementing @Zeathor's answer. After configuring the EMR and Glue connection and permissions (you can see more here: https://www.youtube.com/watch?v=w20tapeW1ME), you just need to write Spark SQL commands:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('TestSession').getOrCreate()
spark.sql("create database if not exists test")

You can then create your tables from dataframes:

# df is the dataframe you want to persist
df.createOrReplaceTempView("first_table")
spark.sql("create table test.table_name as select * from first_table")

All the database and table metadata will then be stored in the AWS Glue Data Catalog.