data_contracts.png

Define the expected schemas in a data contract and test your datasets against defined contracts.

Defining a data contract

Peliqan supports the open source data contract standard defined by Paypal:

https://github.com/paypal/data-contract-template

Here’s a very simple example in YAML format, with many properties omitted:

dataset:
  - table: campaigns
    columns:
      - column: id
        logicalType: int
        isNullable: false
        description: unique id of the marketing campaign
      - column: name
        logicalType: string
        isNullable: true
        description: name of the marketing campaign

Testing a dataset for compliance with a data contract

Once you have defined a data contract, you can use a script to test one or more tables for compliance with the given contract.

Here’s a simplified end-to-end example:

import yaml

datacontract="""
dataset:
  - table: campaigns_template
    columns:
      - column: id
        logicalType: int
        isNullable: false
        description: unique id of the marketing campaign
      - column: name__1
        logicalType: string
        isNullable: true
        description: name of the marketing campaign
"""

def main():
	table_to_test = "campaigns"
	datacontract_test_table(datacontract, "campaigns_template", table_to_test)

### Helper functions ###

def datacontract_get_table(datacontract, tablename):
  dc = yaml.safe_load(datacontract)
  if 'dataset' not in dc:
    raise Exception("Key 'dataset' missing in data contract.")
    return
  for table in dc['dataset']:
    if 'table' in table and table['table']==tablename:
      return table

def datacontract_to_sql(datacontract, datacontract_tablename, tablename):
  table = datacontract_get_table(datacontract, datacontract_tablename)
  sql_columns = []
  if 'columns' in table:
    for column in table['columns']:
      if 'column' in column:
        columnselect = column['column']
        if 'logicalType' in column:
          if column['logicalType'] == 'int':
            columnselect = "round(%s)" % columnselect # test if value is numeric
          elif column['logicalType'] == 'string':
            columnselect = "trim(%s)" % columnselect # test if value is string
        sql_columns.append(columnselect)
  sql_columns_csv = ','.join(sql_columns)
  sql = "SELECT %s FROM %s" % (sql_columns_csv, tablename)
  return sql

def datacontract_test_table(datacontract, datacontract_tablename, tablename):
  sql = datacontract_to_sql(datacontract, datacontract_tablename, tablename)
  try:
    sample_sql = sql + " LIMIT 1"
    df = pq.load_table(query = sample_sql)
  except Exception as e:
    st.error("Table %s does not comply with the data contract ! Detailed error: %s" % (tablename, e))
    exit()

  st.success("Table %s complies with the data contract !" % tablename)

main()

When the test fails, you could send out an alert to e.g. a channel in Slack:

Click here for more info on the Slack connector