I am trying to load data that is stored on S3 in Parquet format directly into AWS Redshift using PySpark. The load itself works, but when I look at the column encodings in the resulting table definition, they are inconsistent. I would like them to be consistent; in particular, I would like every column to use lzo. Below is the list of data types and encodings from a single table that shows the inconsistency.

+-------------------------------+-------------------+
|    data_type                  |  encoding         |
+-------------------------------+-------------------+
| bigint                        | delta             |
| bigint                        | delta32k          | 
| character varying(256)        | lzo               |
| bigint                        | runlength         |
| bigint                        | bytedict          |
| timestamp without time zone   | bytedict          |
| integer                       | runlength         |
+-------------------------------+-------------------+

Can someone help me do this in PySpark? I don't see any option for column encoding in com.databricks:spark-redshift_2.10:1.0.0. This is the write I am using:

# x is the DataFrame loaded from Parquet; the parentheses let the chain span lines
(x.write.format("com.databricks.spark.redshift")
    .option("url", "jdbc:redshift://<url>:<port>/<schema>?user=<user>&password=<pass>")
    .option("dbtable", "<tbl_nm>")
    .option("diststyle", "KEY")
    .option("distkey", "<key>")
    .option("sortkeyspec", "SORTKEY(<sort1>)")
    .option("tempdir", "<path>")
    .mode("error")
    .save())

1 Answer

I found the relevant bit for specifying column encoding in PR 178.

So you don't specify the encoding through a writer option such as .write.option('encoding', 'lzo'). Instead, you attach metadata that specifies the encoding to each column of the schema, and use that schema when you create the DataFrame. In Python, for example:

%pyspark 

from pyspark.sql.types import IntegerType, StringType, StructType, StructField

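# spark-redshift picks up the "encoding" key from each column's metadata
metadata = {'encoding': 'LZO'}

# The fourth argument of StructField is the column metadata
schema = StructType([
    StructField("id", IntegerType(), True, metadata),
    StructField("name", StringType(), True, metadata)])

df = spark.createDataFrame([(1, 'Alice')], schema)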

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://example.com:5439/db_foo?user=user_bar&password=pass_baz") \
  .option("dbtable", "foo") \
  .option("tempdir", "s3a://foo/bar") \
  .mode("error") \
  .save()

Verification:

select "column", "encoding" from pg_table_def where tablename = 'foo';
 column | encoding
--------+----------
 id     | lzo
 name   | lzo
(2 rows)
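
The DataFrame in the question comes from Parquet rather than from literal rows, so its schema already exists and has to be rewritten instead of declared by hand. Here is a minimal sketch of one way to do that. It assumes the schema's dict form (what `df.schema.jsonValue()` returns) is edited and turned back into a `StructType` with `StructType.fromJson`. The helper names `with_encoding` and `apply_encoding` are hypothetical and not part of spark-redshift.

```python
import copy


def with_encoding(schema_json, encoding="LZO", columns=None):
    """Return a copy of a Spark schema in dict form with
    metadata["encoding"] set on the selected columns.

    schema_json: dict as returned by df.schema.jsonValue()
    columns:     iterable of column names, or None for all columns
    """
    out = copy.deepcopy(schema_json)  # leave the caller's schema untouched
    for field in out["fields"]:
        if columns is None or field["name"] in columns:
            # Keep any metadata already on the column, then add the encoding
            meta = dict(field.get("metadata") or {})
            meta["encoding"] = encoding
            field["metadata"] = meta
    return out


def apply_encoding(spark, df, encoding="LZO", columns=None):
    """Rebuild df with the encoding metadata attached (hypothetical helper)."""
    # Imported lazily so with_encoding stays usable without Spark installed
    from pyspark.sql.types import StructType

    new_schema = StructType.fromJson(
        with_encoding(df.schema.jsonValue(), encoding, columns))
    # Same rows, new schema; this goes through the RDD, which has a cost
    return spark.createDataFrame(df.rdd, new_schema)
```

With this in place, something like `apply_encoding(spark, spark.read.parquet("<path>"))` would produce a DataFrame that can be written with the same `df.write` call as above. Not every Redshift data type accepts every encoding, so it is worth checking the Redshift documentation and using the `columns` argument to limit LZO to the columns that support it.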