
Objective

We use a Databricks cluster for our ETL process and Databricks notebooks for DS, ML, and QA activities.

Currently, we don't use the Databricks catalog or an external Hive metastore. We define schemas programmatically in the Spark StructType format and hardcode paths as follows:

tables/some_table.py

import os

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType


class SomeTable(TableBase):

    PATH = os.getenv('SOME_TABLE_PATH', '/some_folder/some_subfolder/')  # actually it's passed as a constructor arg

    SCHEMA = {
        "type": "struct",
        "fields": [
            {
                "name": "some_field",
                "type": "string",
                "nullable": True
            },
            ...
        ]
    }

    def schema(self) -> StructType:
        return StructType.fromJson(self.SCHEMA)

    def save(self, df: DataFrame):
        df.write.parquet(self.PATH)

    def read(self, year: str, month: str, day: str) -> DataFrame:
        return self.spark \
            .read \
            .parquet(self.PATH) \
            .filter((F.col('YEAR') == year) & ...)

The issue

From time to time we do refactorings, changing a table's path, schema, or partitioning. This is a problem, since Databricks is a platform shared between developers, QA, and data scientists. On each change we have to update all notebooks and documentation in multiple places.

I would also like to use bucketing (clustering), table statistics, Delta Lake, SQL-syntax data exploration, views, and some security features in the future. Those features also require table definitions to be accessible to Databricks.

The question

How do you usually deploy Databricks schemas and their updates? Should I use SQL scripts that are executed automatically by an infrastructure-as-code tool on cluster start? Or is there a simpler/better solution?
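To make the SQL-script option concrete, here is a rough sketch of the kind of idempotent DDL an init job could run on cluster start. The table name, column, and location are the placeholders from the snippet above, and the YEAR/MONTH/DAY partition columns are an assumption based on the read() signature:

# Idempotent DDL an init job could run; it registers an external table over the
# existing files without rewriting any data.
spark.sql("""
    CREATE TABLE IF NOT EXISTS some_table (
        some_field STRING,
        YEAR STRING,
        MONTH STRING,
        DAY STRING
    )
    USING PARQUET
    PARTITIONED BY (YEAR, MONTH, DAY)
    LOCATION '/some_folder/some_subfolder/'
""")

# Pick up partitions that already exist on disk.
spark.sql("MSCK REPAIR TABLE some_table")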

Tables (and their schemas) for DataFrames written with Databricks/Spark can be registered with df.write.saveAsTable('some_table'). But this is not the best solution, because:

  1. I want to have the schema definition before the first write. For example, when transforming a dataset of 500 columns down to 100 columns, I want to select only the required columns based on the schema definition (see the sketch after this list).

  2. There are read-only data sets that are ingested (written) with other tools (like ADF or NiFi).
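Here is a minimal sketch of point 1, reusing the SCHEMA dictionary from the snippet above; wide_df stands in for a hypothetical 500-column DataFrame:

from pyspark.sql.types import StructType

# The columns declared in code become the single source of truth for the write.
declared_columns = [field.name for field in StructType.fromJson(SomeTable.SCHEMA).fields]

narrow_df = wide_df.select(*declared_columns)  # keep only the declared ~100 columns
narrow_df.write.saveAsTable("some_table")      # registers schema + data in the metastore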

Update

I liked the experience with AWS Glue (used as the Hive metastore by EMR) deployed through CloudFormation. I suppose Databricks has a similar or even simpler experience; I'm just wondering what the best practice is.

Update #2

Extra points for an answer to this question: how do we avoid duplicating the schema definition between the Databricks catalog (or an external Hive metastore) and our codebase?

If we describe our schemas with SQL syntax, we won't be able to reuse them in unit tests. Is there a clean solution for deploying schemas based on the format described above (see the code snippet)?
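One possible (not definitive) approach: keep the StructType in code as the single source of truth and push it into the catalog from there, for example with spark.catalog.createTable, so the unit tests and the metastore share one definition. A sketch using the placeholder names from the question:

from pyspark.sql.types import StructType

schema = StructType.fromJson(SomeTable.SCHEMA)  # the same definition the unit tests use

# Register an external table over the existing files; no data is rewritten.
spark.catalog.createTable(
    "some_table",
    path="/some_folder/some_subfolder/",
    source="parquet",
    schema=schema,
)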

PS: we currently use the Azure cloud.


1 Answer


For Databricks on AWS, the AWS Glue Catalog is a strong option for centralizing your metastore, so that all of your compute and query engines can use the same data definitions. The Glue Catalog promotes a cloud-wide data strategy, avoiding the data silos created by product-specific data catalogs and access controls. See the Databricks documentation for more information: https://docs.databricks.com/data/metastores/aws-glue-metastore.html

Performance-wise, you'll see a lift from having the schema defined, and you'll have the ability to collect table and column statistics in the metastore. Delta Lake will collect file-level statistics within the Delta transaction log, enabling data skipping. Consistent use of the Glue Catalog will prevent schema duplication.
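For example, once a table is registered, statistics can be collected with Spark SQL; a sketch using the placeholder table and column names from the question:

# Table-level statistics (row count, size in bytes) stored in the metastore.
spark.sql("ANALYZE TABLE some_table COMPUTE STATISTICS")

# Column-level statistics (min/max, distinct counts) used by the optimizer.
spark.sql("ANALYZE TABLE some_table COMPUTE STATISTICS FOR COLUMNS some_field")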

Spark can figure out the schema when it reads Parquet or Delta Lake tables. For Parquet and JSON tables, you can speed up schema inference by pointing Spark at just one file to infer the schema from, then reading the entire folder with that schema in the next pass. A metastore avoids this hassle and speeds up your queries.
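A sketch of that workaround, with a hypothetical file path:

# Infer the schema cheaply from a single file...
one_file = "/some_folder/some_subfolder/part-00000.parquet"  # hypothetical file name
inferred_schema = spark.read.parquet(one_file).schema

# ...then reuse it for the full folder, skipping the inference pass.
df = spark.read.schema(inferred_schema).parquet("/some_folder/some_subfolder/")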