Gone But Not Forgotten: Navigating Kafka tombstones with DBT
A story on how we supported an Analytics Engineering department of a top Digital Marketing enterprise adopt a consistent framework for modeling entities deleted by the sources using Kafka tombstones, the challenges we encountered and how this was made to work using DBT.
Lucian
7/17/20246 min read
Organizational context
One of the teams I’ve been working with was heavily investing in data agility and domain ownership by moving internal “data products” towards Kafka.
This approach was going smooth with Analytics Engineering folks who were already modeling the core entities and exposing historical data as SCD Type-4 (hybrid slowly-changing-dimension modeling using history tables). This modeling was highly compatible (“impedance-matching”, for electronics afficionados like me) with the upstream data producers who were using Kafka transport to publish updated versions of entire objects (records) whenever changes occurred. In other words, sources were producing a topic for i.e. ‘marketing campaigns’ and whenever a change to the respective campaign occurred (i.e. a change of the budget) they issued a message reflecting the complete snapshot of a campaign object. This approach is functionally similar to CDC (Change Data Capture) rather than to a pure native-event system architecture (which would have meant issuing a `CampaignBudgetChanged` event), but that’s a topic for a different data architecture discussion.
Usage of Kafka tombstones
Naturally, the question of how to model deleted entities (or rather, how to propagate the information about deleted entities to downstream consumers) came up.
There are several aspects to this on the producer side:
what is a “deletion”?
who would need to access traceability data regarding deleted objects, and in what way?
how should business-driven deletions (i.e. a marketing campaign which is stopped and abandoned) be handled differently than privacy-driven deletions (i.e. the removal of PII as per GDPR)?
is the technical deletion of a “wrong record” (due to whatever reason on the producer side) handled the same way as a regular deletion?
In turn, there are at least two mechanisms to model deletions that I’m immediately aware about, with varying degrees of complexity:
“soft deletes” via payload — when a record is to be deleted, producers send an updated payload including a flag column `is_deleted` (or even better, a timestamp field reflecting the time of deletion, `deleted_at` (whose presence in itself indicates that the respective record was deleted yet which bears more information about the context/timestamp of deletion), while maintaining the remaining fields with their last known values (or setting them all to null), or
“hard deletes” via Kafka tombstones - issuing null-payload messages which contain only the partition key and a timestamp metadata
Although we didn’t plan to implement GDPR in the first design, we realized that the solution of “hard deletes” via Kafka tombstones would more naturally work to address this.
Data modeling on top of tombstones
The organization adopted a hybrid approach — the producers by the preference to use standardized mechanisms, thus they adopted the usage of Kafka tombstones (a decision outside the control of Analytics Engineering). Yet in the data lake, and further downstream in the data warehouse, we still wanted to retain historical data.
Thus on Analytics Engineering we planned to build an adapter that interprets Kafka tombstones as “soft deletions”, in the SCD Type-4 target structures in the data warehouse.
And not only that, but we needed a complete history on a bitemporal timeline. For more details of what a bitemporal timeline is and what it serves, read this comprehensive post from Christian Kaul — a data modeler and writer that I highly recommend you to follow.
In our case, the bitemporal timeline was required because of the need to handle late-arriving data or replays. Data was thus historized along two timelines, namely:
one timeline that we controlled (the timestamp in the Kafka metadata, which is universal across messages and tombstones and provides a global serializability of these), and
one timeline that we didn’t control (a business change timestamp in the payload, which was only available for messages containing created or updated records, but not for tombstones)
Modeling standardization in DBT
Analytics Engineering also had a north-star to standardize upon the upstream processes (in this case, the usage of Kafka tombstones) in order to ensure consistent and predictable handling across the team and to abstract that complexity away in the data tool-of-choice — DBT.
We relied on Jinja templating in a DBT macro to handle repetitive and predictable transformations, that is:
combining regular messages (for entity creation/update) with tombstone messages (for deletion), thus creating a unified timeline;
ensuring a correct and robust global serializability of messages and tombstones in the timeline;
fixing business-logic glitches in tombstone usage by the producers ; i.e. in our case, producers were sometimes sending tombstones for entities which were not yet created, or sending several deletions for an entity without any recreation between them; we wanted to flag such occurrences internally for awareness, yet discard them from our curated output data;
exposing a standardized output data structure (i.e. we wanted deletion records to represent a simple copy of the last known state of the record, yet with a `deleted_at` field set), including metadata fields.
Challenges
Increased technical complexity
In the end, Analytics Engineering had to add to the amount of work already expended by other middleware teams in order to support a fancy mechanism to do “soft deletes”. Was it worth it?
Time will tell.
Explainability of outcomes
Thanks to a SCD Type-4 data modeling in the data warehouse, all entities for which tombstones were implemented had a historical table available (and were fully exposed to consumers), thus a complete and traceable view of each entity was available. Questions like “When was my entity deleted?” or “Why do I miss this entity X from my current table?” were thus easy to answer and totally “self-service” for consumers.
Clock synchronization disaster
This came in unexpected and required an imperfect workaround in order to address it, but this is the best we could brainstorm. Open to your feedback in case you can suggest a better approach :)
As Analytics Engineering was planning to support replay of historical data, it was imperative for the payload to contain a business timestamp (playing the role of an effective_at date/timestamp for each data record), in addition to the technical metadata timestamp (reflecting when the message was produced to Kafka).
Inherently, Kafka tombstones would NOT contain the business timestamp (as the payload of a tombstone record is, by definition, null).
Thus the conundrum —
we cannot use the technical timestamp to create a unified timeline (otherwise replays wouldn’t be possible), yet
we cannot use the business timestamp either, because it’s not available for tombstone records (…unless we infer it somehow)
Our first thought about inferring the business timestamp for tombstones was to set it equal to the technical timestamp. All nice & shiny until our first DBT tests started failing ⚠️, raising our awareness about an interesting situation — some entities appeared to exist when they actually were expected to be deleted, and that was because we observed an inconsistent bias between the business timestamp (producer system clock) and the technical timestamp (Kafka clock), with a pretty significant portion of the records appearing to have been created (=business timestamp) after they were received in Kafka (=technical timestamp).
This was definitely surprising, but we traced this to be due to clock misalignment in distributed systems. This is a typical occurrence in real-time distributed systems, so we had to work around it.
Our workaround was to infer the business timestamp of the tombstones starting from the technical timestamp (as in the baseline approach) but adding an offset representing the last known offset between business and technical timestamp in the topic for the respective entity, by partition key. Thus if the observed technical timestamp on a tombstone is Jul 8, 2024 00:00:00, and the latest observed business timestamp and technical timestamp for the same entity are Jul 2, 2024 12:30:00 and Jul 2, 2024 12:29:48 respectively, we thus calculate the last known offset to to be 12 seconds and we apply it upon the tombstone technical timestamp, thus obtaining a proxy business timestamp for the tombstone of Jul 8, 2024 00:00:12.
This works under specific assumptions - which we made sure to make explicit to our consumers and/or to cover via data tests wherever possible, yet it’s a solution we are proud to have come up with and which didn’t generate issues for our consumers ever since.
Conclusions
The final code for the DBT macro looks like this:
{% macro get_filled_tombstone_timeline(main_cte_name, tombstone_cte_name, key_column_name, technical_time_column_name, business_time_column_name, columns=[]) -%}
,
--create an intertwining timeline of main and tombstone entities
unified_timeline as (
select
*,
false as _is_tombstone,
cast(null as timestamp_tz) as _deleted_at
from {{main_cte_name}}
union all
select
{%- for c in columns -%}
{%- if c == key_column_name or c == technical_time_column_name %}
{{c}},
{%- else %}
null as {{c}},
{%- endif -%}
{% endfor %}
true as _is_tombstone,
{{technical_time_column_name}} as _deleted_at
from {{tombstone_cte_name}}
),
final as (
select
{%- for c in columns -%}
{%- if c == key_column_name or c == technical_time_column_name %}
{{c}}, --retain from tombstone
{%- elif c == business_time_column_name %}
iff(_is_tombstone,
timestampadd(
millisecond,
greatest(
timestampdiff(
millisecond,
lag({{technical_time_column_name}}) over (
partition by {{key_column_name}}
order by {{technical_time_column_name}}
),
{{technical_time_column_name}}
),
1 --an arbitrary minimal offset to ensure PK is not violated
),
lag({{business_time_column_name}}) over (
partition by {{key_column_name}}
order by {{technical_time_column_name}}
)
),
{{business_time_column_name}}
) as {{business_time_column_name}},
{%- else %}
iff(
_is_tombstone,
lag({{c}}) over (
partition by {{key_column_name}}
order by {{technical_time_column_name}}
),
{{c}}
) as {{c}},
{%- endif -%}
{% endfor %}
_is_tombstone,
_deleted_at
from unified_timeline
)
This macro is expected to be called from within a DBT model which already provides one CTE for the main topic and another CTE for the tombstones respectively, both following harmonized naming conventions. This provides a nice separation of concerns, leaving the business logic for data preparation in the main model while extracting the reproducible/ standardized aspects regarding tombstone handling into the reusable macro. Cannot get more maintainable than that! 💯
If you found this article insightful and want to leverage data to drive your business forward, I’m here to help. My company offers expert data consulting services tailored to your specific needs. Let’s explore how we can turn your data into actionable insights and measurable results.
Location
US
Europe
Middle East
