0 votes

Using Spark 1.5.2 and Hive 1.2. I have an external Hive table in Parquet format. I created a .py script that selects from my_table into a dataframe, does some transforms, and then attempts to write back into the original table.
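
For reference, the read-and-transform part of the script is set up roughly like this (table and column names and the transform are just placeholders):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
sqlContext = HiveContext(sc)

# read the external Hive table (stored as parquet) into a dataframe
df = sqlContext.sql("SELECT * FROM my_table")

# some transforms (placeholder)
df = df.filter(df["some_col"].isNotNull())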

I've tried the following methods:

  1. df.write.insertInto('table_name', overwrite='true').

This throws the following error:

pyspark.sql.utils.AnalysisException: Cannot insert overwrite into table that is also being read from.

  2. df.write.mode('overwrite').parquet('my_path')
  3. df.write.parquet('my_path', mode='overwrite')
  4. df.write.save('my_path', format='parquet', mode='overwrite')

These all seem to throw this error:

ERROR Client fs/client/fileclient/cc/client.cc:1802 Thread: 620 Open failed for file /my_path/part-r-00084-9, LookupFid error No such file or directory(2)
2016-04-26 16:47:17,0942 ERROR JniCommon fs/client/fileclient/cc/jni_MapRClient.cc:2488 Thread: 620 getBlockInfo failed, Could not open file /my_path/part-r-00084-9
16/04/26 16:47:17 WARN DAGScheduler: Creating new stage failed due to exception - job: 16

Note that method 1 above works fine if the file format is ORC, but throws that error for Parquet.

Any suggestions would be greatly appreciated!


2 Answers

1 vote

To do this, you need to write to a temporary path first.

The problem is that Spark is still reading from the original path, so when you try to write back to the same Parquet location it throws an error.

Stella has already given a hint at what needs to be done, but the order of the steps is not right. Since I had to solve this myself, here is a more complete answer:

In the code below, I save the dataframe by writing to a temporary directory first:

import os
import shutil

# PATH is the base directory where the tables live (defined elsewhere in the script)
def write_dataframe(df, table_name):
    # cache the dataframe
    df.cache()

    dirout_tmp = PATH + table_name + "_tmp/"
    dirout = PATH + table_name + "/"

    # write the parquet files to a temporary location
    df.write.parquet(dirout_tmp, mode='overwrite')

    # remove the original parquet directory
    shutil.rmtree(dirout, ignore_errors=True)

    # rename the temp directory to the original path
    os.rename(dirout_tmp, dirout)
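
A rough usage example (assuming a HiveContext/SQLContext named sqlContext, and that PATH is defined for your environment; the column and transform are placeholders):

PATH = "/user/hive/warehouse/"  # assumed base directory, adjust for your environment
df = sqlContext.read.parquet(PATH + "my_table/")
df_transformed = df.filter(df["some_col"].isNotNull())  # placeholder transform
write_dataframe(df_transformed, "my_table")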

0 votes

From everything I've found so far, the solution for reading from and writing back to a Parquet-formatted path seems to be: write to a temporary/staging directory, delete the original directory, and then rename the temporary directory to the original path. To do this in PySpark you will need something like the following:

import os
import shutil

# write the dataframe to a temporary/staging directory first
df.write.parquet('my_tmp_path', mode='overwrite')
# delete the original directory, then rename the temp directory to the original path
shutil.rmtree('my_path')
os.rename('my_tmp_path', 'my_path')