Slow Changing Dimensions Type 2 (SCD T2) is a pattern in data warehouses where you store every version of a record, in order to keep track of historic data. In other words, when a record changes, we insert a new row in a history table. This allows users to go back in time and see what the data looked like on a specific date & time.

Example table Customers:

Id    Name       City
1     ACME Inc   NY
2     Pepsi      WA
3     Cola       BR

Example history table for Customers using SCD T2 where the name of ACME was updated on 15th of Jan from “ACME Ltd” to “ACME Inc”:

Id    Name       City    Timestamp      History id (UUID)
1     ACME Ltd   NY      2024-01-01     e341ac...
1     ACME Inc   NY      2024-01-15     6a52bb...
2     Pepsi      WA      2024-01-01     72bc34...
3     Cola       BR      2024-01-01     d383a2...

We can implement this pattern in Peliqan with a small low-code script. It's the magic of the function dw.write() that makes this implementation so simple. See source code below.

SCD T2 flow.png

Note that this is only a boilerplate script, more logic needs to be added to make it work at scale:

import uuid

dbconn = pq.dbconnect(connection_name)

# Change these settings
connection_name = 'dw_123' # See "Data warehouse" under My Connections
db_name = 'dw_123'
schema_name = 'chargebee'
table_name = 'transactions'
pk_field_name = 'transaction_id'

# No change needed
history_schema_name = 'history'
history_tabl_name = table_name + '_history'
history_pk_field_name = 'history_id'

def get_history_row(row):
    query = (
        "SELECT * FROM " + history_tabl_name + " WHERE " + pk_field_name + " = '" + 
        row[pk_field_name] + "' ORDER BY _sdc_batched_at DESC"
    )
    history_rows = dbconn.fetch(db_name, history_schema_name, query = query)
    if history_rows:
        return history_rows[0]
    else:
        return None

def is_different(current_row, history_row):
    for key, val in current_row.items():
        if key[0] != '_':  # exclude metada (columns _sdc_...)
            if key not in history_row:
                return True
            if history_row[key] != val:
                return True
    return False

def write_history(row):
    row[history_pk_field_name] = str(uuid.uuid4())        
    result = dbconn.write(history_schema_name, history_tabl_name, row, pk = history_pk_field_name)
    if result['status'] != 'success':
        st.json(result)

rows = dbconn.fetch(db_name, schema_name, table_name)
for row in rows:
    history_row = get_history_row(row)
    if not history_row or is_different(row, history_row):
        write_history(row)