From 52827f86e1dd38368a63dd608eca9c420a41eb4c Mon Sep 17 00:00:00 2001 From: slawekrewaj Date: Tue, 17 Sep 2019 16:12:24 +0200 Subject: [PATCH] Supporting illegal BigQuery characters in table and field names --- target_bigquery.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/target_bigquery.py b/target_bigquery.py index 111d11f..fff724b 100644 --- a/target_bigquery.py +++ b/target_bigquery.py @@ -10,6 +10,7 @@ import http.client import urllib import pkg_resources +import re from jsonschema import validate import singer @@ -51,8 +52,11 @@ def emit_state(state): def clear_dict_hook(items): return {k: v if v is not None else '' for k, v in items} +def fix_name(name): + return re.sub('[^a-zA-Z0-9_]', '_', name) + def define_schema(field, name): - schema_name = name + schema_name = fix_name(name) schema_type = "STRING" schema_mode = "NULLABLE" schema_description = None @@ -139,7 +143,10 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida validate(msg.record, schema) # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row. - dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8') + new_record = {} + for key, value in msg.record.items(): + new_record[fix_name(key)] = value + dat = bytes(json.dumps(new_record) + '\n', 'UTF-8') rows[msg.stream].write(dat) #rows[msg.stream].write(bytes(str(msg.record) + '\n', 'UTF-8')) @@ -170,7 +177,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, valida raise Exception("Unrecognized message {}".format(msg)) for table in rows.keys(): - table_ref = bigquery_client.dataset(dataset_id).table(table) + table_ref = bigquery_client.dataset(dataset_id).table(fix_name(table)) SCHEMA = build_schema(schemas[table]) load_config = LoadJobConfig() load_config.schema = SCHEMA