BigQuery-Python

Simple Python client for interacting with Google BigQuery.

This client provides an API for retrieving and inserting BigQuery data by wrapping Google's low-level API client library. It also provides facilities that make it convenient to access data that is tied to an App Engine appspot, such as request logs.

Documentation

Installation

pip install bigquery-python

Basic Usage

from bigquery import get_client

# BigQuery project id as listed in the Google Developers Console.
project_id = 'project_id'

# Service account email address as listed in the Google Developers Console.
service_account = '[email protected]'

# PKCS12 or PEM key provided by Google.
key = 'key.pem'

client = get_client(project_id, service_account=service_account,
                    private_key_file=key, readonly=True)

# JSON key provided by Google
json_key = 'key.json'
 
client = get_client(json_key_file=json_key, readonly=True)

# Submit an async query.
job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')

# Check if the query has finished running.
complete, row_count = client.check_job(job_id)

# Retrieve the results.
results = client.get_query_rows(job_id)

Executing Queries

The BigQuery client allows you to execute raw queries against a dataset. The query method inserts a query job into BigQuery. By default, query runs asynchronously with a timeout of 0. When a non-zero timeout is specified, the call waits for the results and raises a BigQueryTimeoutException if the job does not complete within that time.

When you run an async query, you can use the returned job_id to poll for job status later with check_job.

# Submit an async query
job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')

# Do other work.

# Poll for query completion.
complete, row_count = client.check_job(job_id)

# Retrieve the results.
if complete:
    results = client.get_query_rows(job_id)
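If the job is still running, you can poll check_job until it reports completion. A minimal polling sketch (the sleep interval is an arbitrary choice, not something the library prescribes):

import time

# Submit an async query and poll until BigQuery reports the job complete.
job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')

complete, row_count = client.check_job(job_id)
while not complete:
    time.sleep(5)  # arbitrary polling interval
    complete, row_count = client.check_job(job_id)

results = client.get_query_rows(job_id)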

You can also specify a non-zero timeout value if you want your query to be synchronous.

# Submit a synchronous query.
# BigQueryTimeoutException is assumed to live in the package's errors module.
from bigquery.errors import BigQueryTimeoutException

try:
    _job_id, results = client.query('SELECT * FROM dataset.my_table LIMIT 1000', timeout=10)
except BigQueryTimeoutException:
    print("Timeout")

Query Builder

The query_builder module provides an API for generating query strings that can be run using the BigQuery client.

from bigquery.query_builder import render_query

selects = {
    'start_time': {
        'alias': 'Timestamp',
        'format': 'INTEGER-FORMAT_UTC_USEC'
    }
}

conditions = [
    {
        'field': 'Timestamp',
        'type': 'INTEGER',
        'comparators': [
            {
                'condition': '>=',
                'negate': False,
                'value': 1399478981
            }
        ]
    }
]

grouping = ['Timestamp']

having = [
    {
        'field': 'Timestamp',
        'type': 'INTEGER',
        'comparators': [
            {
                'condition': '==',
                'negate': False,
                'value': 1399478981
            }
        ]
    }
]

order_by = {'fields': ['Timestamp'], 'direction': 'desc'}

query = render_query(
    'dataset',
    ['table'],
    select=selects,
    conditions=conditions,
    groupings=grouping,
    having=having,
    order_by=order_by,
    limit=47
)

job_id, _ = client.query(query)
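Because render_query returns a plain SQL string, you can inspect the generated query before submitting it:

# The builder output is an ordinary SQL string accepted by client.query.
print(query)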

Managing Tables

The BigQuery client provides facilities to manage dataset tables, including creating and deleting tables, checking whether a table exists, and retrieving table metadata.

# Create a new table.
schema = [
    {'name': 'foo', 'type': 'STRING', 'mode': 'nullable'},
    {'name': 'bar', 'type': 'FLOAT', 'mode': 'nullable'}
]
created = client.create_table('dataset', 'my_table', schema)

# Delete an existing table.
deleted = client.delete_table('dataset', 'my_table')

# Check if a table exists.
exists = client.check_table('dataset', 'my_table')

# Get a table's full metadata. Includes numRows, numBytes, etc. 
# See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables
metadata = client.get_table('dataset', 'my_table')

There is also functionality for retrieving tables that are associated with a Google App Engine appspot, assuming table names are in the form appid_YYYY_MM or YYYY_MM_appid. This allows tables within a date range to be selected and queried.

# Get appspot tables falling within a start and end time.
from datetime import datetime, timedelta
range_end = datetime.utcnow()
range_start = range_end - timedelta(weeks=12)
tables = client.get_tables('dataset', 'appid', range_start, range_end)

Inserting Data

The client provides an API for inserting data into a BigQuery table. The optional last parameter names the field to use as the insert ID, which BigQuery uses to deduplicate rows; in the example below, the two rows sharing the id 'NzAzYmRiY' are treated as duplicates, so only one of them should be stored (deduplication is best-effort).

# Insert data into table.
# Insert data into table.
rows = [
    {'one': 'ein', 'two': 'zwei'},
    {'id': 'NzAzYmRiY', 'one': 'uno', 'two': 'dos'},
    {'id': 'NzAzYmRiY', 'one': 'ein', 'two': 'zwei'}  # duplicate entry
]

inserted = client.push_rows('dataset', 'table', rows, 'id')

Write Query Results to Table

You can write query results directly to a table. When either the dataset or table parameter is omitted, the results are written to a temporary table.

# write to permanent table
job = client.write_to_table('SELECT * FROM dataset.original_table LIMIT 100',
                            'dataset',
                            'table')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")

# write to permanent table with UDF in query string
external_udf_uris = ["gs://bigquery-sandbox-udf/url_decode.js"]
query = """SELECT requests, title
            FROM
              urlDecode(
                SELECT
                  title, sum(requests) AS num_requests
                FROM
                  [fh-bigquery:wikipedia.pagecounts_201504]
                WHERE language = 'fr'
                GROUP EACH BY title
              )
            WHERE title LIKE '%ç%'
            ORDER BY requests DESC
            LIMIT 100
        """
job = client.write_to_table(
    query,
    'dataset',
    'table',
    external_udf_uris=external_udf_uris
)

try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")

# write to temporary table
job = client.write_to_table('SELECT * FROM dataset.original_table LIMIT 100')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")

Import data from Google Cloud Storage

# JOB_SOURCE_FORMAT_JSON is assumed to be importable from the client module.
from bigquery.client import JOB_SOURCE_FORMAT_JSON

schema = [{"name": "username", "type": "string", "mode": "nullable"}]
job = client.import_data_from_uris(['gs://mybucket/mydata.json'],
                                   'dataset',
                                   'table',
                                   schema,
                                   source_format=JOB_SOURCE_FORMAT_JSON)

try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")

Export data to Google Cloud Storage

job = client.export_data_to_uris(['gs://mybucket/mydata.json'],
                                 'dataset',
                                 'table')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")

Managing Datasets

The client provides an API for listing, creating, deleting, updating and patching datasets.

# List datasets
datasets = client.get_datasets()


# Create dataset
dataset = client.create_dataset('mydataset', friendly_name="My Dataset", description="A dataset created by me")

# Get dataset
client.get_dataset('mydataset')

# Delete dataset
client.delete_dataset('mydataset')
client.delete_dataset('mydataset', delete_contents=True) # delete even if it contains data

# Update dataset (replaces the whole dataset resource, so the description is cleared)
client.update_dataset('mydataset', friendly_name="mon Dataset")

# Patch dataset (merges changes; friendly_name is changed, description is preserved)
client.patch_dataset('mydataset', friendly_name="mon Dataset")

# Check if dataset exists.
exists = client.check_dataset('mydataset')

Creating a schema from a sample record

from bigquery import schema_from_record

schema = schema_from_record({"id": 123, "posts": [{"id": 123, "text": "this is a post"}], "username": "bob"})
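The inferred schema can be passed directly to create_table. A minimal sketch, assuming a client created as in Basic Usage ('dataset' and 'my_new_table' are placeholder names):

# Create a table using the schema inferred from the sample record.
created = client.create_table('dataset', 'my_new_table', schema)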

Contributing

Requirements to commit here:

  • Branch off master, PR back to master.
  • Your code should pass Flake8.
  • Unit test coverage is required.
  • Good docstrings are required.
  • Good commit messages are required.
