Here are basic examples of writing data to tables:
dbconn = pq.dbconnect(pq.DW_NAME)
# Insert a row in a table
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
# Update a row in a table
dbconn.update('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
# Upsert a row in a table (insert or update)
dbconn.upsert('db_name', 'schema_name', 'table_name', 'row_pk_value', record_dict)
# Execute an SQL query
dbconn.execute('db_name', query='TRUNCATE TABLE schema_name.table_name')
# Write a list of records to a table using pipeline logic, which will automatically create the table and columns (details below)
dbconn.write('schema_name', 'table_name', records, pk='id')
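In these calls, record_dict is a plain Python dict that maps column names to values, and the PK value identifies the row to update or upsert. A minimal sketch with illustrative column names and values (the schema shown here is an assumption for this example):
dbconn = pq.dbconnect(pq.DW_NAME)
# Illustrative record; the column names are assumptions for this example
record_dict = {'id': 1, 'name': 'John', 'email': 'john@example.com'}
# Insert the record as a new row
dbconn.insert('db_name', 'schema_name', 'table_name', record_dict)
# Update the existing row identified by PK value 1
dbconn.update('db_name', 'schema_name', 'table_name', '1', record_dict)
# Upsert: insert if PK value 1 does not exist, otherwise update
dbconn.upsert('db_name', 'schema_name', 'table_name', '1', record_dict)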
Difference between dbconn.upsert() and dbconn.write():
| | dbconn.upsert() | dbconn.write() |
|---|---|---|
| Use case | Insert and update records in operational tables. | Implement a pipeline: write records to a target table. |
| Avoids duplicates | Yes (based on PK). | Yes (based on PK). |
| Creates columns | No. | Yes. If a record contains new keys, the corresponding columns are added to the table automatically. |
| If PK does not exist | Inserts a new row. | Inserts a new row. |
| If PK exists | Updates the row: columns not present in upsert() remain unchanged. | Replaces the row: columns not present in write() become empty. |
| Metadata columns | None. | Adds and updates metadata columns: _sdc_batched_at (timestamp of the update), _sdc_sequence, etc. |
| Avoids unneeded updates (*) | Yes. | Yes. |
| Creates child tables for nested data | No. | Yes. |
(*) The record is not updated if there are no actual changes. This means that columns such as timestamp_lastupdate are not updated unnecessarily, which is important in incremental pipelines and data syncs that use timestamp fields to trigger updates in a target (it avoids constantly triggering updates/syncs).
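As a concrete illustration of the table above (the existing row and column names are assumptions for this sketch):
# Assume the table already contains the row {'id': 1, 'name': 'John', 'age': 30}
# upsert: only the supplied columns change, so 'age' keeps its value 30
dbconn.upsert('db_name', 'schema_name', 'table_name', '1', {'id': 1, 'name': 'Johnny'})
# write: the row is replaced, so 'age' (not present in the record) becomes empty
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'Johnny'}], pk='id')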
The function dbconn.write() uses pipeline logic (Singer) to write rows to a target table in a data warehouse.
This function will create the table if it does not exist, and it will add or alter columns when needed.
dbconn = pq.dbconnect(pq.DW_NAME)
# Write a record (object, dict) to a table
dbconn.write('schema_name', 'table_name', record_object, pk='id')
# Write a record (object, dict) to a table - example
dbconn.write('schema_name', 'table_name', [{'id': 1, 'name': 'John'}], pk='id')
# Write a list of records to a table
dbconn.write('schema_name', 'table_name', records_list, pk='id')
# Write a dataframe to a table
records_list = df.to_dict(orient='records')  # df is a pandas DataFrame
dbconn.write('schema_name', 'table_name', records_list, pk='id')
# Write in batches
batch_size = 100
batches = [records_list[i:i+batch_size] for i in range(0, len(records_list), batch_size)]
for batch in batches:
    dbconn.write('schema_name', 'table_name', batch, pk='id')
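As noted in the comparison table, dbconn.write() also creates child tables for nested data. A minimal sketch; the record structure is an assumption for illustration, and the naming of the resulting child table is not shown here:
# A record with a nested list of objects
order = {
    'id': 1,
    'customer': 'John',
    'items': [
        {'sku': 'A-100', 'qty': 2},
        {'sku': 'B-200', 'qty': 1}
    ]
}
# The top-level fields go to the target table; the nested 'items' list
# is written to a child table linked to the parent row
dbconn.write('schema_name', 'orders', [order], pk='id')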
By default the function dbconn.write() performs introspection on the provided data to discover the column types. If the column types change between two writes, an existing column will be split into two columns. For example, an existing numeric column age will be split into age__s and age__i when you send a text value for age: the age__s column will contain the string values and the age__i column will contain the integer values. In a similar manner, an __s (string) and a __t (timestamp) column will be created when you write a string (text) value to an existing timestamp column.
Example:
dbconn.write(schema_name, table_name, records=[{'id': 1, 'name': 'John', 'age': 30}], pk='id')
# Result: a new table will be created with an id, name and age column. The age column will be of type 'integer'.
dbconn.write(schema_name, table_name, records=[{'id': 2, 'name': 'Anne', 'age': '20-30'}], pk='id')
# Result: the age column will be split into age__i and age__s.
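The same applies to timestamps; a sketch assuming a created_at column that was initially detected as a timestamp (the column name and values are illustrative):
dbconn.write(schema_name, table_name, records=[{'id': 3, 'name': 'Mark', 'created_at': '2024-01-15 10:00:00'}], pk='id')
# Result: created_at is created as a timestamp column.
dbconn.write(schema_name, table_name, records=[{'id': 4, 'name': 'Lisa', 'created_at': 'unknown'}], pk='id')
# Result: the created_at column will be split into created_at__t (timestamp values) and created_at__s (string values).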