aboutsummaryrefslogtreecommitdiffhomepage
path: root/tools/gcp
diff options
context:
space:
mode:
Diffstat (limited to 'tools/gcp')
-rwxr-xr-xtools/gcp/stress_test/run_client.py187
-rwxr-xr-xtools/gcp/stress_test/run_server.py120
-rwxr-xr-xtools/gcp/stress_test/stress_test_utils.py197
-rwxr-xr-xtools/gcp/utils/big_query_utils.py140
-rwxr-xr-xtools/gcp/utils/kubernetes_api.py269
5 files changed, 913 insertions, 0 deletions
diff --git a/tools/gcp/stress_test/run_client.py b/tools/gcp/stress_test/run_client.py
new file mode 100755
index 0000000000..0fa1bf1cb9
--- /dev/null
+++ b/tools/gcp/stress_test/run_client.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import EventType
+from stress_test_utils import BigQueryHelper
+
+
+# TODO (sree): Write a python grpc client to directly query the metrics instead
+# of calling metrics_client
+def _get_qps(metrics_cmd):
+ qps = 0
+ try:
+ # Note: gpr_log() writes even non-error messages to stderr stream. So it is
+ # important that we set stderr=subprocess.STDOUT
+ p = subprocess.Popen(args=metrics_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ retcode = p.wait()
+ (out_str, err_str) = p.communicate()
+ if retcode != 0:
+ print 'Error in reading metrics information'
+ print 'Output: ', out_str
+ else:
+ # The overall qps is printed at the end of the line
+ m = re.search('\d+$', out_str)
+ qps = int(m.group()) if m else 0
+ except Exception as ex:
+ print 'Exception while reading metrics information: ' + str(ex)
+ return qps
+
+
+def run_client():
+ """This is a wrapper around the stress test client and performs the following:
+ 1) Create the following two tables in Big Query:
+ (i) Summary table: To record events like the test started, completed
+ successfully or failed
+ (ii) Qps table: To periodically record the QPS sent by this client
+ 2) Start the stress test client and add a row in the Big Query summary
+ table
+ 3) Once every few seconds (as specificed by the poll_interval_secs) poll
+ the status of the stress test client process and perform the
+ following:
+ 3.1) If the process is still running, get the current qps by invoking
+ the metrics client program and add a row in the Big Query
+ Qps table. Sleep for a duration specified by poll_interval_secs
+ 3.2) If the process exited successfully, add a row in the Big Query
+ Summary table and exit
+ 3.3) If the process failed, add a row in Big Query summary table and
+ wait forever.
+ NOTE: This script typically runs inside a GKE pod which means
+ that the pod gets destroyed when the script exits. However, in
+ case the stress test client fails, we would not want the pod to
+ be destroyed (since we might want to connect to the pod for
+ examining logs). This is the reason why the script waits forever
+ in case of failures
+ """
+ env = dict(os.environ)
+ image_type = env['STRESS_TEST_IMAGE_TYPE']
+ image_name = env['STRESS_TEST_IMAGE']
+ args_str = env['STRESS_TEST_ARGS_STR']
+ metrics_client_image = env['METRICS_CLIENT_IMAGE']
+ metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR']
+ run_id = env['RUN_ID']
+ pod_name = env['POD_NAME']
+ logfile_name = env.get('LOGFILE_NAME')
+ poll_interval_secs = float(env['POLL_INTERVAL_SECS'])
+ project_id = env['GCP_PROJECT_ID']
+ dataset_id = env['DATASET_ID']
+ summary_table_id = env['SUMMARY_TABLE_ID']
+ qps_table_id = env['QPS_TABLE_ID']
+
+ bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+ dataset_id, summary_table_id, qps_table_id)
+ bq_helper.initialize()
+
+ # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+ if not bq_helper.setup_tables():
+ print 'Error in creating BigQuery tables'
+ return
+
+ start_time = datetime.datetime.now()
+
+ logfile = None
+ details = 'Logging to stdout'
+ if logfile_name is not None:
+ print 'Opening logfile: %s ...' % logfile_name
+ details = 'Logfile: %s' % logfile_name
+ logfile = open(logfile_name, 'w')
+
+ # Update status that the test is starting (in the status table)
+ bq_helper.insert_summary_row(EventType.STARTING, details)
+
+ metrics_cmd = [metrics_client_image
+ ] + [x for x in metrics_client_args_str.split()]
+ stress_cmd = [image_name] + [x for x in args_str.split()]
+
+ print 'Launching process %s ...' % stress_cmd
+ stress_p = subprocess.Popen(args=stress_cmd,
+ stdout=logfile,
+ stderr=subprocess.STDOUT)
+
+ qps_history = [1, 1, 1] # Maintain the last 3 qps readings
+ qps_history_idx = 0 # Index into the qps_history list
+
+ is_error = False
+ while True:
+ # Check if stress_client is still running. If so, collect metrics and upload
+ # to BigQuery status table
+ if stress_p.poll() is not None:
+ end_time = datetime.datetime.now().isoformat()
+ event_type = EventType.SUCCESS
+ details = 'End time: %s' % end_time
+ if stress_p.returncode != 0:
+ event_type = EventType.FAILURE
+ details = 'Return code = %d. End time: %s' % (stress_p.returncode,
+ end_time)
+ is_error = True
+ bq_helper.insert_summary_row(event_type, details)
+ print details
+ break
+
+ # Stress client still running. Get metrics
+ qps = _get_qps(metrics_cmd)
+ qps_recorded_at = datetime.datetime.now().isoformat()
+ print 'qps: %d at %s' % (qps, qps_recorded_at)
+
+ # If QPS has been zero for the last 3 iterations, flag it as error and exit
+ qps_history[qps_history_idx] = qps
+ qps_history_idx = (qps_history_idx + 1) % len(qps_history)
+ if sum(qps_history) == 0:
+ details = 'QPS has been zero for the last %d seconds - as of : %s' % (
+ poll_interval_secs * 3, qps_recorded_at)
+ is_error = True
+ bq_helper.insert_summary_row(EventType.FAILURE, details)
+ print details
+ break
+
+ # Upload qps metrics to BiqQuery
+ bq_helper.insert_qps_row(qps, qps_recorded_at)
+
+ time.sleep(poll_interval_secs)
+
+ if is_error:
+ print 'Waiting indefinitely..'
+ select.select([], [], [])
+
+ print 'Completed'
+ return
+
+
+if __name__ == '__main__':
+ run_client()
diff --git a/tools/gcp/stress_test/run_server.py b/tools/gcp/stress_test/run_server.py
new file mode 100755
index 0000000000..64322f6100
--- /dev/null
+++ b/tools/gcp/stress_test/run_server.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import BigQueryHelper
+from stress_test_utils import EventType
+
+
+def run_server():
+ """This is a wrapper around the interop server and performs the following:
+ 1) Create a 'Summary table' in Big Query to record events like the server
+ started, completed successfully or failed. NOTE: This also creates
+ another table called the QPS table which is currently NOT needed on the
+ server (it is needed on the stress test clients)
+ 2) Start the server process and add a row in Big Query summary table
+ 3) Wait for the server process to terminate. The server process does not
+ terminate unless there is an error.
+ If the server process terminated with a failure, add a row in Big Query
+ and wait forever.
+ NOTE: This script typically runs inside a GKE pod which means that the
+ pod gets destroyed when the script exits. However, in case the server
+ process fails, we would not want the pod to be destroyed (since we
+ might want to connect to the pod for examining logs). This is the
+ reason why the script waits forever in case of failures.
+ """
+
+ # Read the parameters from environment variables
+ env = dict(os.environ)
+
+ run_id = env['RUN_ID'] # The unique run id for this test
+ image_type = env['STRESS_TEST_IMAGE_TYPE']
+ image_name = env['STRESS_TEST_IMAGE']
+ args_str = env['STRESS_TEST_ARGS_STR']
+ pod_name = env['POD_NAME']
+ project_id = env['GCP_PROJECT_ID']
+ dataset_id = env['DATASET_ID']
+ summary_table_id = env['SUMMARY_TABLE_ID']
+ qps_table_id = env['QPS_TABLE_ID']
+
+ logfile_name = env.get('LOGFILE_NAME')
+
+ print('pod_name: %s, project_id: %s, run_id: %s, dataset_id: %s, '
+ 'summary_table_id: %s, qps_table_id: %s') % (
+ pod_name, project_id, run_id, dataset_id, summary_table_id,
+ qps_table_id)
+
+ bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+ dataset_id, summary_table_id, qps_table_id)
+ bq_helper.initialize()
+
+ # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+ if not bq_helper.setup_tables():
+ print 'Error in creating BigQuery tables'
+ return
+
+ start_time = datetime.datetime.now()
+
+ logfile = None
+ details = 'Logging to stdout'
+ if logfile_name is not None:
+ print 'Opening log file: ', logfile_name
+ logfile = open(logfile_name, 'w')
+ details = 'Logfile: %s' % logfile_name
+
+ # Update status that the test is starting (in the status table)
+ bq_helper.insert_summary_row(EventType.STARTING, details)
+
+ stress_cmd = [image_name] + [x for x in args_str.split()]
+
+ print 'Launching process %s ...' % stress_cmd
+ stress_p = subprocess.Popen(args=stress_cmd,
+ stdout=logfile,
+ stderr=subprocess.STDOUT)
+
+ returncode = stress_p.wait()
+ if returncode != 0:
+ end_time = datetime.datetime.now().isoformat()
+ event_type = EventType.FAILURE
+ details = 'Returncode: %d; End time: %s' % (returncode, end_time)
+ bq_helper.insert_summary_row(event_type, details)
+ print 'Waiting indefinitely..'
+ select.select([], [], [])
+ return returncode
+
+
+if __name__ == '__main__':
+ run_server()
diff --git a/tools/gcp/stress_test/stress_test_utils.py b/tools/gcp/stress_test/stress_test_utils.py
new file mode 100755
index 0000000000..c4b437e345
--- /dev/null
+++ b/tools/gcp/stress_test/stress_test_utils.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import json
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+# Import big_query_utils module
+bq_utils_dir = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), '../utils'))
+sys.path.append(bq_utils_dir)
+import big_query_utils as bq_utils
+
+
+class EventType:
+ STARTING = 'STARTING'
+ SUCCESS = 'SUCCESS'
+ FAILURE = 'FAILURE'
+
+
+class BigQueryHelper:
+ """Helper class for the stress test wrappers to interact with BigQuery.
+ """
+
+ def __init__(self, run_id, image_type, pod_name, project_id, dataset_id,
+ summary_table_id, qps_table_id):
+ self.run_id = run_id
+ self.image_type = image_type
+ self.pod_name = pod_name
+ self.project_id = project_id
+ self.dataset_id = dataset_id
+ self.summary_table_id = summary_table_id
+ self.qps_table_id = qps_table_id
+
+ def initialize(self):
+ self.bq = bq_utils.create_big_query()
+
+ def setup_tables(self):
+ return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \
+ and self.__create_summary_table() \
+ and self.__create_qps_table()
+
+ def insert_summary_row(self, event_type, details):
+ row_values_dict = {
+ 'run_id': self.run_id,
+ 'image_type': self.image_type,
+ 'pod_name': self.pod_name,
+ 'event_date': datetime.datetime.now().isoformat(),
+ 'event_type': event_type,
+ 'details': details
+ }
+ # row_unique_id is something that uniquely identifies the row (BigQuery uses
+ # it for duplicate detection).
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type)
+ row = bq_utils.make_row(row_unique_id, row_values_dict)
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+ self.summary_table_id, [row])
+
+ def insert_qps_row(self, qps, recorded_at):
+ row_values_dict = {
+ 'run_id': self.run_id,
+ 'pod_name': self.pod_name,
+ 'recorded_at': recorded_at,
+ 'qps': qps
+ }
+
+ # row_unique_id is something that uniquely identifies the row (BigQuery uses
+ # it for duplicate detection).
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at)
+ row = bq_utils.make_row(row_unique_id, row_values_dict)
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+ self.qps_table_id, [row])
+
+ def check_if_any_tests_failed(self, num_query_retries=3):
+ query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
+ 'event_type="%s"') % (self.dataset_id, self.summary_table_id,
+ self.run_id, EventType.FAILURE)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+ page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
+ num_retries=num_query_retries)
+ num_failures = int(page['totalRows'])
+ print 'num rows: ', num_failures
+ return num_failures > 0
+
+ def print_summary_records(self, num_query_retries=3):
+ line = '-' * 120
+ print line
+ print 'Summary records'
+ print 'Run Id: ', self.run_id
+ print 'Dataset Id: ', self.dataset_id
+ print line
+ query = ('SELECT pod_name, image_type, event_type, event_date, details'
+ ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % (
+ self.dataset_id, self.summary_table_id, self.run_id)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+ 'Pod name', 'Image type', 'Event type', 'Date', 'Details')
+ print line
+ page_token = None
+ while True:
+ page = self.bq.jobs().getQueryResults(
+ pageToken=page_token,
+ **query_job['jobReference']).execute(num_retries=num_query_retries)
+ rows = page.get('rows', [])
+ for row in rows:
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+ row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'],
+ row['f'][3]['v'], row['f'][4]['v'])
+ page_token = page.get('pageToken')
+ if not page_token:
+ break
+
+ def print_qps_records(self, num_query_retries=3):
+ line = '-' * 80
+ print line
+ print 'QPS Summary'
+ print 'Run Id: ', self.run_id
+ print 'Dataset Id: ', self.dataset_id
+ print line
+ query = (
+ 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' '
+ 'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id,
+ self.run_id)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+ print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
+ print line
+ page_token = None
+ while True:
+ page = self.bq.jobs().getQueryResults(
+ pageToken=page_token,
+ **query_job['jobReference']).execute(num_retries=num_query_retries)
+ rows = page.get('rows', [])
+ for row in rows:
+ print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'],
+ row['f'][2]['v'])
+ page_token = page.get('pageToken')
+ if not page_token:
+ break
+
+ def __create_summary_table(self):
+ summary_table_schema = [
+ ('run_id', 'STRING', 'Test run id'),
+ ('image_type', 'STRING', 'Client or Server?'),
+ ('pod_name', 'STRING', 'GKE pod hosting this image'),
+ ('event_date', 'STRING', 'The date of this event'),
+ ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'),
+ ('details', 'STRING', 'Any other relevant details')
+ ]
+ desc = ('The table that contains START/SUCCESS/FAILURE events for '
+ ' the stress test clients and servers')
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+ self.summary_table_id, summary_table_schema,
+ desc)
+
+ def __create_qps_table(self):
+ qps_table_schema = [
+ ('run_id', 'STRING', 'Test run id'),
+ ('pod_name', 'STRING', 'GKE pod hosting this image'),
+ ('recorded_at', 'STRING', 'Metrics recorded at time'),
+ ('qps', 'INTEGER', 'Queries per second')
+ ]
+ desc = 'The table that cointains the qps recorded at various intervals'
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+ self.qps_table_id, qps_table_schema, desc)
diff --git a/tools/gcp/utils/big_query_utils.py b/tools/gcp/utils/big_query_utils.py
new file mode 100755
index 0000000000..e2379fd1aa
--- /dev/null
+++ b/tools/gcp/utils/big_query_utils.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016 Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import json
+import uuid
+import httplib2
+
+from apiclient import discovery
+from apiclient.errors import HttpError
+from oauth2client.client import GoogleCredentials
+
+NUM_RETRIES = 3
+
+
+def create_big_query():
+ """Authenticates with cloud platform and gets a BiqQuery service object
+ """
+ creds = GoogleCredentials.get_application_default()
+ return discovery.build('bigquery', 'v2', credentials=creds)
+
+
+def create_dataset(biq_query, project_id, dataset_id):
+ is_success = True
+ body = {
+ 'datasetReference': {
+ 'projectId': project_id,
+ 'datasetId': dataset_id
+ }
+ }
+
+ try:
+ dataset_req = biq_query.datasets().insert(projectId=project_id, body=body)
+ dataset_req.execute(num_retries=NUM_RETRIES)
+ except HttpError as http_error:
+ if http_error.resp.status == 409:
+ print 'Warning: The dataset %s already exists' % dataset_id
+ else:
+ # Note: For more debugging info, print "http_error.content"
+ print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error)
+ is_success = False
+ return is_success
+
+
+def create_table(big_query, project_id, dataset_id, table_id, table_schema,
+ description):
+ is_success = True
+
+ body = {
+ 'description': description,
+ 'schema': {
+ 'fields': [{
+ 'name': field_name,
+ 'type': field_type,
+ 'description': field_description
+ } for (field_name, field_type, field_description) in table_schema]
+ },
+ 'tableReference': {
+ 'datasetId': dataset_id,
+ 'projectId': project_id,
+ 'tableId': table_id
+ }
+ }
+
+ try:
+ table_req = big_query.tables().insert(projectId=project_id,
+ datasetId=dataset_id,
+ body=body)
+ res = table_req.execute(num_retries=NUM_RETRIES)
+ print 'Successfully created %s "%s"' % (res['kind'], res['id'])
+ except HttpError as http_error:
+ if http_error.resp.status == 409:
+ print 'Warning: Table %s already exists' % table_id
+ else:
+ print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
+ is_success = False
+ return is_success
+
+
+def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
+ is_success = True
+ body = {'rows': rows_list}
+ try:
+ insert_req = big_query.tabledata().insertAll(projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ body=body)
+ print body
+ res = insert_req.execute(num_retries=NUM_RETRIES)
+ print res
+ except HttpError as http_error:
+ print 'Error in inserting rows in the table %s' % table_id
+ is_success = False
+ return is_success
+
+
+def sync_query_job(big_query, project_id, query, timeout=5000):
+ query_data = {'query': query, 'timeoutMs': timeout}
+ query_job = None
+ try:
+ query_job = big_query.jobs().query(
+ projectId=project_id,
+ body=query_data).execute(num_retries=NUM_RETRIES)
+ except HttpError as http_error:
+ print 'Query execute job failed with error: %s' % http_error
+ print http_error.content
+ return query_job
+
+ # List of (column name, column type, description) tuples
+def make_row(unique_row_id, row_values_dict):
+ """row_values_dict is a dictionary of column name and column value.
+ """
+ return {'insertId': unique_row_id, 'json': row_values_dict}
diff --git a/tools/gcp/utils/kubernetes_api.py b/tools/gcp/utils/kubernetes_api.py
new file mode 100755
index 0000000000..2d3f771e93
--- /dev/null
+++ b/tools/gcp/utils/kubernetes_api.py
@@ -0,0 +1,269 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016 Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import requests
+import json
+
+_REQUEST_TIMEOUT_SECS = 10
+
+
+def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
+ arg_list, env_dict):
+ """Creates a string containing the Pod defintion as required by the Kubernetes API"""
+ body = {
+ 'kind': 'Pod',
+ 'apiVersion': 'v1',
+ 'metadata': {
+ 'name': pod_name,
+ 'labels': {'name': pod_name}
+ },
+ 'spec': {
+ 'containers': [
+ {
+ 'name': pod_name,
+ 'image': image_name,
+ 'ports': [{'containerPort': port,
+ 'protocol': 'TCP'}
+ for port in container_port_list],
+ 'imagePullPolicy': 'Always'
+ }
+ ]
+ }
+ }
+
+ env_list = [{'name': k, 'value': v} for (k, v) in env_dict.iteritems()]
+ if len(env_list) > 0:
+ body['spec']['containers'][0]['env'] = env_list
+
+ # Add the 'Command' and 'Args' attributes if they are passed.
+ # Note:
+ # - 'Command' overrides the ENTRYPOINT in the Docker Image
+ # - 'Args' override the CMD in Docker image (yes, it is confusing!)
+ if len(cmd_list) > 0:
+ body['spec']['containers'][0]['command'] = cmd_list
+ if len(arg_list) > 0:
+ body['spec']['containers'][0]['args'] = arg_list
+ return json.dumps(body)
+
+
+def _make_service_config(service_name, pod_name, service_port_list,
+ container_port_list, is_headless):
+ """Creates a string containing the Service definition as required by the Kubernetes API.
+
+ NOTE:
+ This creates either a Headless Service or 'LoadBalancer' service depending on
+ the is_headless parameter. For Headless services, there is no 'type' attribute
+ and the 'clusterIP' attribute is set to 'None'. Also, if the service is
+ Headless, Kubernetes creates DNS entries for Pods - i.e creates DNS A-records
+ mapping the service's name to the Pods' IPs
+ """
+ if len(container_port_list) != len(service_port_list):
+ print(
+ 'ERROR: container_port_list and service_port_list must be of same size')
+ return ''
+ body = {
+ 'kind': 'Service',
+ 'apiVersion': 'v1',
+ 'metadata': {
+ 'name': service_name,
+ 'labels': {
+ 'name': service_name
+ }
+ },
+ 'spec': {
+ 'ports': [],
+ 'selector': {
+ 'name': pod_name
+ }
+ }
+ }
+ # Populate the 'ports' list in the 'spec' section. This maps service ports
+ # (port numbers that are exposed by Kubernetes) to container ports (i.e port
+ # numbers that are exposed by your Docker image)
+ for idx in range(len(container_port_list)):
+ port_entry = {
+ 'port': service_port_list[idx],
+ 'targetPort': container_port_list[idx],
+ 'protocol': 'TCP'
+ }
+ body['spec']['ports'].append(port_entry)
+
+ # Make this either a LoadBalancer service or a headless service depending on
+ # the is_headless parameter
+ if is_headless:
+ body['spec']['clusterIP'] = 'None'
+ else:
+ body['spec']['type'] = 'LoadBalancer'
+ return json.dumps(body)
+
+
+def _print_connection_error(msg):
+ print('ERROR: Connection failed. Did you remember to run Kubenetes proxy on '
+ 'localhost (i.e kubectl proxy --port=<proxy_port>) ?. Error: %s' % msg)
+
+
+def _do_post(post_url, api_name, request_body):
+ """Helper to do HTTP POST.
+
+ Note:
+ 1) On success, Kubernetes returns a success code of 201(CREATED) not 200(OK)
+ 2) A response code of 509(CONFLICT) is interpreted as a success code (since
+ the error is most likely due to the resource already existing). This makes
+ _do_post() idempotent which is semantically desirable.
+ """
+ is_success = True
+ try:
+ r = requests.post(post_url,
+ data=request_body,
+ timeout=_REQUEST_TIMEOUT_SECS)
+ if r.status_code == requests.codes.conflict:
+ print('WARN: Looks like the resource already exists. Api: %s, url: %s' %
+ (api_name, post_url))
+ elif r.status_code != requests.codes.created:
+ print('ERROR: %s API returned error. HTTP response: (%d) %s' %
+ (api_name, r.status_code, r.text))
+ is_success = False
+ except (requests.exceptions.Timeout,
+ requests.exceptions.ConnectionError) as e:
+ is_success = False
+ _print_connection_error(str(e))
+ return is_success
+
+
+def _do_delete(del_url, api_name):
+ """Helper to do HTTP DELETE.
+
+ Note: A response code of 404(NOT_FOUND) is treated as success to keep
+ _do_delete() idempotent.
+ """
+ is_success = True
+ try:
+ r = requests.delete(del_url, timeout=_REQUEST_TIMEOUT_SECS)
+ if r.status_code == requests.codes.not_found:
+ print('WARN: The resource does not exist. Api: %s, url: %s' %
+ (api_name, del_url))
+ elif r.status_code != requests.codes.ok:
+ print('ERROR: %s API returned error. HTTP response: %s' %
+ (api_name, r.text))
+ is_success = False
+ except (requests.exceptions.Timeout,
+ requests.exceptions.ConnectionError) as e:
+ is_success = False
+ _print_connection_error(str(e))
+ return is_success
+
+
+def create_service(kube_host, kube_port, namespace, service_name, pod_name,
+ service_port_list, container_port_list, is_headless):
+ """Creates either a Headless Service or a LoadBalancer Service depending
+ on the is_headless parameter.
+ """
+ post_url = 'http://%s:%d/api/v1/namespaces/%s/services' % (
+ kube_host, kube_port, namespace)
+ request_body = _make_service_config(service_name, pod_name, service_port_list,
+ container_port_list, is_headless)
+ return _do_post(post_url, 'Create Service', request_body)
+
+
+def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
+ container_port_list, cmd_list, arg_list, env_dict):
+ """Creates a Kubernetes Pod.
+
+ Note that it is generally NOT considered a good practice to directly create
+ Pods. Typically, the recommendation is to create 'Controllers' to create and
+ manage Pods' lifecycle. Currently Kubernetes only supports 'Replication
+ Controller' which creates a configurable number of 'identical Replicas' of
+ Pods and automatically restarts any Pods in case of failures (for eg: Machine
+ failures in Kubernetes). This makes it less flexible for our test use cases
+ where we might want slightly different set of args to each Pod. Hence we
+ directly create Pods and not care much about Kubernetes failures since those
+ are very rare.
+ """
+ post_url = 'http://%s:%d/api/v1/namespaces/%s/pods' % (kube_host, kube_port,
+ namespace)
+ request_body = _make_pod_config(pod_name, image_name, container_port_list,
+ cmd_list, arg_list, env_dict)
+ return _do_post(post_url, 'Create Pod', request_body)
+
+
+def delete_service(kube_host, kube_port, namespace, service_name):
+ del_url = 'http://%s:%d/api/v1/namespaces/%s/services/%s' % (
+ kube_host, kube_port, namespace, service_name)
+ return _do_delete(del_url, 'Delete Service')
+
+
+def delete_pod(kube_host, kube_port, namespace, pod_name):
+ del_url = 'http://%s:%d/api/v1/namespaces/%s/pods/%s' % (kube_host, kube_port,
+ namespace, pod_name)
+ return _do_delete(del_url, 'Delete Pod')
+
+
+def create_pod_and_service(kube_host, kube_port, namespace, pod_name,
+ image_name, container_port_list, cmd_list, arg_list,
+ env_dict, is_headless_service):
+ """A helper function that creates a pod and a service (if pod creation was successful)."""
+ is_success = create_pod(kube_host, kube_port, namespace, pod_name, image_name,
+ container_port_list, cmd_list, arg_list, env_dict)
+ if not is_success:
+ print 'Error in creating Pod'
+ return False
+
+ is_success = create_service(
+ kube_host,
+ kube_port,
+ namespace,
+ pod_name, # Use pod_name for service
+ pod_name,
+ container_port_list, # Service port list same as container port list
+ container_port_list,
+ is_headless_service)
+ if not is_success:
+ print 'Error in creating Service'
+ return False
+
+ print 'Successfully created the pod/service %s' % pod_name
+ return True
+
+
+def delete_pod_and_service(kube_host, kube_port, namespace, pod_name):
+ """ A helper function that calls delete_pod and delete_service """
+ is_success = delete_pod(kube_host, kube_port, namespace, pod_name)
+ if not is_success:
+ print 'Error in deleting pod %s' % pod_name
+ return False
+
+ # Note: service name assumed to the the same as pod name
+ is_success = delete_service(kube_host, kube_port, namespace, pod_name)
+ if not is_success:
+ print 'Error in deleting service %s' % pod_name
+ return False
+
+ print 'Successfully deleted the Pod/Service: %s' % pod_name
+ return True