0
votes

I am using pyspark to load csv to redshift. I want to query how manny rows got added. I create a new column using the withcolumn function:

csvdata=df.withColumn("file_uploaded", lit("test"))

I see that this column gets created and I can query using psql. But, when I try to query using pyspark sql context, I get a error:

py4j.protocol.Py4JJavaError: An error occurred while calling o77.showString.
: java.sql.SQLException: [Amazon](500310) Invalid operation: column "test" does not exist in billingreports;

Interestingly, I am able to query other columnns but not just the new column I added.

Appreciate any pointers on how to resolve this issue.

Complete code:

df=spark.read.option("header","true").csv('/mnt/spark/redshift/umcompress/' + 
filename)
csvdata=df.withColumn("fileuploaded", lit("test"))

countorig=csvdata.count()

## This executes without error
csvdata.write \
    .format("com.databricks.spark.redshift") \
    .option("url", jdbc_url) \
    .option("dbtable", dbname) \
    .option("tempformat", "CSV") \
    .option("tempdir", "s3://" + s3_bucket + "/temp") \
    .mode("append") \
    .option("aws_iam_role", iam_role).save()

select="select count(*) from " + dbname + " where fileuploaded='test'"

## Error occurs
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", jdbc_url) \
.option("query", select) \
.option("tempdir", "s3://" + s3_bucket + "/test") \
.option("aws_iam_role", iam_role) \
.load()
newcounnt=df.count()

Thanks for responding. Dataframe does have the new column called file_uploaded Here is the query: select="select count(*) from billingreports where file_uploaded='test'"

I have printed the schema

|-- file_uploaded: string (nullable = true)

df.show() shows that the new column is added. I just want to add a pre determined string to this column as value.

1
There must be some other transformations not shown here. The exception is on a dataframe called billingreports while you add the column to csvdata.Shaido
I have a dataframe called csvdata which I use to write to redshift. U have the count of that dataframe. I want to validate with whats written to redshift. The dataframe I am trying to construct out of the sql query select="select count(*) from billingreports where file_uploaded='test'". This is where the error is showing up.Shekar Tippur
Please add all the code to the question. How is billingreports created? What is the command that gives the error?Shaido

1 Answers

-3
votes

Your Dataframe csvdata will have a new column named file_uploaded, with default value "test" in all rows of df. This error shows that it is trying to access a column named test, which does not exist in the dataframe billingreports thus the error. Print the schema before querying the column with billingreports.dtypes or better try to take a sample of your dataframe with billingreports.show() and see if the column has correct name and values.

It will be better if you share the query which resulted in this exception, as the exception is thrown for dataframe billingreports.