A testing framework for Snowflake Dynamic Tables
Following our challenges with trying to adopt Snowflake Dynamic Tables, we built a bespoke testing framework to support our scalability ambitions.
DATA QUALITYSOFTWARE ARCHITECTUREDATA ENGINEERINGSNOWFLAKE
Lucian
11/1/20247 min read
Context
In the previous article, we walked together through the preparation to replace the DBT & Prefect data pipelines for an Analytics Engineering team with Snowflake Dynamic Tables, in order to improve team throughput.
We explained how Snowflake Data Metric Functions are a long way from being a complete and reliable solution for us:
they are not parametrizable/dynamic
they don’t accept addition of custom DDL
they cannot easily be made to run incrementally (on new data only)
there is no native assertion mechanisms, requiring you to improvise
This post is about building a lightweight data testing framework that does all of the above. I feel that we went a long way with an initial prototype already, and it is so universal and easily portable to other data teams!
Solution design
We want the testing framework to address the following considerations:
as we continue to use DBT to manage our data transformations and infrastructure as code, we want to continue to define data tests centrally — in DBT metadata, and for whatever framework we use to be able to extract tests from there
the framework should provide the possibility to implement hooks for custom logic, i.e. regarding notifications to Slack or logging
test failures need to block the propagation of data errors to downstream Dynamic Tables, similar to the way that DBT tests work
for efficient runs, only incremental data should be tested where that is possible
For none of these issues, the native Data Metric Functions didn’t provide a satisfactory solution (or even no solution at all).
Going back to the drawing board, we brainstormed through a couple of thoughts and realizations:
Dynamic Tables support good-ol’ Snowflake Streams (not only that they support streams, but they are built upon streams, as a higher-level abstraction)
We could use streams to scope data quality testing 🚨. This would organically allow us to test only incremental data for most of our tests. Practically all tests except primary key/unique tests could be based only on incremental data. The ability to test only incremental data is not new, but it’s definitely the cherry-on-top of our solution if we can achieve it because not even native DBT offered a mechanism for that 💯.
We could build a custom testing logic (ie stored procedure) for each model, yet that building can happen in DBT/Jinja, thus it would be able to natively access the centralized test definitions in DBT metadata.
If we build custom testing logic, then we can make it handle whatever extensions/hooks we need and we can make it however complex tests we have.
Given that Dynamic Tables offer an API to suspend refreshes (which in turn leads to suspending the refresh of all downstream models), we already found the mechanism to stop propagation of data test failures.
Side thoughts / alternative designs
Can we really not make Snowflake Data Metric Functions work
We did our best (including resorting to dirty tricks), but no, unfortunately not. It seems DMFs are not yet mature enough.
Is it really a good idea to have custom testing infrastructure (stored procedure) per table?
The solution envisioned above involves building a dedicated testing logic (stored procedure) for each table, at build time. Of course, this is automatic - not manual. And of course, this logic is templated - there is a template for a PK test, a template for a FK test, a template for a soft quality test etc, and these templates are put together (using the composition principle in software architecture) when assembling the dedicated testing logic for a table, depending on what tests are defined in DBT metadata for that table.


