diff options
Diffstat (limited to 'tools/gcp')
-rwxr-xr-x | tools/gcp/stress_test/run_client.py | 187 | ||||
-rwxr-xr-x | tools/gcp/stress_test/run_server.py | 120 | ||||
-rwxr-xr-x | tools/gcp/stress_test/stress_test_utils.py | 197 | ||||
-rwxr-xr-x | tools/gcp/utils/big_query_utils.py | 140 | ||||
-rwxr-xr-x | tools/gcp/utils/kubernetes_api.py | 269 |
5 files changed, 913 insertions, 0 deletions
diff --git a/tools/gcp/stress_test/run_client.py b/tools/gcp/stress_test/run_client.py new file mode 100755 index 0000000000..0fa1bf1cb9 --- /dev/null +++ b/tools/gcp/stress_test/run_client.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import datetime +import os +import re +import select +import subprocess +import sys +import time + +from stress_test_utils import EventType +from stress_test_utils import BigQueryHelper + + +# TODO (sree): Write a python grpc client to directly query the metrics instead +# of calling metrics_client +def _get_qps(metrics_cmd): + qps = 0 + try: + # Note: gpr_log() writes even non-error messages to stderr stream. So it is + # important that we set stderr=subprocess.STDOUT + p = subprocess.Popen(args=metrics_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + retcode = p.wait() + (out_str, err_str) = p.communicate() + if retcode != 0: + print 'Error in reading metrics information' + print 'Output: ', out_str + else: + # The overall qps is printed at the end of the line + m = re.search('\d+$', out_str) + qps = int(m.group()) if m else 0 + except Exception as ex: + print 'Exception while reading metrics information: ' + str(ex) + return qps + + +def run_client(): + """This is a wrapper around the stress test client and performs the following: + 1) Create the following two tables in Big Query: + (i) Summary table: To record events like the test started, completed + successfully or failed + (ii) Qps table: To periodically record the QPS sent by this client + 2) Start the stress test client and add a row in the Big Query summary + table + 3) Once every few seconds (as specificed by the poll_interval_secs) poll + the status of the stress test client process and perform the + following: + 3.1) If the process is still running, get the current qps by invoking + the metrics client program and add a row in the Big Query + Qps table. Sleep for a duration specified by poll_interval_secs + 3.2) If the process exited successfully, add a row in the Big Query + Summary table and exit + 3.3) If the process failed, add a row in Big Query summary table and + wait forever. + NOTE: This script typically runs inside a GKE pod which means + that the pod gets destroyed when the script exits. However, in + case the stress test client fails, we would not want the pod to + be destroyed (since we might want to connect to the pod for + examining logs). This is the reason why the script waits forever + in case of failures + """ + env = dict(os.environ) + image_type = env['STRESS_TEST_IMAGE_TYPE'] + image_name = env['STRESS_TEST_IMAGE'] + args_str = env['STRESS_TEST_ARGS_STR'] + metrics_client_image = env['METRICS_CLIENT_IMAGE'] + metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR'] + run_id = env['RUN_ID'] + pod_name = env['POD_NAME'] + logfile_name = env.get('LOGFILE_NAME') + poll_interval_secs = float(env['POLL_INTERVAL_SECS']) + project_id = env['GCP_PROJECT_ID'] + dataset_id = env['DATASET_ID'] + summary_table_id = env['SUMMARY_TABLE_ID'] + qps_table_id = env['QPS_TABLE_ID'] + + bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id, + dataset_id, summary_table_id, qps_table_id) + bq_helper.initialize() + + # Create BigQuery Dataset and Tables: Summary Table and Metrics Table + if not bq_helper.setup_tables(): + print 'Error in creating BigQuery tables' + return + + start_time = datetime.datetime.now() + + logfile = None + details = 'Logging to stdout' + if logfile_name is not None: + print 'Opening logfile: %s ...' % logfile_name + details = 'Logfile: %s' % logfile_name + logfile = open(logfile_name, 'w') + + # Update status that the test is starting (in the status table) + bq_helper.insert_summary_row(EventType.STARTING, details) + + metrics_cmd = [metrics_client_image + ] + [x for x in metrics_client_args_str.split()] + stress_cmd = [image_name] + [x for x in args_str.split()] + + print 'Launching process %s ...' % stress_cmd + stress_p = subprocess.Popen(args=stress_cmd, + stdout=logfile, + stderr=subprocess.STDOUT) + + qps_history = [1, 1, 1] # Maintain the last 3 qps readings + qps_history_idx = 0 # Index into the qps_history list + + is_error = False + while True: + # Check if stress_client is still running. If so, collect metrics and upload + # to BigQuery status table + if stress_p.poll() is not None: + end_time = datetime.datetime.now().isoformat() + event_type = EventType.SUCCESS + details = 'End time: %s' % end_time + if stress_p.returncode != 0: + event_type = EventType.FAILURE + details = 'Return code = %d. End time: %s' % (stress_p.returncode, + end_time) + is_error = True + bq_helper.insert_summary_row(event_type, details) + print details + break + + # Stress client still running. Get metrics + qps = _get_qps(metrics_cmd) + qps_recorded_at = datetime.datetime.now().isoformat() + print 'qps: %d at %s' % (qps, qps_recorded_at) + + # If QPS has been zero for the last 3 iterations, flag it as error and exit + qps_history[qps_history_idx] = qps + qps_history_idx = (qps_history_idx + 1) % len(qps_history) + if sum(qps_history) == 0: + details = 'QPS has been zero for the last %d seconds - as of : %s' % ( + poll_interval_secs * 3, qps_recorded_at) + is_error = True + bq_helper.insert_summary_row(EventType.FAILURE, details) + print details + break + + # Upload qps metrics to BiqQuery + bq_helper.insert_qps_row(qps, qps_recorded_at) + + time.sleep(poll_interval_secs) + + if is_error: + print 'Waiting indefinitely..' + select.select([], [], []) + + print 'Completed' + return + + +if __name__ == '__main__': + run_client() diff --git a/tools/gcp/stress_test/run_server.py b/tools/gcp/stress_test/run_server.py new file mode 100755 index 0000000000..64322f6100 --- /dev/null +++ b/tools/gcp/stress_test/run_server.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import datetime +import os +import select +import subprocess +import sys +import time + +from stress_test_utils import BigQueryHelper +from stress_test_utils import EventType + + +def run_server(): + """This is a wrapper around the interop server and performs the following: + 1) Create a 'Summary table' in Big Query to record events like the server + started, completed successfully or failed. NOTE: This also creates + another table called the QPS table which is currently NOT needed on the + server (it is needed on the stress test clients) + 2) Start the server process and add a row in Big Query summary table + 3) Wait for the server process to terminate. The server process does not + terminate unless there is an error. + If the server process terminated with a failure, add a row in Big Query + and wait forever. + NOTE: This script typically runs inside a GKE pod which means that the + pod gets destroyed when the script exits. However, in case the server + process fails, we would not want the pod to be destroyed (since we + might want to connect to the pod for examining logs). This is the + reason why the script waits forever in case of failures. + """ + + # Read the parameters from environment variables + env = dict(os.environ) + + run_id = env['RUN_ID'] # The unique run id for this test + image_type = env['STRESS_TEST_IMAGE_TYPE'] + image_name = env['STRESS_TEST_IMAGE'] + args_str = env['STRESS_TEST_ARGS_STR'] + pod_name = env['POD_NAME'] + project_id = env['GCP_PROJECT_ID'] + dataset_id = env['DATASET_ID'] + summary_table_id = env['SUMMARY_TABLE_ID'] + qps_table_id = env['QPS_TABLE_ID'] + + logfile_name = env.get('LOGFILE_NAME') + + print('pod_name: %s, project_id: %s, run_id: %s, dataset_id: %s, ' + 'summary_table_id: %s, qps_table_id: %s') % ( + pod_name, project_id, run_id, dataset_id, summary_table_id, + qps_table_id) + + bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id, + dataset_id, summary_table_id, qps_table_id) + bq_helper.initialize() + + # Create BigQuery Dataset and Tables: Summary Table and Metrics Table + if not bq_helper.setup_tables(): + print 'Error in creating BigQuery tables' + return + + start_time = datetime.datetime.now() + + logfile = None + details = 'Logging to stdout' + if logfile_name is not None: + print 'Opening log file: ', logfile_name + logfile = open(logfile_name, 'w') + details = 'Logfile: %s' % logfile_name + + # Update status that the test is starting (in the status table) + bq_helper.insert_summary_row(EventType.STARTING, details) + + stress_cmd = [image_name] + [x for x in args_str.split()] + + print 'Launching process %s ...' % stress_cmd + stress_p = subprocess.Popen(args=stress_cmd, + stdout=logfile, + stderr=subprocess.STDOUT) + + returncode = stress_p.wait() + if returncode != 0: + end_time = datetime.datetime.now().isoformat() + event_type = EventType.FAILURE + details = 'Returncode: %d; End time: %s' % (returncode, end_time) + bq_helper.insert_summary_row(event_type, details) + print 'Waiting indefinitely..' + select.select([], [], []) + return returncode + + +if __name__ == '__main__': + run_server() diff --git a/tools/gcp/stress_test/stress_test_utils.py b/tools/gcp/stress_test/stress_test_utils.py new file mode 100755 index 0000000000..c4b437e345 --- /dev/null +++ b/tools/gcp/stress_test/stress_test_utils.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import datetime +import json +import os +import re +import select +import subprocess +import sys +import time + +# Import big_query_utils module +bq_utils_dir = os.path.abspath(os.path.join( + os.path.dirname(__file__), '../utils')) +sys.path.append(bq_utils_dir) +import big_query_utils as bq_utils + + +class EventType: + STARTING = 'STARTING' + SUCCESS = 'SUCCESS' + FAILURE = 'FAILURE' + + +class BigQueryHelper: + """Helper class for the stress test wrappers to interact with BigQuery. + """ + + def __init__(self, run_id, image_type, pod_name, project_id, dataset_id, + summary_table_id, qps_table_id): + self.run_id = run_id + self.image_type = image_type + self.pod_name = pod_name + self.project_id = project_id + self.dataset_id = dataset_id + self.summary_table_id = summary_table_id + self.qps_table_id = qps_table_id + + def initialize(self): + self.bq = bq_utils.create_big_query() + + def setup_tables(self): + return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \ + and self.__create_summary_table() \ + and self.__create_qps_table() + + def insert_summary_row(self, event_type, details): + row_values_dict = { + 'run_id': self.run_id, + 'image_type': self.image_type, + 'pod_name': self.pod_name, + 'event_date': datetime.datetime.now().isoformat(), + 'event_type': event_type, + 'details': details + } + # row_unique_id is something that uniquely identifies the row (BigQuery uses + # it for duplicate detection). + row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type) + row = bq_utils.make_row(row_unique_id, row_values_dict) + return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, + self.summary_table_id, [row]) + + def insert_qps_row(self, qps, recorded_at): + row_values_dict = { + 'run_id': self.run_id, + 'pod_name': self.pod_name, + 'recorded_at': recorded_at, + 'qps': qps + } + + # row_unique_id is something that uniquely identifies the row (BigQuery uses + # it for duplicate detection). + row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at) + row = bq_utils.make_row(row_unique_id, row_values_dict) + return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, + self.qps_table_id, [row]) + + def check_if_any_tests_failed(self, num_query_retries=3): + query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND ' + 'event_type="%s"') % (self.dataset_id, self.summary_table_id, + self.run_id, EventType.FAILURE) + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) + page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute( + num_retries=num_query_retries) + num_failures = int(page['totalRows']) + print 'num rows: ', num_failures + return num_failures > 0 + + def print_summary_records(self, num_query_retries=3): + line = '-' * 120 + print line + print 'Summary records' + print 'Run Id: ', self.run_id + print 'Dataset Id: ', self.dataset_id + print line + query = ('SELECT pod_name, image_type, event_type, event_date, details' + ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % ( + self.dataset_id, self.summary_table_id, self.run_id) + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) + + print '{:<25} {:<12} {:<12} {:<30} {}'.format( + 'Pod name', 'Image type', 'Event type', 'Date', 'Details') + print line + page_token = None + while True: + page = self.bq.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=num_query_retries) + rows = page.get('rows', []) + for row in rows: + print '{:<25} {:<12} {:<12} {:<30} {}'.format( + row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'], + row['f'][3]['v'], row['f'][4]['v']) + page_token = page.get('pageToken') + if not page_token: + break + + def print_qps_records(self, num_query_retries=3): + line = '-' * 80 + print line + print 'QPS Summary' + print 'Run Id: ', self.run_id + print 'Dataset Id: ', self.dataset_id + print line + query = ( + 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' ' + 'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id, + self.run_id) + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) + print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps') + print line + page_token = None + while True: + page = self.bq.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=num_query_retries) + rows = page.get('rows', []) + for row in rows: + print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'], + row['f'][2]['v']) + page_token = page.get('pageToken') + if not page_token: + break + + def __create_summary_table(self): + summary_table_schema = [ + ('run_id', 'STRING', 'Test run id'), + ('image_type', 'STRING', 'Client or Server?'), + ('pod_name', 'STRING', 'GKE pod hosting this image'), + ('event_date', 'STRING', 'The date of this event'), + ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'), + ('details', 'STRING', 'Any other relevant details') + ] + desc = ('The table that contains START/SUCCESS/FAILURE events for ' + ' the stress test clients and servers') + return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, + self.summary_table_id, summary_table_schema, + desc) + + def __create_qps_table(self): + qps_table_schema = [ + ('run_id', 'STRING', 'Test run id'), + ('pod_name', 'STRING', 'GKE pod hosting this image'), + ('recorded_at', 'STRING', 'Metrics recorded at time'), + ('qps', 'INTEGER', 'Queries per second') + ] + desc = 'The table that cointains the qps recorded at various intervals' + return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, + self.qps_table_id, qps_table_schema, desc) diff --git a/tools/gcp/utils/big_query_utils.py b/tools/gcp/utils/big_query_utils.py new file mode 100755 index 0000000000..e2379fd1aa --- /dev/null +++ b/tools/gcp/utils/big_query_utils.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016 Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse +import json +import uuid +import httplib2 + +from apiclient import discovery +from apiclient.errors import HttpError +from oauth2client.client import GoogleCredentials + +NUM_RETRIES = 3 + + +def create_big_query(): + """Authenticates with cloud platform and gets a BiqQuery service object + """ + creds = GoogleCredentials.get_application_default() + return discovery.build('bigquery', 'v2', credentials=creds) + + +def create_dataset(biq_query, project_id, dataset_id): + is_success = True + body = { + 'datasetReference': { + 'projectId': project_id, + 'datasetId': dataset_id + } + } + + try: + dataset_req = biq_query.datasets().insert(projectId=project_id, body=body) + dataset_req.execute(num_retries=NUM_RETRIES) + except HttpError as http_error: + if http_error.resp.status == 409: + print 'Warning: The dataset %s already exists' % dataset_id + else: + # Note: For more debugging info, print "http_error.content" + print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error) + is_success = False + return is_success + + +def create_table(big_query, project_id, dataset_id, table_id, table_schema, + description): + is_success = True + + body = { + 'description': description, + 'schema': { + 'fields': [{ + 'name': field_name, + 'type': field_type, + 'description': field_description + } for (field_name, field_type, field_description) in table_schema] + }, + 'tableReference': { + 'datasetId': dataset_id, + 'projectId': project_id, + 'tableId': table_id + } + } + + try: + table_req = big_query.tables().insert(projectId=project_id, + datasetId=dataset_id, + body=body) + res = table_req.execute(num_retries=NUM_RETRIES) + print 'Successfully created %s "%s"' % (res['kind'], res['id']) + except HttpError as http_error: + if http_error.resp.status == 409: + print 'Warning: Table %s already exists' % table_id + else: + print 'Error in creating table: %s. Err: %s' % (table_id, http_error) + is_success = False + return is_success + + +def insert_rows(big_query, project_id, dataset_id, table_id, rows_list): + is_success = True + body = {'rows': rows_list} + try: + insert_req = big_query.tabledata().insertAll(projectId=project_id, + datasetId=dataset_id, + tableId=table_id, + body=body) + print body + res = insert_req.execute(num_retries=NUM_RETRIES) + print res + except HttpError as http_error: + print 'Error in inserting rows in the table %s' % table_id + is_success = False + return is_success + + +def sync_query_job(big_query, project_id, query, timeout=5000): + query_data = {'query': query, 'timeoutMs': timeout} + query_job = None + try: + query_job = big_query.jobs().query( + projectId=project_id, + body=query_data).execute(num_retries=NUM_RETRIES) + except HttpError as http_error: + print 'Query execute job failed with error: %s' % http_error + print http_error.content + return query_job + + # List of (column name, column type, description) tuples +def make_row(unique_row_id, row_values_dict): + """row_values_dict is a dictionary of column name and column value. + """ + return {'insertId': unique_row_id, 'json': row_values_dict} diff --git a/tools/gcp/utils/kubernetes_api.py b/tools/gcp/utils/kubernetes_api.py new file mode 100755 index 0000000000..2d3f771e93 --- /dev/null +++ b/tools/gcp/utils/kubernetes_api.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016 Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import requests +import json + +_REQUEST_TIMEOUT_SECS = 10 + + +def _make_pod_config(pod_name, image_name, container_port_list, cmd_list, + arg_list, env_dict): + """Creates a string containing the Pod defintion as required by the Kubernetes API""" + body = { + 'kind': 'Pod', + 'apiVersion': 'v1', + 'metadata': { + 'name': pod_name, + 'labels': {'name': pod_name} + }, + 'spec': { + 'containers': [ + { + 'name': pod_name, + 'image': image_name, + 'ports': [{'containerPort': port, + 'protocol': 'TCP'} + for port in container_port_list], + 'imagePullPolicy': 'Always' + } + ] + } + } + + env_list = [{'name': k, 'value': v} for (k, v) in env_dict.iteritems()] + if len(env_list) > 0: + body['spec']['containers'][0]['env'] = env_list + + # Add the 'Command' and 'Args' attributes if they are passed. + # Note: + # - 'Command' overrides the ENTRYPOINT in the Docker Image + # - 'Args' override the CMD in Docker image (yes, it is confusing!) + if len(cmd_list) > 0: + body['spec']['containers'][0]['command'] = cmd_list + if len(arg_list) > 0: + body['spec']['containers'][0]['args'] = arg_list + return json.dumps(body) + + +def _make_service_config(service_name, pod_name, service_port_list, + container_port_list, is_headless): + """Creates a string containing the Service definition as required by the Kubernetes API. + + NOTE: + This creates either a Headless Service or 'LoadBalancer' service depending on + the is_headless parameter. For Headless services, there is no 'type' attribute + and the 'clusterIP' attribute is set to 'None'. Also, if the service is + Headless, Kubernetes creates DNS entries for Pods - i.e creates DNS A-records + mapping the service's name to the Pods' IPs + """ + if len(container_port_list) != len(service_port_list): + print( + 'ERROR: container_port_list and service_port_list must be of same size') + return '' + body = { + 'kind': 'Service', + 'apiVersion': 'v1', + 'metadata': { + 'name': service_name, + 'labels': { + 'name': service_name + } + }, + 'spec': { + 'ports': [], + 'selector': { + 'name': pod_name + } + } + } + # Populate the 'ports' list in the 'spec' section. This maps service ports + # (port numbers that are exposed by Kubernetes) to container ports (i.e port + # numbers that are exposed by your Docker image) + for idx in range(len(container_port_list)): + port_entry = { + 'port': service_port_list[idx], + 'targetPort': container_port_list[idx], + 'protocol': 'TCP' + } + body['spec']['ports'].append(port_entry) + + # Make this either a LoadBalancer service or a headless service depending on + # the is_headless parameter + if is_headless: + body['spec']['clusterIP'] = 'None' + else: + body['spec']['type'] = 'LoadBalancer' + return json.dumps(body) + + +def _print_connection_error(msg): + print('ERROR: Connection failed. Did you remember to run Kubenetes proxy on ' + 'localhost (i.e kubectl proxy --port=<proxy_port>) ?. Error: %s' % msg) + + +def _do_post(post_url, api_name, request_body): + """Helper to do HTTP POST. + + Note: + 1) On success, Kubernetes returns a success code of 201(CREATED) not 200(OK) + 2) A response code of 509(CONFLICT) is interpreted as a success code (since + the error is most likely due to the resource already existing). This makes + _do_post() idempotent which is semantically desirable. + """ + is_success = True + try: + r = requests.post(post_url, + data=request_body, + timeout=_REQUEST_TIMEOUT_SECS) + if r.status_code == requests.codes.conflict: + print('WARN: Looks like the resource already exists. Api: %s, url: %s' % + (api_name, post_url)) + elif r.status_code != requests.codes.created: + print('ERROR: %s API returned error. HTTP response: (%d) %s' % + (api_name, r.status_code, r.text)) + is_success = False + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError) as e: + is_success = False + _print_connection_error(str(e)) + return is_success + + +def _do_delete(del_url, api_name): + """Helper to do HTTP DELETE. + + Note: A response code of 404(NOT_FOUND) is treated as success to keep + _do_delete() idempotent. + """ + is_success = True + try: + r = requests.delete(del_url, timeout=_REQUEST_TIMEOUT_SECS) + if r.status_code == requests.codes.not_found: + print('WARN: The resource does not exist. Api: %s, url: %s' % + (api_name, del_url)) + elif r.status_code != requests.codes.ok: + print('ERROR: %s API returned error. HTTP response: %s' % + (api_name, r.text)) + is_success = False + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError) as e: + is_success = False + _print_connection_error(str(e)) + return is_success + + +def create_service(kube_host, kube_port, namespace, service_name, pod_name, + service_port_list, container_port_list, is_headless): + """Creates either a Headless Service or a LoadBalancer Service depending + on the is_headless parameter. + """ + post_url = 'http://%s:%d/api/v1/namespaces/%s/services' % ( + kube_host, kube_port, namespace) + request_body = _make_service_config(service_name, pod_name, service_port_list, + container_port_list, is_headless) + return _do_post(post_url, 'Create Service', request_body) + + +def create_pod(kube_host, kube_port, namespace, pod_name, image_name, + container_port_list, cmd_list, arg_list, env_dict): + """Creates a Kubernetes Pod. + + Note that it is generally NOT considered a good practice to directly create + Pods. Typically, the recommendation is to create 'Controllers' to create and + manage Pods' lifecycle. Currently Kubernetes only supports 'Replication + Controller' which creates a configurable number of 'identical Replicas' of + Pods and automatically restarts any Pods in case of failures (for eg: Machine + failures in Kubernetes). This makes it less flexible for our test use cases + where we might want slightly different set of args to each Pod. Hence we + directly create Pods and not care much about Kubernetes failures since those + are very rare. + """ + post_url = 'http://%s:%d/api/v1/namespaces/%s/pods' % (kube_host, kube_port, + namespace) + request_body = _make_pod_config(pod_name, image_name, container_port_list, + cmd_list, arg_list, env_dict) + return _do_post(post_url, 'Create Pod', request_body) + + +def delete_service(kube_host, kube_port, namespace, service_name): + del_url = 'http://%s:%d/api/v1/namespaces/%s/services/%s' % ( + kube_host, kube_port, namespace, service_name) + return _do_delete(del_url, 'Delete Service') + + +def delete_pod(kube_host, kube_port, namespace, pod_name): + del_url = 'http://%s:%d/api/v1/namespaces/%s/pods/%s' % (kube_host, kube_port, + namespace, pod_name) + return _do_delete(del_url, 'Delete Pod') + + +def create_pod_and_service(kube_host, kube_port, namespace, pod_name, + image_name, container_port_list, cmd_list, arg_list, + env_dict, is_headless_service): + """A helper function that creates a pod and a service (if pod creation was successful).""" + is_success = create_pod(kube_host, kube_port, namespace, pod_name, image_name, + container_port_list, cmd_list, arg_list, env_dict) + if not is_success: + print 'Error in creating Pod' + return False + + is_success = create_service( + kube_host, + kube_port, + namespace, + pod_name, # Use pod_name for service + pod_name, + container_port_list, # Service port list same as container port list + container_port_list, + is_headless_service) + if not is_success: + print 'Error in creating Service' + return False + + print 'Successfully created the pod/service %s' % pod_name + return True + + +def delete_pod_and_service(kube_host, kube_port, namespace, pod_name): + """ A helper function that calls delete_pod and delete_service """ + is_success = delete_pod(kube_host, kube_port, namespace, pod_name) + if not is_success: + print 'Error in deleting pod %s' % pod_name + return False + + # Note: service name assumed to the the same as pod name + is_success = delete_service(kube_host, kube_port, namespace, pod_name) + if not is_success: + print 'Error in deleting service %s' % pod_name + return False + + print 'Successfully deleted the Pod/Service: %s' % pod_name + return True |