4
votes

I am currently exploring delta lake which is open sourced by databricks. I am reading kafka data and writing as stream using delta lake format. Delta lake creates many files during streaming write from kafka which i feel hearts hdfs file system.

I have tried following to compact multiple files to single file.

val spark =  SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  val df = spark.read.parquet("deltalakefile/data/")

  df.repartition(1).write.format("delta").mode("overwrite").save("deltalakefile/data/")
  df.show()

  spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false")

  DeltaTable.forPath("deltalakefile/data/").vacuum(1)

But when i checked output it is creating new file and not removing any existing files.

Is there is way to achieve this. Also what is relation of retention period here? How should we configure it in HDFS while using? What should be my configuration for retention when i want to build raw/bronze layer with delta lake format and i want to preserve my all data for long period (years on premises/infinite time on cloud)?

1

1 Answers

5
votes

By design, Delta doesn't immediately remove files to prevent active consumers from being impacted. It also provides the versioning (aka time-travel) so you can see history if necessary. To remove previous versions or uncommitted files you need to run vacuum.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum() // use default retention period

In terms of your question on how to manage retention and compaction for bronze/silver/gold model, you should treat your landing table (aka bronze) as an append-only log. That means you don't need to perform compaction or any rewrites after the fact. The bronze table should be a record of the data you ingested from your upstream data source (e.g. Kafka) with minimal processing applied.

The bronze table is typically used as an incremental stream source to populate downstream datasets. Given that reading from Delta is done from the transaction log, small files is not such an issue compared to using the standard file readers that perform slow file listings.

However, there are still some some options to optimize the files as you write them to the bronze table: 1) compact your Kafka messages as you write out to Delta by first repartitioning to reduce the number of files, 2) increase your trigger interval so the ingest runs less frequently and is writing more messages to larger files.