Diffstat (limited to 'tools/gcp/utils/big_query_utils.py')
-rwxr-xr-x | tools/gcp/utils/big_query_utils.py | 276 |
1 file changed, 148 insertions, 128 deletions
diff --git a/tools/gcp/utils/big_query_utils.py b/tools/gcp/utils/big_query_utils.py
index 77a5f5691e..3e811ca2bf 100755
--- a/tools/gcp/utils/big_query_utils.py
+++ b/tools/gcp/utils/big_query_utils.py
@@ -28,154 +28,174 @@ NUM_RETRIES = 3
 
 
 def create_big_query():
-  """Authenticates with cloud platform and gets a BiqQuery service object
+    """Authenticates with cloud platform and gets a BiqQuery service object
   """
-  creds = GoogleCredentials.get_application_default()
-  return discovery.build('bigquery', 'v2', credentials=creds, cache_discovery=False)
+    creds = GoogleCredentials.get_application_default()
+    return discovery.build(
+        'bigquery', 'v2', credentials=creds, cache_discovery=False)
 
 
 def create_dataset(biq_query, project_id, dataset_id):
-  is_success = True
-  body = {
-      'datasetReference': {
-          'projectId': project_id,
-          'datasetId': dataset_id
-      }
-  }
-
-  try:
-    dataset_req = biq_query.datasets().insert(projectId=project_id, body=body)
-    dataset_req.execute(num_retries=NUM_RETRIES)
-  except HttpError as http_error:
-    if http_error.resp.status == 409:
-      print 'Warning: The dataset %s already exists' % dataset_id
-    else:
-      # Note: For more debugging info, print "http_error.content"
-      print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error)
-      is_success = False
-  return is_success
+    is_success = True
+    body = {
+        'datasetReference': {
+            'projectId': project_id,
+            'datasetId': dataset_id
+        }
+    }
+
+    try:
+        dataset_req = biq_query.datasets().insert(
+            projectId=project_id, body=body)
+        dataset_req.execute(num_retries=NUM_RETRIES)
+    except HttpError as http_error:
+        if http_error.resp.status == 409:
+            print 'Warning: The dataset %s already exists' % dataset_id
+        else:
+            # Note: For more debugging info, print "http_error.content"
+            print 'Error in creating dataset: %s. Err: %s' % (dataset_id,
+                                                              http_error)
+            is_success = False
+    return is_success
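
The two helpers above cover authentication and dataset creation. As a minimal usage sketch (not part of the diff; the project and dataset IDs are hypothetical):

    import big_query_utils

    bq = big_query_utils.create_big_query()
    # Returns True on success. A 409 (dataset already exists) only logs a
    # warning, so re-running against an existing dataset still returns True.
    ok = big_query_utils.create_dataset(bq, 'my-gcp-project', 'test_results')
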
""" - fields = [{'name': field_name, - 'type': field_type, - 'description': field_description - } for (field_name, field_type, field_description) in table_schema] - return create_table2(big_query, project_id, dataset_id, table_id, - fields, description, partition_type, expiration_ms) - - -def create_table2(big_query, project_id, dataset_id, table_id, fields_schema, - description, partition_type=None, expiration_ms=None): - is_success = True - - body = { - 'description': description, - 'schema': { - 'fields': fields_schema - }, - 'tableReference': { - 'datasetId': dataset_id, - 'projectId': project_id, - 'tableId': table_id - } - } - - if partition_type and expiration_ms: - body["timePartitioning"] = { - "type": partition_type, - "expirationMs": expiration_ms + fields = [{ + 'name': field_name, + 'type': field_type, + 'description': field_description + } for (field_name, field_type, field_description) in table_schema] + return create_table2(big_query, project_id, dataset_id, table_id, fields, + description, partition_type, expiration_ms) + + +def create_table2(big_query, + project_id, + dataset_id, + table_id, + fields_schema, + description, + partition_type=None, + expiration_ms=None): + is_success = True + + body = { + 'description': description, + 'schema': { + 'fields': fields_schema + }, + 'tableReference': { + 'datasetId': dataset_id, + 'projectId': project_id, + 'tableId': table_id + } } - try: - table_req = big_query.tables().insert(projectId=project_id, - datasetId=dataset_id, - body=body) - res = table_req.execute(num_retries=NUM_RETRIES) - print 'Successfully created %s "%s"' % (res['kind'], res['id']) - except HttpError as http_error: - if http_error.resp.status == 409: - print 'Warning: Table %s already exists' % table_id - else: - print 'Error in creating table: %s. Err: %s' % (table_id, http_error) - is_success = False - return is_success + if partition_type and expiration_ms: + body["timePartitioning"] = { + "type": partition_type, + "expirationMs": expiration_ms + } + + try: + table_req = big_query.tables().insert( + projectId=project_id, datasetId=dataset_id, body=body) + res = table_req.execute(num_retries=NUM_RETRIES) + print 'Successfully created %s "%s"' % (res['kind'], res['id']) + except HttpError as http_error: + if http_error.resp.status == 409: + print 'Warning: Table %s already exists' % table_id + else: + print 'Error in creating table: %s. Err: %s' % (table_id, + http_error) + is_success = False + return is_success def patch_table(big_query, project_id, dataset_id, table_id, fields_schema): - is_success = True - - body = { - 'schema': { - 'fields': fields_schema - }, - 'tableReference': { - 'datasetId': dataset_id, - 'projectId': project_id, - 'tableId': table_id - } - } - - try: - table_req = big_query.tables().patch(projectId=project_id, - datasetId=dataset_id, - tableId=table_id, - body=body) - res = table_req.execute(num_retries=NUM_RETRIES) - print 'Successfully patched %s "%s"' % (res['kind'], res['id']) - except HttpError as http_error: - print 'Error in creating table: %s. 
 
 
 def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
-  is_success = True
-
-  body = {
-      'schema': {
-          'fields': fields_schema
-      },
-      'tableReference': {
-          'datasetId': dataset_id,
-          'projectId': project_id,
-          'tableId': table_id
-      }
-  }
-
-  try:
-    table_req = big_query.tables().patch(projectId=project_id,
-                                         datasetId=dataset_id,
-                                         tableId=table_id,
-                                         body=body)
-    res = table_req.execute(num_retries=NUM_RETRIES)
-    print 'Successfully patched %s "%s"' % (res['kind'], res['id'])
-  except HttpError as http_error:
-    print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
-    is_success = False
-  return is_success
+    is_success = True
+
+    body = {
+        'schema': {
+            'fields': fields_schema
+        },
+        'tableReference': {
+            'datasetId': dataset_id,
+            'projectId': project_id,
+            'tableId': table_id
+        }
+    }
+
+    try:
+        table_req = big_query.tables().patch(
+            projectId=project_id,
+            datasetId=dataset_id,
+            tableId=table_id,
+            body=body)
+        res = table_req.execute(num_retries=NUM_RETRIES)
+        print 'Successfully patched %s "%s"' % (res['kind'], res['id'])
+    except HttpError as http_error:
+        print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
+        is_success = False
+    return is_success
 
 
 def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
-  is_success = True
-  body = {'rows': rows_list}
-  try:
-    insert_req = big_query.tabledata().insertAll(projectId=project_id,
-                                                 datasetId=dataset_id,
-                                                 tableId=table_id,
-                                                 body=body)
-    res = insert_req.execute(num_retries=NUM_RETRIES)
-    if res.get('insertErrors', None):
-      print 'Error inserting rows! Response: %s' % res
-      is_success = False
-  except HttpError as http_error:
-    print 'Error inserting rows to the table %s' % table_id
-    is_success = False
-
-  return is_success
+    is_success = True
+    body = {'rows': rows_list}
+    try:
+        insert_req = big_query.tabledata().insertAll(
+            projectId=project_id,
+            datasetId=dataset_id,
+            tableId=table_id,
+            body=body)
+        res = insert_req.execute(num_retries=NUM_RETRIES)
+        if res.get('insertErrors', None):
+            print 'Error inserting rows! Response: %s' % res
+            is_success = False
+    except HttpError as http_error:
+        print 'Error inserting rows to the table %s' % table_id
+        is_success = False
+
+    return is_success
 
 
 def sync_query_job(big_query, project_id, query, timeout=5000):
-  query_data = {'query': query, 'timeoutMs': timeout}
-  query_job = None
-  try:
-    query_job = big_query.jobs().query(
-        projectId=project_id,
-        body=query_data).execute(num_retries=NUM_RETRIES)
-  except HttpError as http_error:
-    print 'Query execute job failed with error: %s' % http_error
-    print http_error.content
-  return query_job
-
-  # List of (column name, column type, description) tuples
+    query_data = {'query': query, 'timeoutMs': timeout}
+    query_job = None
+    try:
+        query_job = big_query.jobs().query(
+            projectId=project_id,
+            body=query_data).execute(num_retries=NUM_RETRIES)
+    except HttpError as http_error:
+        print 'Query execute job failed with error: %s' % http_error
+        print http_error.content
+    return query_job
+
+
+# List of (column name, column type, description) tuples
 def make_row(unique_row_id, row_values_dict):
-  """row_values_dict is a dictionary of column name and column value.
+    """row_values_dict is a dictionary of column name and column value.
   """
-  return {'insertId': unique_row_id, 'json': row_values_dict}
+    return {'insertId': unique_row_id, 'json': row_values_dict}
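
For streaming inserts, make_row pairs each row with an insertId, which BigQuery uses for best-effort deduplication, and insert_rows wraps tabledata().insertAll. A final sketch with hypothetical row values (not part of the diff):

    import uuid

    # A fresh UUID per row lets BigQuery drop accidental duplicate inserts.
    rows = [big_query_utils.make_row(str(uuid.uuid4()),
                                     {'job_name': 'interop_test',
                                      'latency_ms': 3.2})]
    if not big_query_utils.insert_rows(bq, 'my-gcp-project', 'test_results',
                                       'latency', rows):
        print 'Failed to insert rows'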