Writing to tables

Here are basic examples of writing data to tables:

dbconn = pq.dbconnect(pq.DW_NAME)

# Insert a row in a table
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)

# Update a row in a table
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Upsert a row in a table (insert or update)
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)

# Execute an SQL query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')

# Write a list of records to a table using pipeline logic, which will automatically create the table and columns (details below)
dbconn.write('schema_name', 'table_name', records, pk='id')
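As an end-to-end sketch of these calls (a hedged example: the database name, 'crm' schema, 'contacts' table and record values are illustrative, not part of the API):

dbconn = pq.dbconnect(pq.DW_NAME)

# Insert a new row (illustrative record):
dbconn.insert('db_name', 'crm', 'contacts', {'id': 42, 'name': 'John', 'email': 'john@example.com'})

# Update one column of that row; other columns remain unchanged:
dbconn.update('db_name', 'crm', 'contacts', 42, {'email': 'john.doe@example.com'})

# Upsert: inserts the row if PK 42 does not exist, updates it otherwise:
dbconn.upsert('db_name', 'crm', 'contacts', 42, {'id': 42, 'name': 'John Doe'})

# Empty the table, e.g. before a full reload:
dbconn.execute('db_name', query='TRUNCATE TABLE crm.contacts')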

Difference between upsert() and write()

The table below compares dbconn.upsert() and dbconn.write():

|  | dbconn.upsert() | dbconn.write() |
| --- | --- | --- |
| Use case | Insert and update records in operational tables. | Implement a pipeline: write records to a target table. |
| Avoids duplicates | Yes (based on PK). | Yes (based on PK). |
| Creates columns | No. | Yes. If a record contains new keys, the corresponding columns are automatically added to the table. |
| If PK does not exist | Inserts a new row. | Inserts a new row. |
| If PK exists | Updates the row: columns not present in upsert() remain unchanged. | Replaces the row: columns not present in write() become empty. |
| Metadata columns | None. | Adds and updates metadata columns: _sdc_batched_at (timestamp of the update), _sdc_sequence, etc. |
| Avoids unneeded updates (*) | Yes. | Yes. |
| Creates child tables for nested data | No. | Yes. |

(*) The record is not updated if there are no actual changes. This means that columns such as timestamp_lastupdate are not updated unnecessarily. This matters in incremental pipelines and data syncs that use timestamp fields to trigger updates in a target (it avoids constantly re-triggering updates/syncs).
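To make the contrast concrete, here is a hedged sketch (illustrative names; it assumes the table already contains the row {'id': 1, 'name': 'John', 'age': 30}):

# upsert(): the PK exists, so the row is updated in place.
# 'age' is not in the record, so it keeps its value (30).
dbconn.upsert('db_name', 'schema_name', 'table_name', 1, {'id': 1, 'name': 'Johnny'})

# write(): the PK exists, so the row is replaced.
# 'age' is not in the record, so it becomes empty.
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'Johnny'}], pk='id')

# Writing the exact same record again is a no-op: the row is not touched,
# so columns such as timestamp_lastupdate are not bumped (see (*) above).
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'Johnny'}], pk='id')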

dbconn.write(): writing to tables using data pipeline logic

The function dbconn.write() uses pipeline logic (Singer) to write rows to a target table in a data warehouse.

This function will create the table if it does not exist, and it will add or alter columns when needed.

dbconn = pq.dbconnect(pq.DW_NAME)

# Write a record (object, dict) to a table
dbconn.write('schema_name', 'table_name', record_object, pk='id')

# Write a single record to a table - example (here passed as a one-element list)
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')

# Write a list of records to a table
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write a dataframe to a table
records_list = df.to_dict(orient='records')
dbconn.write('schema_name', 'table_name', records_list, pk='id')

# Write in batches
batch_size = 100
batches = [rows[i:i+batch_size] for i in range(0, len(rows), batch_size)]
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, pk='id')
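In practice the records often come from an external source. A hedged sketch of a small pipeline (the API URL is hypothetical; it assumes the endpoint returns a JSON list of objects with an 'id' field, and that dbconn was created as above):

import requests

rows = requests.get('https://api.example.com/orders').json()

batch_size = 100
for i in range(0, len(rows), batch_size):
    dbconn.write('schema_name', 'table_name', rows[i:i + batch_size], pk='id')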

dbconn.write(): splitting of columns when data types change

By default, dbconn.write() performs introspection on the provided data to discover the column types.

If a column's type changes between two writes, the existing column will be split into two columns.

For example, an existing numeric column age will be split into age__s and age__i when you send a text value for age.

The __s column will contain the string values, and the __i column will contain the integer values.

In a similar manner, an __s (string) and a __t (timestamp) column will be created when a string (text) value is written to an existing timestamp column.

Example:

dbconn.write(schema_name, table_name, records=[{'id': 1, 'name': 'John', 'age': 30}], pk='id')
# Result: a new table will be created with an id, name and age column. The age column will be of type 'integer'.

dbconn.write(schema_name, table_name, records=[{'id': 2, 'name': 'Anne', 'age': '20-30'}], pk='id')
# Result: the age column will be split into age__i and age__s.
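The same splitting applies to timestamp columns. A hedged continuation of the example above (the 'created_at' column and its values are illustrative, and it assumes the introspection detects the ISO-formatted string as a timestamp):

dbconn.write(schema_name, table_name, records=[{'id': 3, 'created_at': '2024-05-01T12:00:00'}], pk='id')
# Result: created_at is created as a timestamp column.

dbconn.write(schema_name, table_name, records=[{'id': 4, 'created_at': 'not a date'}], pk='id')
# Result: created_at is split into created_at__t (timestamp) and created_at__s (string).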