The alternative we could have considered was to separate out testing into two components:
a component, running at DBT build time, that would just push the test definitions from DBT metadata to Snowflake; this could have been achieved using the dbt_artifacts package from brooklyn-data, for example
another component, running exclusively in Snowflake and triggered by the same Dynamic Table refreshes (via streams) that would evaluate on-the-fly what tests need to be run (based on the the test definitions pushed before) and execute them
Although this decoupled approach (described here) would have been more scalable, we still decided to go with our initial design (dedicated test logic created at model build time) because it was simpler and it involved less moving pieces.
If we are ever to "platformize" our framework in order for other teams to leverage it, it would still be reasonably easy to do a lift-and-shift of the logic that we have in order to switch gears towards the decoupled approach.
Does this cover all possible use cases?
One notable increment that we are aware about is improving the behavior of Foreign Key tests with deleted data. Currently, we just consider the added records in the stream and check that they match the foreign relationship.
However, deletion of records from a table is also meaningful - albeit not for the table from which the records are deleted, yet from other tables which reference it! Thus when refreshing table A, if you delete rows then you would ideally check tables B and C which depend on it, making sure there are no orphaned references left.
This is the same mechanism that traditional databases like PostgreSQL use when enforcing referential integrity, yet in a cloud-data-warehouse world this is not the status-quo anymore.
What about Dynamic Tables which cannot be made incremental?
This happens in certain situations when the transformation code is complex and Snowflake is not able to properly parse and build incremental pipelines out of it - thus it resorts to simply refreshing the table fully.
This is not ideal, as it leads to increased costs for refreshing the model, but it also adds complexity to our testing framework (-> streams cannot be built on top of full-refresh Dynamic Tables) and reduces its' key selling points (-> incremental testing cannot be done anymore, as there is no "increment" when data is fully refreshed every time).
The north star of our migration plan was to alter the design of the transformation pipelines slightly, so Dynamic Tables can be built with incremental refresh.
Prototype
This is the functional process that we expect to build -


