2 votes

I have the following issue:

  • I have an AWS S3 pipeline that drops a single json.gz file into a bucket once a day.
  • I want to load that file into Snowflake with dbt (no Snowpipe use at the moment).

I have managed to do this by creating a storage integration, and I have manually created a schema with my role (the one used for running dbt) and granted usage on that schema. So far so good.
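For context, the manual setup looks roughly like this (all object names, the bucket and the role ARN are placeholders, and the grants are trimmed down to the essentials):

CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/my-snowflake-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://my-bucket/mixpanel/');

CREATE SCHEMA IF NOT EXISTS RAW_DEV.S3_MIXPANEL;

CREATE STAGE RAW_DEV.S3_MIXPANEL.my_s3_stage
  STORAGE_INTEGRATION = s3_int
  URL = 's3://my-bucket/mixpanel/'
  FILE_FORMAT = (TYPE = JSON);  -- gzip compression is auto-detected

GRANT USAGE ON SCHEMA RAW_DEV.S3_MIXPANEL TO ROLE my_dbt_role;
GRANT USAGE ON STAGE RAW_DEV.S3_MIXPANEL.my_s3_stage TO ROLE my_dbt_role;
GRANT CREATE TABLE ON SCHEMA RAW_DEV.S3_MIXPANEL TO ROLE my_dbt_role;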

Then I read about this:

https://github.com/fishtown-analytics/dbt-external-tables

The problem is that this is the only way it runs properly: I had to alter my dbt profiles.yml, set the default schema to S3_MIXPANEL with default database RAW_DEV, and run with a different target and role via the --target 'ingest_dev' parameter.
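Concretely, my profiles.yml ended up with an extra output roughly like this (profile name, account and credentials are placeholders):

my_project:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: my_account
      user: my_user
      password: my_password  # placeholder
      role: TRANSFORMER
      warehouse: my_wh
      database: ANALYTICS_DEV
      schema: DBT_ME
    ingest_dev:
      type: snowflake
      account: my_account
      user: my_user
      password: my_password  # placeholder
      role: INGEST_ROLE
      warehouse: my_wh
      database: RAW_DEV
      schema: S3_MIXPANEL

and then I run the model with dbt run --target ingest_dev.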

I keep thinking there should be a more elegant solution, where I can create schemas, query the stage metadata, and use something like {{ source() }} so my documentation can show that this is an external source. dbt-external-tables isn't really well explained for my case here, I think?

Can anyone help me and share how to create schemas and query from external stages properly, without changing the default schema macro and profiles.yml each time?

I have succeeded to run the following code:

{{
  config(
    materialized = 'incremental',
    schema = generate_schema_name('S3_MIXPANEL')
  )
}}

SELECT
    metadata$filename as file_name,
    to_date(SUBSTR(metadata$filename, 16, 10), 'yyyy/mm/dd') as event_date,
    $1 as payload,
    CONVERT_TIMEZONE('Europe/London', TO_TIMESTAMP_TZ($1:properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at
FROM @my_s3_stage

{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date > (SELECT max(event_date) FROM {{ this }})
{% endif %}

{{ row_limit() }}

EDIT 22-06-20:

I have added the src_mixpanel.yml file to my models and ran the dbt command. I also had to specify the data_types, so I added them, and then I apparently had to add the macro to my own macros as well (maybe a stupid question, but I don't really know how to install your package, so I manually copied all of its macros into my project).
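(I assume the proper way to install it, rather than copy-pasting the macros, would be something like this in packages.yml and then running dbt deps; the exact syntax and version pinning may vary by dbt version:)

packages:
  - git: "https://github.com/fishtown-analytics/dbt-external-tables.git"
    revision: master  # ideally pin to a tagged release

dbt deps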

Now when I run this code:

dbt run-operation stage_external_sources

with

version: 2

sources:

  - name: s3_mixpanel
    database: RAW_DEV
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
              data_type: date
            - name: file_name
              expression: metadata$filename
              data_type: string
          columns:
            - name: properties
              data_type: variant

I get an error:

Encountered an error while running operation: Compilation Error in macro stage_external_sources (macros/stage_external_sources.sql)
'dict object' has no attribute 'sources'


1 Answer

6 votes

As the maintainer of the dbt-external-tables package, I'll share its opinionated view. The package believes that you should stage all external sources (S3 files) as external tables or with snowpipes first, in a process that includes as little confounding logic as possible. Then you can select from them, as sources, in dbt models, alongside all requisite business logic.

If my understanding is correct, you would stage your mixpanel data as below, in a file called (e.g.) models/staging/mixpanel/src_mixpanel.yml:

version: 2

sources:

  - name: s3_mixpanel
    database: raw_dev
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          file_format: "( type = json )"  # or a named file format
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
          columns:
            - name: properties
              data_type: variant

You would run this macro from the package to create the external table—and, after creation, to update its partition metadata if you don't have auto_refresh enabled (see Snowflake docs):

dbt run-operation stage_external_sources
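Depending on the package version, you can also limit the operation to a single source, or force a full refresh of the external table definitions, e.g.:

# stage only this source
dbt run-operation stage_external_sources --args "select: s3_mixpanel"

# drop and recreate the external tables instead of altering them in place
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"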

You can then select from this source in an incremental model, like the one you have above. Now, event_date is a partition column on this external table, so filtering on it should enable Snowflake to prune files (though that's been inconsistent historically for dynamic, subquery-derived filters).

{{
  config(
    materialized = 'incremental'
  )
}}

SELECT
    metadata$filename as file_name,
    event_date,
    value as payload,
    CONVERT_TIMEZONE('Europe/London', TO_TIMESTAMP_TZ(properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at
FROM {{ source('s3_mixpanel', 'events') }}

{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date > (SELECT max(event_date) FROM {{ this }})
{% endif %}

{{ row_limit() }}