Writing to tables

Here are basic examples of writing data to tables:

dbconn = pq.dbconnect(pq.DW_NAME)

# Insert a row in a table
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)

# Update a row in a table
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Upsert a row in a table (insert or update)
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Execute an SQL query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')

# Write a list of records to a table using pipeline logic, automatically creating the table and columns as needed (details below)
dbconn.write('schema_name', 'table_name', records, pk='id')

Difference between upsert() and write()

The table below compares dbconn.upsert() and dbconn.write():

| | dbconn.upsert() | dbconn.write() |
| --- | --- | --- |
| Use case | Insert and update records in operational tables. | Implement a pipeline: write records to a target table. |
| Avoids duplicates | Yes (based on PK). | Yes (based on PK). |
| Creates columns | No. | Yes. If a record contains new keys, the corresponding columns are added to the table automatically. All columns (except the PK field) are created as nullable. |
| If PK does not exist | Inserts a new row. | Inserts a new row. |
| If PK exists | Updates the row: columns not present in upsert() remain unchanged. | Replaces the row: columns not present in write() become empty. |
| Metadata columns | None. | Adds and updates metadata columns: _sdc_batched_at (timestamp of the update), _sdc_sequence, etc. |
| Avoids unneeded updates (*) | Yes. | Yes. |
| Creates child tables for nested data | No. | Yes. |

(*) A record is not updated if nothing has actually changed, so columns such as timestamp_lastupdate are not touched unnecessarily. This matters in incremental pipelines and data syncs that use timestamp fields to trigger updates in a target: it avoids constantly re-triggering updates/syncs.
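
To make the merge-versus-replace distinction concrete, here is a plain-Python model of the row semantics described above. The helper names (upsert_row, write_row) and the dict-based "table" are illustrative only, not the library's implementation; the no-change skip mirrors the (*) note.

```python
def upsert_row(table, pk_value, record):
    """upsert()-style merge: columns absent from the record stay unchanged."""
    existing = table.get(pk_value)
    if existing is None:
        table[pk_value] = dict(record)      # PK not found: insert a new row
        return "inserted"
    merged = {**existing, **record}         # merge: keep columns not supplied
    if merged == existing:
        return "skipped"                    # no actual change: leave row alone
    table[pk_value] = merged
    return "updated"

def write_row(table, pk_value, record):
    """write()-style replace: columns absent from the record become empty."""
    existing = table.get(pk_value)
    columns = list(record) + [c for c in (existing or {}) if c not in record]
    replaced = {col: record.get(col) for col in columns}   # missing -> None
    if replaced == existing:
        return "skipped"                    # no actual change: leave row alone
    table[pk_value] = replaced
    return "inserted" if existing is None else "replaced"

table = {1: {'id': 1, 'name': 'John', 'city': 'Gent'}}
upsert_row(table, 1, {'id': 1, 'name': 'Johnny'})
print(table[1])   # {'id': 1, 'name': 'Johnny', 'city': 'Gent'} - city kept
write_row(table, 1, {'id': 1, 'name': 'John'})
print(table[1])   # {'id': 1, 'name': 'John', 'city': None} - city emptied
```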

dbconn.write(): writing to tables using ELT data pipeline logic

The function dbconn.write() uses ELT data pipeline logic (Singer) to write rows to a target table in a data warehouse.

This function will create the table if it does not exist, and it will add or alter columns when needed.

dbconn.write(
    schema_name,                  # required
    table_name,                   # required
    records,                      # required: list of dicts, single dict, or DataFrame
    object_schema=None,           # optional: explicit column type definitions
    pk=None,                      # optional: primary key column(s), string or list
    db_name=None,                 # optional: database name
    transformer_mode='lossless',  # optional: 'strict', 'lossless', 'lossy', or None
    decimal_separator='.',        # optional: '.' or ','
)

Examples:

dbconn = pq.dbconnect(pq.DW_NAME)

# Write a record (object, dict) to a table
dbconn.write('schema_name', 'table_name', record_object, pk='id')

# Write a record (object, dict) to a table - example
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')

# Write a list of records to a table
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write a dataframe to a table
records_list = df.to_dict(orient='records')
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write in batch
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, pk='id')

Overview of how data is written to the target data warehouse

[Diagram: db.write.png]

Column types

dbconn.write() will first determine the column types of the target table.

On the first write, the target table does not exist yet. If an object_schema is set, it is used to create the columns. If no object_schema is set, the input rows are introspected to determine the column types to create.
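
As a rough illustration of what such introspection could look like, the sketch below infers a column type per key from a list of records. The function name and the type-resolution rules are assumptions for illustration, not the library's actual algorithm.

```python
def introspect_columns(records):
    """Infer a column type for every key that appears in the records."""
    seen_types = {}
    for record in records:
        for key, value in record.items():
            types = seen_types.setdefault(key, set())
            if value is not None:           # NULLs carry no type information
                types.add(type(value).__name__)

    def resolve(names):
        if not names:
            return 'string'                 # all-NULL column: fall back to string
        if names == {'int'}:
            return 'integer'
        if names <= {'int', 'float'}:
            return 'number'                 # mixed int/float generalises to number
        if names == {'bool'}:
            return 'boolean'
        return 'string'                     # anything else: fall back to string

    return {key: resolve(names) for key, names in seen_types.items()}

rows = [
    {'id': 1, 'name': 'John', 'score': 9.5},
    {'id': 2, 'name': 'An', 'score': 8, 'active': True},
]
print(introspect_columns(rows))
# {'id': 'integer', 'name': 'string', 'score': 'number', 'active': 'boolean'}
```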

From the second write onwards, the table exists and the column types are fixed; they will not change unless an object_schema is set. If there is a conflict between the object_schema and an existing column, the existing column is split (see details below). New columns are added as needed.

Once the object_schema is determined (because it was set explicitly, taken from the existing columns, or derived by introspection), each row of the input data is transformed; see details below.
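
This transformation step can be pictured as casting each value to its column's type, with transformer_mode deciding what happens on a mismatch. The sketch below is speculative: the per-mode behaviour (strict raises, lossy drops the value, lossless preserves it as text) is an assumption for illustration, not the library's documented semantics.

```python
def transform_row(row, column_types, mode='lossless'):
    """Cast each value to its column type; 'mode' controls mismatch handling."""
    casts = {'integer': int, 'number': float, 'string': str, 'boolean': bool}
    out = {}
    for key, value in row.items():
        target = column_types.get(key)
        if target is None or value is None:
            out[key] = value                # unknown column or NULL: pass through
            continue
        try:
            out[key] = casts[target](value)
        except (TypeError, ValueError):
            if mode == 'strict':
                raise                       # strict: refuse mismatching values
            # lossy: drop the value; lossless: keep it, as text
            out[key] = None if mode == 'lossy' else str(value)
    return out

types = {'id': 'integer', 'score': 'number'}
print(transform_row({'id': '7', 'score': '8.5'}, types))   # {'id': 7, 'score': 8.5}
print(transform_row({'id': 'abc'}, types, mode='lossy'))   # {'id': None}
```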