And below is the scaffolding of the DBT macro that handles steps 2, 3 and 4 above (covering all the artifacts of the testing framework). This exemplifies the not-null test and assertions, but I am happy to expose the rest if you are interested!
Conclusions
Software architecture decisions in general, and data engineering decisions in particular, are all subject to tradeoffs. We created a testing framework that is efficient and tailored to our needs, that is deeply integrated with DBT, and indeed we started seeing the benefits as soon as we started more extensive tests in the QA environment. 💯
But as there were aspects to fine-tune with the approach (and sneaky bugs to fix), we also saw the drawdown of a static testing procedure (created at the object build time) - for any change in the testing logic or in the generic infrastructure for test result collection, redeploying the testing procedure according to current logic required rerunning the macro. Which, in order to avoid manual hacks, required to rerun the model - thus recreating the entire dynamic table (not the most efficient thing one could do).
We envision the same need arising even for business-as-usual scenarios, like adding a new test to a model. For this one, the pull-mode alternative architecture would have been a better solution.
Yet data engineering is a journey! We are now working on addressing the drawdowns and are ready to reap the benefits of it!
Article also available on Medium.com.
------------
{% macro create_tests(model) -%}
{% if execute and 'config' in model and 'materialized' in model.config and model.config.materialized == 'dynamic_table'%}
{# log( 'Model: ' ~ tojson(model) ~ '\n\n', True) #}
{% set query %}
CREATE OR REPLACE STREAM {{model.database}}.{{model.schema}}.{{model.alias}}__stream
ON DYNAMIC TABLE {{model.database}}.{{model.schema}}.{{model.alias}}
{# --insert_only = true; --wouldn't work for self-ref/FK tests (inbound relationship) #}
{% endset %}
{# log(query, True) #}
{% set results = run_query(query) %}
{% set queries_intask = [] %}
{% set query %}
ALTER DYNAMIC TABLE {{model.relation_name}} SUSPEND
{% endset %}
{% do queries_intask.append(query) %}
{% set query %}
CREATE OR REPLACE TEMPORARY TABLE tmp_stream AS (
SELECT * FROM {{model.relation_name}}__stream
)
{% endset %}
{% do queries_intask.append(query) %}
{% set query %}
CREATE OR REPLACE TEMPORARY TABLE tmp_violations (
ERR_TYPE STRING,
ERR_COUNT INTEGER,
DETAILS STRING
)
{% endset %}
{% do queries_intask.append(query) %}
{# --PARSE TESTS: Column NOT NULL #}
{# log('(Testing) Parsing NOTNULL tests...', True) #}
{% set all_tests_notnull = graph.nodes.values()
| selectattr("resource_type", "equalto", "test")
| selectattr("test_metadata.name", "equalto", "not_null") %}
{% for tst in all_tests_notnull %}
{% if 'refs' in tst and 'source_not_null' not in tst.name and tst.refs[0].name == model.name %}
{# log(tst, True) #}
{# log('(Testing) Found related test! [' ~ tst.name ~ ']', True) #}
{% set local_col = tst.column_name %}
{% if 'config' in tst and 'where' in tst.config and tst.config.get('where') is not none %}
{% set test_where_clause = "and " ~ tst.config.where %}
{% else %}
{% set test_where_clause = "" %}
{% endif %}
{{ log('(Testing) Preparing NOT_NULL constraint on [' ~ model.name ~ '].[' ~ local_col ~ ']...', True) }}
{% set query %}
INSERT INTO tmp_violations
SELECT
'NOTNULL' as err_type,
count(*) as err_count,
'{{local_col}}' as details
FROM ${WITH_FULL_TEST_COVERAGE ? '{{model.relation_name}}' : 'tmp_stream'}
WHERE {{local_col}} IS NULL
${WITH_FULL_TEST_COVERAGE ? "" : "AND METADATA$ACTION = 'INSERT'"}
{{ test_where_clause }}
GROUP BY ALL
{% endset %}
{% do queries_intask.append(query) %}
{% endif %}
{% endfor %}
...
{% set query_intask_insert_results %}
INSERT INTO DATA_QUALITY_{{target.name}}.TEST.EVAL_FAILURE
SELECT
convert_timezone('UTC', current_timestamp),
'{{model.database}}' as database_name,
'{{model.schema}}' as schema_name,
'{{model.name}}' as table_name,
err_type,
err_count,
details
FROM tmp_violations;
{% endset %}
{% do queries_intask.append(query_intask_insert_results) %}
{% call statement('cp', fetch_result=False) %}
CREATE OR REPLACE PROCEDURE {{model.relation_name}}__dq_proc(WITH_FULL_TEST_COVERAGE BOOLEAN)
RETURNS STRING
LANGUAGE JAVASCRIPT
AS $$
{% for q in queries_intask %}
snowflake.execute({sqlText: `{{q}}`});
{% endfor %}
var result = snowflake.execute({sqlText: `
SELECT COUNT(*) AS row_count FROM tmp_violations
`});
result.next();
if (result.getColumnValue(1) > 0) {
snowflake.execute({sqlText: `
DECLARE
mess string;
BEGIN
LET crs CURSOR FOR SELECT '[Data quality check failure] {{model.relation_name}}' || CHAR(10) || CHAR(10) || 'Test type: [' || err_type || '] Number of violations: [' || err_count || '] Details: [' || details || ']' AS MSG FROM tmp_violations;
FOR r IN crs DO
mess := r.msg;
call system$send_snowflake_notification(
snowflake.notification.text_plain(:mess),
snowflake.notification.integration('SNOWFLAKE_SNS_INTEGRATION')
);
END FOR;
END;
`});
return 'test failures found and raised!';
} else {
snowflake.execute({sqlText: `
ALTER DYNAMIC TABLE {{model.relation_name}} RESUME; --only resume DT refresh if no test failures occurred
`});
return 'no test failures!';
}
$$
{% endcall %}
{% call statement('ct', fetch_result=False) %}
CREATE OR REPLACE TASK {{model.relation_name}}__dq_tsk
WAREHOUSE = WH_XS
ERROR_INTEGRATION = SNOWFLAKE_SNS_INTEGRATION
WHEN SYSTEM$STREAM_HAS_DATA('{{model.relation_name}}__stream')
AS CALL {{model.relation_name}}__dq_proc(false)
{% endcall %}
{% call statement('query_task_resume', fetch_result=False) %}
ALTER TASK {{model.relation_name}}__dq_tsk RESUME
{% endcall %}
{% endif %}
{% endmacro %}
Contact us
Whether you have a request, a query, or want to work with us, use the form below to get in touch with our team.


Location
US
Europe
Middle East
