
I find that my AWS Glue job is appending duplicate data to my data catalog. I have a job that reads CSV from S3, dedups it with Spark SQL, and then attempts to save it to the data catalog. But I must be doing something wrong, because the data catalog gets more duplicates each time the job runs:

# glueContext, spark, and job come from the standard Glue job setup; DynamicFrame is needed for the conversion further down
from awsglue.dynamicframe import DynamicFrame

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://..."], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")

inputDF = inputGDF.toDF()

print(inputDF.first())

inputDF.createOrReplaceTempView("p_places")

# Dedup the data on id, keeping the most recently updated row per id
filteredDF = spark.sql("""
  SELECT id, parentid, code, type, name, createdat, updatedat 
  FROM (
    SELECT 
      ROW_NUMBER() OVER (PARTITION BY ID ORDER BY updatedat DESC) ROW_NUM,
      id, parentid, code, type, name, createdat, updatedat
    FROM p_places
  ) ranked
  WHERE ROW_NUM = 1
""")

filteredGDF = DynamicFrame.fromDF(filteredDF, glueContext, "filteredGDF")

filteredDF.createOrReplaceTempView('p_places_2')
verification = spark.sql("""
    SELECT COUNT(id) FROM p_places_2 WHERE id = '12542'
""")
print("VERIFICATION:")
print(verification.first()) # Correctly outputs 1 (no dups)

outputGDF = glueContext.write_dynamic_frame.from_options(frame = filteredGDF, connection_type = "s3", connection_options = {"path": "s3://..."}, format = "parquet", transformation_ctx = "outputGDF")

job.commit()

But when I query the data with Athena, there is one additional duplicate row after each run. Why is that? I suspect the Parquet write always appends. How can I resolve this?


1 Answer


Your code removes duplicates from the input data only. If you also don't want duplicates in the destination, you need to load the data that already exists there and then write only the new records:

from pyspark.sql.functions import col
from awsglue.dynamicframe import DynamicFrame

# Load what has already been written to the destination
existingGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://..."], "recurse": True}, format = "parquet", transformation_ctx="existingGDF")
existingDF = existingGDF.toDF()

# Keep only the rows whose id is not already present in the destination
newOnlyDF = (filteredDF.alias("new")
  .join(existingDF.alias("existing"), col("new.id") == col("existing.id"), "left_outer")
  .where(col("existing.id").isNull())
  .select("new.*"))

# Convert back to a DynamicFrame before writing
newOnlyGDF = DynamicFrame.fromDF(newOnlyDF, glueContext, "newOnlyGDF")

outputGDF = glueContext.write_dynamic_frame.from_options(frame = newOnlyGDF, connection_type = "s3", connection_options = {"path": "s3://..."}, format = "parquet", transformation_ctx = "outputGDF")
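
If it is acceptable to rewrite the whole output on every run, a simpler alternative (a sketch, not what the anti-join above does) is to skip loading the existing data and overwrite the destination with Spark's DataFrame writer, which supports an explicit overwrite mode:

# Minimal sketch: rewrite the full, already-deduplicated result each run instead of appending
filteredDF.write \
    .mode("overwrite") \
    .parquet("s3://...")

Note that overwrite replaces everything under the path, so it only fits when each run processes the complete dataset rather than an incremental batch.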