JSON table schema to bigquery.TableSchema for BigQuerySink

I have a non-trivial table schema (involving nested and repeated fields) defined in JSON format (with name, type, and mode attributes) and stored in a file. It has been used successfully to populate a BigQuery table with the bq load command.

But when I try to do the same thing with the Dataflow Python SDK and BigQuerySink, the schema argument needs to be either a comma-separated list of 'name':'type' elements or a bigquery.TableSchema object.

Is there any convenient way of converting my JSON schema to a bigquery.TableSchema, or do I have to transform it into a name:type list?

Currently you cannot directly specify a JSON schema. You have to specify the schema either as a string that contains a comma-separated list of fields or as a bigquery.TableSchema object.
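
For flat schemas the string form is enough. Here is a minimal sketch, assuming placeholder project, dataset, table, and field names; nested or repeated fields cannot be expressed this way:

import apache_beam as beam

# Flat schema passed as a comma-separated 'name:TYPE' string
# (table spec and field names below are placeholders).
sink = beam.io.BigQuerySink(
    'my-project:my_dataset.my_table',
    schema='fullName:STRING,age:INTEGER')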

If the schema is complex and contains nested and/or repeated fields, we recommend building a bigquery.TableSchema object.

Here is an example bigquery.TableSchema object with nested and repeated fields.

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

# 'string' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)

# 'integer' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)

# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)

# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)
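
The resulting table_schema can then be handed to the sink. A minimal sketch, assuming a placeholder table spec and typical dispositions:

import apache_beam as beam

# Placeholder table spec; dispositions shown only as a common choice.
sink = beam.io.BigQuerySink(
    'my-project:my_dataset.my_table',
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)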

I had the same problem. In my case I already had some JSON loaded into BigQuery with an automatically generated schema.

So I was able to get the autogenerated schema with the command:

bq show --format prettyjson my-gcp-project:my-bq-table | jq .schema > my-bq-table.json
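
For reference, the file written by jq .schema is an object with a top-level fields list, each entry carrying name, type, mode and, for RECORD fields, a nested fields list. A hypothetical example, with field names borrowed from the example above:

import json

# Hypothetical shape of my-bq-table.json; the real file depends on your table.
example_schema = json.loads('''
{
  "fields": [
    {"name": "fullName", "type": "STRING", "mode": "REQUIRED"},
    {"name": "phoneNumber", "type": "RECORD", "mode": "NULLABLE",
     "fields": [
       {"name": "areaCode", "type": "INTEGER", "mode": "NULLABLE"},
       {"name": "number", "type": "INTEGER", "mode": "NULLABLE"}
     ]}
  ]
}
''')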

The schema can then be transformed into a bigquery.TableSchema with this snippet:

from apache_beam.io.gcp.internal.clients import bigquery


def _get_field_schema(**kwargs):
    field_schema = bigquery.TableFieldSchema()
    field_schema.name = kwargs['name']
    field_schema.type = kwargs.get('type', 'STRING')
    field_schema.mode = kwargs.get('mode', 'NULLABLE')
    fields = kwargs.get('fields')
    if fields:
        for field in fields:
            field_schema.fields.append(_get_field_schema(**field))
    return field_schema


def _inject_fields(fields, table_schema):
    for field in fields:
        table_schema.fields.append(_get_field_schema(**field))


def parse_bq_json_schema(schema):
    table_schema = bigquery.TableSchema()
    _inject_fields(schema['fields'], table_schema)
    return table_schema

It works with the BigQuery JSON schema specification, and if you are lazy like me you can omit type and mode when you are happy with a field defaulting to a nullable string.
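
A hedged usage sketch tying it together; the file name and table spec are placeholders:

import json

import apache_beam as beam

# Load the exported schema file and convert it for the sink.
with open('my-bq-table.json') as f:
    table_schema = parse_bq_json_schema(json.load(f))

sink = beam.io.BigQuerySink(
    'my-project:my_dataset.my_table',
    schema=table_schema)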

The above snippet posted by Andrea Pierleoni works with older versions of the google-cloud-bigquery Python client, for example version 0.25.0 of google-cloud-bigquery, which happens to be installed via pip install apache-beam[gcp].

However, the BigQuery Python client API has changed drastically in more recent versions of google-cloud-bigquery. For example, in version 1.8.0, which I am currently on, bigquery.TableFieldSchema() and bigquery.TableSchema() don't work.

If you're on a more recent version of the google-cloud-bigquery package, here's how you can build the SchemaField list (needed to create a table, for example) from a JSON file. This is an adaptation of the code posted above by Andrea Pierleoni (thanks for that!):

import json

from google.cloud import bigquery  # newer google-cloud-bigquery client (e.g. 1.8.0)


def _get_field_schema(field):
    name = field['name']
    field_type = field.get('type', 'STRING')
    mode = field.get('mode', 'NULLABLE')
    fields = field.get('fields', [])

    if fields:
        subschema = []
        for f in fields:
            fields_res = _get_field_schema(f)
            subschema.append(fields_res)
    else:
        subschema = []

    field_schema = bigquery.SchemaField(name=name, 
        field_type=field_type,
        mode=mode,
        fields=subschema
    )
    return field_schema


def parse_bq_json_schema(schema_filename):
    schema = []
    with open(schema_filename, 'r') as infile:
        jsonschema = json.load(infile)

    for field in jsonschema:
        schema.append(_get_field_schema(field))

    return schema

Now, suppose you have a table's schema already defined in JSON, say in a "schema.json" file. Using the above helper methods, you can get the SchemaField representation needed by the Python client like so:

>>> res_schema = parse_bq_json_schema("schema.json")

>>> print(res_schema)

[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()), SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()), SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()), SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()), SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()), SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))), SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()), SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()), SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()), SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))), SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]

Now to create a table having the above schema using the Python SDK, you would do:

bqclient = bigquery.Client()  # assumes default project and credentials

dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)

You can optionally set time-based partitioning like this:

table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='timestamp'  # name of column to use for partitioning
)

And this finally creates the table:

table = bqclient.create_table(table)

print('Created table {}, partitioned on column {}'.format(
    table.table_id, table.time_partitioning.field))

Here is a simple program that can help you.

import json
from apache_beam.io.gcp.internal.clients import bigquery


def bq_schema(json_schema):
    """Builds a TableSchema from a flat JSON schema file (no nested RECORD fields)."""
    table_schema = bigquery.TableSchema()
    with open(json_schema) as json_file:
        data = json.load(json_file)
        for p in data:
            field = bigquery.TableFieldSchema()
            field.name = p['name']
            field.type = p.get('type', 'STRING')
            field.mode = p.get('mode', 'NULLABLE')
            table_schema.fields.append(field)
    return table_schema
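
A hedged sketch of using it in a pipeline write; the schema file, table spec, and input record are placeholders and must match your actual schema:

import apache_beam as beam

# Placeholder element and table spec; the dict keys must match the schema file.
with beam.Pipeline() as p:
    (p
     | beam.Create([{'fullName': 'Ada', 'age': 36}])
     | beam.io.Write(beam.io.BigQuerySink(
           'my-project:my_dataset.my_table',
           schema=bq_schema('my-schema.json'))))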

Comments
  • Thanks! I just realized that the Python SDK is alpha, so I will direct further issues to GitHub until it has matured a bit.
  • Thanks man! And if you need the schema to create the table in the BigQuery UI (for example, partitioned tables must be created by hand), don't forget to select the fields key inside schema, that is: bq show --format prettyjson my-gcp-project:my-bq-table | jq '.schema.fields' > my-bq-table.json