aboutsummaryrefslogtreecommitdiffhomepage
path: root/benchmarks/util/big_query_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'benchmarks/util/big_query_utils.py')
-rwxr-xr-xbenchmarks/util/big_query_utils.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/benchmarks/util/big_query_utils.py b/benchmarks/util/big_query_utils.py
new file mode 100755
index 00000000..aea55bbd
--- /dev/null
+++ b/benchmarks/util/big_query_utils.py
@@ -0,0 +1,189 @@
+#!/usr/bin/env python2.7
+
+from __future__ import print_function
+import argparse
+import json
+import uuid
+import httplib2
+
+from apiclient import discovery
+from apiclient.errors import HttpError
+from oauth2client.client import GoogleCredentials
+
+# 30 days in milliseconds
+_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
+NUM_RETRIES = 3
+
+
+def create_big_query():
+ """Authenticates with cloud platform and gets a BiqQuery service object
+ """
+ creds = GoogleCredentials.get_application_default()
+ return discovery.build(
+ 'bigquery', 'v2', credentials=creds, cache_discovery=False)
+
+
+def create_dataset(biq_query, project_id, dataset_id):
+ is_success = True
+ body = {
+ 'datasetReference': {
+ 'projectId': project_id,
+ 'datasetId': dataset_id
+ }
+ }
+
+ try:
+ dataset_req = biq_query.datasets().insert(
+ projectId=project_id, body=body)
+ dataset_req.execute(num_retries=NUM_RETRIES)
+ except HttpError as http_error:
+ if http_error.resp.status == 409:
+ print('Warning: The dataset %s already exists' % dataset_id)
+ else:
+ # Note: For more debugging info, print "http_error.content"
+ print('Error in creating dataset: %s. Err: %s' % (dataset_id,
+ http_error))
+ is_success = False
+ return is_success
+
+
+def create_table(big_query, project_id, dataset_id, table_id, table_schema,
+ description):
+ fields = [{
+ 'name': field_name,
+ 'type': field_type,
+ 'description': field_description
+ } for (field_name, field_type, field_description) in table_schema]
+ return create_table2(big_query, project_id, dataset_id, table_id, fields,
+ description)
+
+
+def create_partitioned_table(big_query,
+ project_id,
+ dataset_id,
+ table_id,
+ table_schema,
+ description,
+ partition_type='DAY',
+ expiration_ms=_EXPIRATION_MS):
+ """Creates a partitioned table. By default, a date-paritioned table is created with
+ each partition lasting 30 days after it was last modified.
+ """
+ fields = [{
+ 'name': field_name,
+ 'type': field_type,
+ 'description': field_description
+ } for (field_name, field_type, field_description) in table_schema]
+ return create_table2(big_query, project_id, dataset_id, table_id, fields,
+ description, partition_type, expiration_ms)
+
+
+def create_table2(big_query,
+ project_id,
+ dataset_id,
+ table_id,
+ fields_schema,
+ description,
+ partition_type=None,
+ expiration_ms=None):
+ is_success = True
+
+ body = {
+ 'description': description,
+ 'schema': {
+ 'fields': fields_schema
+ },
+ 'tableReference': {
+ 'datasetId': dataset_id,
+ 'projectId': project_id,
+ 'tableId': table_id
+ }
+ }
+
+ if partition_type and expiration_ms:
+ body["timePartitioning"] = {
+ "type": partition_type,
+ "expirationMs": expiration_ms
+ }
+
+ try:
+ table_req = big_query.tables().insert(
+ projectId=project_id, datasetId=dataset_id, body=body)
+ res = table_req.execute(num_retries=NUM_RETRIES)
+ print('Successfully created %s "%s"' % (res['kind'], res['id']))
+ except HttpError as http_error:
+ if http_error.resp.status == 409:
+ print('Warning: Table %s already exists' % table_id)
+ else:
+ print('Error in creating table: %s. Err: %s' % (table_id,
+ http_error))
+ is_success = False
+ return is_success
+
+
+def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
+ is_success = True
+
+ body = {
+ 'schema': {
+ 'fields': fields_schema
+ },
+ 'tableReference': {
+ 'datasetId': dataset_id,
+ 'projectId': project_id,
+ 'tableId': table_id
+ }
+ }
+
+ try:
+ table_req = big_query.tables().patch(
+ projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ body=body)
+ res = table_req.execute(num_retries=NUM_RETRIES)
+ print('Successfully patched %s "%s"' % (res['kind'], res['id']))
+ except HttpError as http_error:
+ print('Error in creating table: %s. Err: %s' % (table_id, http_error))
+ is_success = False
+ return is_success
+
+
+def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
+ is_success = True
+ body = {'rows': rows_list}
+ try:
+ insert_req = big_query.tabledata().insertAll(
+ projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ body=body)
+ res = insert_req.execute(num_retries=NUM_RETRIES)
+ if res.get('insertErrors', None):
+ print('Error inserting rows! Response: %s' % res)
+ is_success = False
+ except HttpError as http_error:
+ print('Error inserting rows to the table %s' % table_id)
+ is_success = False
+
+ return is_success
+
+
+def sync_query_job(big_query, project_id, query, timeout=5000):
+ query_data = {'query': query, 'timeoutMs': timeout}
+ query_job = None
+ try:
+ query_job = big_query.jobs().query(
+ projectId=project_id,
+ body=query_data).execute(num_retries=NUM_RETRIES)
+ except HttpError as http_error:
+ print('Query execute job failed with error: %s' % http_error)
+ print(http_error.content)
+ return query_job
+
+
+ # List of (column name, column type, description) tuples
+def make_row(unique_row_id, row_values_dict):
+ """row_values_dict is a dictionary of column name and column value.
+ """
+ return {'insertId': unique_row_id, 'json': row_values_dict}