Slowly Changing Dimensions Type 2 (SCD T2) is a pattern in data warehouses where you store every version of a record in order to keep track of historical data. In other words, when a record changes, we insert a new row in a history table. This allows users to go back in time and see what the data looked like on a specific date and time.
Example table Customers:

Id  Name      City
1   ACME Inc  NY
2   Pepsi     WA
3   Cola      BR
Example history table for Customers using SCD T2, where the name of ACME was updated on the 15th of January from "ACME Ltd" to "ACME Inc":
Id  Name      City  Timestamp   History id (UUID)
1   ACME Ltd  NY    2024-01-01  e341ac...
1   ACME Inc  NY    2024-01-15  6a52bb...
2   Pepsi     WA    2024-01-01  72bc34...
3   Cola      BR    2024-01-01  d383a2...
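"Going back in time" means asking the history table for the latest version of each record at or before a date of interest. A minimal sketch of such a point-in-time query, using SQLite and the example data above (Peliqan is not required for this part; table and column names are illustrative):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE customers_history (
        id INTEGER, name TEXT, city TEXT, timestamp TEXT, history_id TEXT
    )
""")
con.executemany(
    "INSERT INTO customers_history VALUES (?, ?, ?, ?, ?)",
    [
        (1, "ACME Ltd", "NY", "2024-01-01", "e341ac"),
        (1, "ACME Inc", "NY", "2024-01-15", "6a52bb"),
        (2, "Pepsi",    "WA", "2024-01-01", "72bc34"),
        (3, "Cola",     "BR", "2024-01-01", "d383a2"),
    ],
)

# Latest version of each customer as of 2024-01-10, i.e. before the ACME rename
as_of = "2024-01-10"
rows = con.execute("""
    SELECT h.id, h.name, h.city
    FROM customers_history h
    JOIN (
        SELECT id, MAX(timestamp) AS ts
        FROM customers_history
        WHERE timestamp <= ?
        GROUP BY id
    ) latest ON latest.id = h.id AND latest.ts = h.timestamp
    ORDER BY h.id
""", (as_of,)).fetchall()

print(rows)  # ACME still shows its old name "ACME Ltd"
```

The inner subquery picks the newest version per id up to the cut-off date; the join then returns that version's full row.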
We can implement this pattern in Peliqan with a small low-code script. It's the dbconn.write() function that makes this implementation so simple. See the source code below.

Note that this is only a boilerplate script; more logic needs to be added to make it work at scale, for example using pq.get_state() and pq.set_state() to keep track of the last timestamp processed from the source (e.g. based on a timestamp column).

import uuid

# Change these settings
connection_name = 'dw_123'  # See "Data warehouse" under My Connections
db_name = 'dw_123'
schema_name = 'chargebee'
table_name = 'transactions'
pk_field_name = 'transaction_id'

# No change needed
history_schema_name = 'history'
history_table_name = table_name + '_history'
history_pk_field_name = 'history_id'

dbconn = pq.dbconnect(connection_name)

def get_history_row(row):
    # Fetch the most recent history version of this record, if any
    query = (
        "SELECT * FROM " + history_table_name +
        " WHERE " + pk_field_name + " = '" + str(row[pk_field_name]) + "'"
        " ORDER BY _sdc_batched_at DESC LIMIT 1"
    )
    history_rows = dbconn.fetch(db_name, history_schema_name, query=query)
    if history_rows:
        return history_rows[0]
    return None

def is_different(current_row, history_row):
    # Compare business columns only; exclude metadata (columns _sdc_...)
    for key, val in current_row.items():
        if key.startswith('_'):
            continue
        if key not in history_row or history_row[key] != val:
            return True
    return False

def write_history(row):
    # Insert a new version with a fresh surrogate key
    row[history_pk_field_name] = str(uuid.uuid4())
    result = dbconn.write(history_schema_name, history_table_name, row, pk=history_pk_field_name)
    if result['status'] != 'success':
        st.json(result)

rows = dbconn.fetch(db_name, schema_name, table_name)
for row in rows:
    history_row = get_history_row(row)
    if not history_row or is_different(row, history_row):
        write_history(row)
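To see the core change-detection logic in isolation, here is a hedged, self-contained sketch of the same loop with Peliqan's dbconn replaced by a plain Python list of dicts. The helper names mirror the script above; pq, st, and the _sdc_ columns are not needed here, and the "timestamp"/"history_id" keys stand in for the warehouse's technical columns:

```python
import uuid

history = []  # stands in for the history table

def get_history_row(row, pk="id"):
    # Latest history version for this primary key (newest timestamp first)
    versions = [h for h in history if h[pk] == row[pk]]
    versions.sort(key=lambda h: h["timestamp"], reverse=True)
    return versions[0] if versions else None

def is_different(current_row, history_row):
    # Ignore technical columns when comparing versions
    for key, val in current_row.items():
        if key in ("history_id", "timestamp"):
            continue
        if key not in history_row or history_row[key] != val:
            return True
    return False

def write_history(row, timestamp):
    # Each version gets a fresh surrogate key, like str(uuid.uuid4()) above
    history.append(dict(row, history_id=str(uuid.uuid4()), timestamp=timestamp))

def sync(rows, timestamp):
    for row in rows:
        previous = get_history_row(row)
        if not previous or is_different(row, previous):
            write_history(row, timestamp)

# First load: all three customers get an initial version
sync([{"id": 1, "name": "ACME Ltd", "city": "NY"},
      {"id": 2, "name": "Pepsi", "city": "WA"},
      {"id": 3, "name": "Cola", "city": "BR"}], "2024-01-01")

# Second load: only ACME changed, so only one new version is written
sync([{"id": 1, "name": "ACME Inc", "city": "NY"},
      {"id": 2, "name": "Pepsi", "city": "WA"},
      {"id": 3, "name": "Cola", "city": "BR"}], "2024-01-15")

print(len(history))  # 4 versions in total, matching the example table
```

The second sync writes exactly one new row (the ACME rename), which is the SCD T2 behaviour shown in the history table at the top of this article.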