From 44ca2c26409a172b80bc9f40f7578f3eaf1d135d Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 16 Feb 2016 09:48:36 -0800 Subject: Examples --- test/cpp/interop/metrics_client.cc | 41 +++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 14 deletions(-) (limited to 'test') diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc index 0c140ffd85..cc304f2e89 100644 --- a/test/cpp/interop/metrics_client.cc +++ b/test/cpp/interop/metrics_client.cc @@ -37,39 +37,45 @@ #include #include -#include "test/cpp/util/metrics_server.h" -#include "test/cpp/util/test_config.h" #include "src/proto/grpc/testing/metrics.grpc.pb.h" #include "src/proto/grpc/testing/metrics.pb.h" +#include "test/cpp/util/metrics_server.h" +#include "test/cpp/util/test_config.h" DEFINE_string(metrics_server_address, "", "The metrics server addresses in the fomrat :"); +DEFINE_bool(total_only, false, + "If true, this prints only the total value of all gauges"); + +int kDeadlineSecs = 10; using grpc::testing::EmptyMessage; using grpc::testing::GaugeResponse; using grpc::testing::MetricsService; using grpc::testing::MetricsServiceImpl; -void PrintMetrics(const grpc::string& server_address) { - gpr_log(GPR_INFO, "creating a channel to %s", server_address.c_str()); - std::shared_ptr channel( - grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())); - - std::unique_ptr stub(MetricsService::NewStub(channel)); - +// Prints the values of all Gauges (unless total_only is set to 'true' in which +// case this only prints the sum of all gauge values). +bool PrintMetrics(std::unique_ptr stub, bool total_only) { grpc::ClientContext context; EmptyMessage message; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs); + + context.set_deadline(deadline); + std::unique_ptr> reader( stub->GetAllGauges(&context, message)); GaugeResponse gauge_response; long overall_qps = 0; - int idx = 0; while (reader->Read(&gauge_response)) { if (gauge_response.value_case() == GaugeResponse::kLongValue) { - gpr_log(GPR_INFO, "Gauge: %d (%s: %ld)", ++idx, - gauge_response.name().c_str(), gauge_response.long_value()); + if (!total_only) { + gpr_log(GPR_INFO, "%s: %ld", gauge_response.name().c_str(), + gauge_response.long_value()); + } overall_qps += gauge_response.long_value(); } else { gpr_log(GPR_INFO, "Gauge %s is not a long value", @@ -77,12 +83,14 @@ void PrintMetrics(const grpc::string& server_address) { } } - gpr_log(GPR_INFO, "OVERALL: %ld", overall_qps); + gpr_log(GPR_INFO, "%ld", overall_qps); const grpc::Status status = reader->Finish(); if (!status.ok()) { gpr_log(GPR_ERROR, "Error in getting metrics from the client"); } + + return status.ok(); } int main(int argc, char** argv) { @@ -97,7 +105,12 @@ int main(int argc, char** argv) { return 1; } - PrintMetrics(FLAGS_metrics_server_address); + std::shared_ptr channel(grpc::CreateChannel( + FLAGS_metrics_server_address, grpc::InsecureChannelCredentials())); + + if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) { + return 1; + } return 0; } -- cgit v1.2.3 From 559e45becd0a50bd6af850900abbb2b5759f8719 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 19 Feb 2016 03:02:16 -0800 Subject: Scripts to launch stress tests in GKE --- test/cpp/util/metrics_server.cc | 2 +- tools/big_query/big_query_utils.py | 140 ++++++++ tools/gke/big_query_utils.py | 181 ----------- tools/gke/create_client.py | 108 ------- tools/gke/create_server.py | 74 ----- tools/gke/delete_client.py | 66 ---- tools/gke/delete_server.py | 58 ---- tools/gke/kubernetes_api.py | 5 +- tools/gke/run_stress_tests_on_gke.py | 389 +++++++++++++++++++++++ tools/run_tests/stress_test/run_client.py | 188 +++++++++++ tools/run_tests/stress_test/run_server.py | 115 +++++++ tools/run_tests/stress_test/stress_test_utils.py | 192 +++++++++++ tools/run_tests/stress_test_wrapper.py | 96 ------ 13 files changed, 1028 insertions(+), 586 deletions(-) create mode 100755 tools/big_query/big_query_utils.py delete mode 100644 tools/gke/big_query_utils.py delete mode 100755 tools/gke/create_client.py delete mode 100755 tools/gke/create_server.py delete mode 100755 tools/gke/delete_client.py delete mode 100755 tools/gke/delete_server.py create mode 100755 tools/gke/run_stress_tests_on_gke.py create mode 100755 tools/run_tests/stress_test/run_client.py create mode 100755 tools/run_tests/stress_test/run_server.py create mode 100755 tools/run_tests/stress_test/stress_test_utils.py delete mode 100755 tools/run_tests/stress_test_wrapper.py (limited to 'test') diff --git a/test/cpp/util/metrics_server.cc b/test/cpp/util/metrics_server.cc index 07978d0bdb..34d51eb316 100644 --- a/test/cpp/util/metrics_server.cc +++ b/test/cpp/util/metrics_server.cc @@ -57,7 +57,7 @@ long Gauge::Get() { grpc::Status MetricsServiceImpl::GetAllGauges( ServerContext* context, const EmptyMessage* request, ServerWriter* writer) { - gpr_log(GPR_INFO, "GetAllGauges called"); + gpr_log(GPR_DEBUG, "GetAllGauges called"); std::lock_guard lock(mu_); for (auto it = gauges_.begin(); it != gauges_.end(); it++) { diff --git a/tools/big_query/big_query_utils.py b/tools/big_query/big_query_utils.py new file mode 100755 index 0000000000..267d019850 --- /dev/null +++ b/tools/big_query/big_query_utils.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016 Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse +import json +import uuid +import httplib2 + +from apiclient import discovery +from apiclient.errors import HttpError +from oauth2client.client import GoogleCredentials + +NUM_RETRIES = 3 + + +def create_big_query(): + """Authenticates with cloud platform and gets a BiqQuery service object + """ + creds = GoogleCredentials.get_application_default() + return discovery.build('bigquery', 'v2', credentials=creds) + + +def create_dataset(biq_query, project_id, dataset_id): + is_success = True + body = { + 'datasetReference': { + 'projectId': project_id, + 'datasetId': dataset_id + } + } + + try: + dataset_req = biq_query.datasets().insert(projectId=project_id, body=body) + dataset_req.execute(num_retries=NUM_RETRIES) + except HttpError as http_error: + if http_error.resp.status == 409: + print 'Warning: The dataset %s already exists' % dataset_id + else: + # Note: For more debugging info, print "http_error.content" + print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error) + is_success = False + return is_success + + +def create_table(big_query, project_id, dataset_id, table_id, table_schema, + description): + is_success = True + + body = { + 'description': description, + 'schema': { + 'fields': [{ + 'name': field_name, + 'type': field_type, + 'description': field_description + } for (field_name, field_type, field_description) in table_schema] + }, + 'tableReference': { + 'datasetId': dataset_id, + 'projectId': project_id, + 'tableId': table_id + } + } + + try: + table_req = big_query.tables().insert(projectId=project_id, + datasetId=dataset_id, + body=body) + res = table_req.execute(num_retries=NUM_RETRIES) + print 'Successfully created %s "%s"' % (res['kind'], res['id']) + except HttpError as http_error: + if http_error.resp.status == 409: + print 'Warning: Table %s already exists' % table_id + else: + print 'Error in creating table: %s. Err: %s' % (table_id, http_error) + is_success = False + return is_success + + +def insert_rows(big_query, project_id, dataset_id, table_id, rows_list): + is_success = True + body = {'rows': rows_list} + try: + insert_req = big_query.tabledata().insertAll(projectId=project_id, + datasetId=dataset_id, + tableId=table_id, + body=body) + print body + res = insert_req.execute(num_retries=NUM_RETRIES) + print res + except HttpError as http_error: + print 'Error in inserting rows in the table %s' % table_id + is_success = False + return is_success + + +def sync_query_job(big_query, project_id, query, timeout=5000): + query_data = {'query': query, 'timeoutMs': timeout} + query_job = None + try: + query_job = big_query.jobs().query( + projectId=project_id, + body=query_data).execute(num_retries=NUM_RETRIES) + except HttpError as http_error: + print 'Query execute job failed with error: %s' % http_error + print http_error.content + return query_job + + # List of (column name, column type, description) tuples +def make_row(unique_row_id, row_values_dict): + """row_values_dict is a dictionar of column name and column value. + """ + return {'insertId': unique_row_id, 'json': row_values_dict} diff --git a/tools/gke/big_query_utils.py b/tools/gke/big_query_utils.py deleted file mode 100644 index ebcf9d6ec3..0000000000 --- a/tools/gke/big_query_utils.py +++ /dev/null @@ -1,181 +0,0 @@ -import argparse -import json -import uuid -import httplib2 - -from apiclient import discovery -from apiclient.errors import HttpError -from oauth2client.client import GoogleCredentials - -NUM_RETRIES = 3 - - -def create_bq(): - """Authenticates with cloud platform and gets a BiqQuery service object - """ - creds = GoogleCredentials.get_application_default() - return discovery.build('bigquery', 'v2', credentials=creds) - - -def create_ds(biq_query, project_id, dataset_id): - is_success = True - body = { - 'datasetReference': { - 'projectId': project_id, - 'datasetId': dataset_id - } - } - try: - dataset_req = biq_query.datasets().insert(projectId=project_id, body=body) - dataset_req.execute(num_retries=NUM_RETRIES) - except HttpError as http_error: - if http_error.resp.status == 409: - print 'Warning: The dataset %s already exists' % dataset_id - else: - # Note: For more debugging info, print "http_error.content" - print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error) - is_success = False - return is_success - - -def make_field(field_name, field_type, field_description): - return { - 'name': field_name, - 'type': field_type, - 'description': field_description - } - - -def create_table(big_query, project_id, dataset_id, table_id, fields_list, - description): - is_success = True - body = { - 'description': description, - 'schema': { - 'fields': fields_list - }, - 'tableReference': { - 'datasetId': dataset_id, - 'projectId': project_id, - 'tableId': table_id - } - } - try: - table_req = big_query.tables().insert(projectId=project_id, - datasetId=dataset_id, - body=body) - res = table_req.execute(num_retries=NUM_RETRIES) - print 'Successfully created %s "%s"' % (res['kind'], res['id']) - except HttpError as http_error: - if http_error.resp.status == 409: - print 'Warning: Table %s already exists' % table_id - else: - print 'Error in creating table: %s. Err: %s' % (table_id, http_error) - is_success = False - return is_success - - -def insert_rows(big_query, project_id, dataset_id, table_id, rows_list): - is_success = True - body = {'rows': rows_list} - try: - insert_req = big_query.tabledata().insertAll(projectId=project_id, - datasetId=dataset_id, - tableId=table_id, - body=body) - print body - res = insert_req.execute(num_retries=NUM_RETRIES) - print res - except HttpError as http_error: - print 'Error in inserting rows in the table %s' % table_id - is_success = False - return is_success - -##################### - - -def make_emp_row(emp_id, emp_name, emp_email): - return { - 'insertId': str(emp_id), - 'json': { - 'emp_id': emp_id, - 'emp_name': emp_name, - 'emp_email_id': emp_email - } - } - - -def get_emp_table_fields_list(): - return [ - make_field('emp_id', 'INTEGER', 'Employee id'), - make_field('emp_name', 'STRING', 'Employee name'), - make_field('emp_email_id', 'STRING', 'Employee email id') - ] - - -def insert_emp_rows(big_query, project_id, dataset_id, table_id, start_idx, - num_rows): - rows_list = [make_emp_row(i, 'sree_%d' % i, 'sreecha_%d@gmail.com' % i) - for i in range(start_idx, start_idx + num_rows)] - insert_rows(big_query, project_id, dataset_id, table_id, rows_list) - - -def create_emp_table(big_query, project_id, dataset_id, table_id): - fields_list = get_emp_table_fields_list() - description = 'Test table created by sree' - create_table(big_query, project_id, dataset_id, table_id, fields_list, - description) - - -def sync_query(big_query, project_id, query, timeout=5000): - query_data = {'query': query, 'timeoutMs': timeout} - query_job = None - try: - query_job = big_query.jobs().query( - projectId=project_id, - body=query_data).execute(num_retries=NUM_RETRIES) - except HttpError as http_error: - print 'Query execute job failed with error: %s' % http_error - print http_error.content - return query_job - -#[Start query_emp_records] -def query_emp_records(big_query, project_id, dataset_id, table_id): - query = 'SELECT emp_id, emp_name FROM %s.%s ORDER BY emp_id;' % (dataset_id, table_id) - print query - query_job = sync_query(big_query, project_id, query, 5000) - job_id = query_job['jobReference'] - - print query_job - print '**Starting paging **' - #[Start Paging] - page_token = None - while True: - page = big_query.jobs().getQueryResults( - pageToken=page_token, - **query_job['jobReference']).execute(num_retries=NUM_RETRIES) - rows = page['rows'] - for row in rows: - print row['f'][0]['v'], "---", row['f'][1]['v'] - page_token = page.get('pageToken') - if not page_token: - break - #[End Paging] -#[End query_emp_records] - -######################### -DATASET_SEQ_NUM = 1 -TABLE_SEQ_NUM = 11 - -PROJECT_ID = 'sree-gce' -DATASET_ID = 'sree_test_dataset_%d' % DATASET_SEQ_NUM -TABLE_ID = 'sree_test_table_%d' % TABLE_SEQ_NUM - -EMP_ROW_IDX = 10 -EMP_NUM_ROWS = 5 - -bq = create_bq() -create_ds(bq, PROJECT_ID, DATASET_ID) -create_emp_table(bq, PROJECT_ID, DATASET_ID, TABLE_ID) -insert_emp_rows(bq, PROJECT_ID, DATASET_ID, TABLE_ID, EMP_ROW_IDX, EMP_NUM_ROWS) -query_emp_records(bq, PROJECT_ID, DATASET_ID, TABLE_ID) diff --git a/tools/gke/create_client.py b/tools/gke/create_client.py deleted file mode 100755 index bc56ef0ef1..0000000000 --- a/tools/gke/create_client.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python2.7 -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse - -import kubernetes_api - -argp = argparse.ArgumentParser(description='Launch Stress tests in GKE') - -argp.add_argument('-n', - '--num_instances', - required=True, - type=int, - help='The number of instances to launch in GKE') -args = argp.parse_args() - -kubernetes_api_server="localhost" -kubernetes_api_port=8001 - - -# Docker image -image_name="gcr.io/sree-gce/grpc_stress_test_2" - -server_address = "stress-server.default.svc.cluster.local:8080" -metrics_server_address = "localhost:8081" - -stress_test_arg_list=[ - "--server_addresses=" + server_address, - "--test_cases=empty_unary:20,large_unary:20", - "--num_stubs_per_channel=10" -] - -metrics_client_arg_list=[ - "--metrics_server_address=" + metrics_server_address, - "--total_only=true"] - -env_dict={ - "GPRC_ROOT": "/var/local/git/grpc", - "STRESS_TEST_IMAGE": "/var/local/git/grpc/bins/opt/stress_test", - "STRESS_TEST_ARGS_STR": ' '.join(stress_test_arg_list), - "METRICS_CLIENT_IMAGE": "/var/local/git/grpc/bins/opt/metrics_client", - "METRICS_CLIENT_ARGS_STR": ' '.join(metrics_client_arg_list)} - -cmd_list=["/var/local/git/grpc/bins/opt/stress_test"] -arg_list=stress_test_arg_list # make this [] in future -port_list=[8081] - -namespace = 'default' -is_headless_service = False # Client is NOT headless service - -print('Creating %d instances of client..' % args.num_instances) - -for i in range(1, args.num_instances + 1): - service_name = 'stress-client-%d' % i - pod_name = service_name # Use the same name for kubernetes Service and Pod - is_success = kubernetes_api.create_pod( - kubernetes_api_server, - kubernetes_api_port, - namespace, - pod_name, - image_name, - port_list, - cmd_list, - arg_list, - env_dict) - if not is_success: - print("Error in creating pod %s" % pod_name) - else: - is_success = kubernetes_api.create_service( - kubernetes_api_server, - kubernetes_api_port, - namespace, - service_name, - pod_name, - port_list, # Service port list - port_list, # Container port list (same as service port list) - is_headless_service) - if not is_success: - print("Error in creating service %s" % service_name) - else: - print("Created client %s" % pod_name) diff --git a/tools/gke/create_server.py b/tools/gke/create_server.py deleted file mode 100755 index 23ab62c205..0000000000 --- a/tools/gke/create_server.py +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env python2.7 -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse - -import kubernetes_api - -service_name = 'stress-server' -pod_name = service_name # Use the same name for kubernetes Service and Pod -namespace = 'default' -is_headless_service = True -cmd_list=['/var/local/git/grpc/bins/opt/interop_server'] -arg_list=['--port=8080'] -port_list=[8080] -image_name='gcr.io/sree-gce/grpc_stress_test_2' -env_dict={} - -# Make sure you run kubectl proxy --port=8001 -kubernetes_api_server='localhost' -kubernetes_api_port=8001 - -is_success = kubernetes_api.create_pod( - kubernetes_api_server, - kubernetes_api_port, - namespace, - pod_name, - image_name, - port_list, - cmd_list, - arg_list, - env_dict) -if not is_success: - print("Error in creating pod") -else: - is_success = kubernetes_api.create_service( - kubernetes_api_server, - kubernetes_api_port, - namespace, - service_name, - pod_name, - port_list, # Service port list - port_list, # Container port list (same as service port list) - is_headless_service) - if not is_success: - print("Error in creating service") - else: - print("Successfully created the Server") diff --git a/tools/gke/delete_client.py b/tools/gke/delete_client.py deleted file mode 100755 index aa519f26b8..0000000000 --- a/tools/gke/delete_client.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python2.7 -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse - -import kubernetes_api - -argp = argparse.ArgumentParser(description='Delete Stress test clients in GKE') -argp.add_argument('-n', - '--num_instances', - required=True, - type=int, - help='The number of instances currently running') - -args = argp.parse_args() -for i in range(1, args.num_instances + 1): - service_name = 'stress-client-%d' % i - pod_name = service_name - namespace = 'default' - kubernetes_api_server="localhost" - kubernetes_api_port=8001 - - is_success=kubernetes_api.delete_pod( - kubernetes_api_server, - kubernetes_api_port, - namespace, - pod_name) - if not is_success: - print('Error in deleting Pod %s' % pod_name) - else: - is_success= kubernetes_api.delete_service( - kubernetes_api_server, - kubernetes_api_port, - namespace, - service_name) - if not is_success: - print('Error in deleting Service %s' % service_name) - else: - print('Deleted %s' % pod_name) diff --git a/tools/gke/delete_server.py b/tools/gke/delete_server.py deleted file mode 100755 index 6e3fdcc33b..0000000000 --- a/tools/gke/delete_server.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python2.7 -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse - -import kubernetes_api - -service_name = 'stress-server' -pod_name = service_name # Use the same name for kubernetes Service and Pod -namespace = 'default' -is_headless_service = True -kubernetes_api_server="localhost" -kubernetes_api_port=8001 - -is_success = kubernetes_api.delete_pod( - kubernetes_api_server, - kubernetes_api_port, - namespace, - pod_name) -if not is_success: - print("Error in deleting Pod %s" % pod_name) -else: - is_success = kubernetes_api.delete_service( - kubernetes_api_server, - kubernetes_api_port, - namespace, - service_name) - if not is_success: - print("Error in deleting Service %d" % service_name) - else: - print("Deleted server %s" % service_name) diff --git a/tools/gke/kubernetes_api.py b/tools/gke/kubernetes_api.py index 14d724bd31..d14c26ad6a 100755 --- a/tools/gke/kubernetes_api.py +++ b/tools/gke/kubernetes_api.py @@ -1,5 +1,5 @@ #!/usr/bin/env python2.7 -# Copyright 2015, Google Inc. +# Copyright 2015-2016 Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -50,7 +50,8 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list, 'name': pod_name, 'image': image_name, 'ports': [{'containerPort': port, - 'protocol': 'TCP'} for port in container_port_list] + 'protocol': 'TCP'} for port in container_port_list], + 'imagePullPolicy': 'Always' } ] } diff --git a/tools/gke/run_stress_tests_on_gke.py b/tools/gke/run_stress_tests_on_gke.py new file mode 100755 index 0000000000..d0c3887a42 --- /dev/null +++ b/tools/gke/run_stress_tests_on_gke.py @@ -0,0 +1,389 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import datetime +import os +import subprocess +import sys +import time + +import kubernetes_api + +GRPC_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) +os.chdir(GRPC_ROOT) + +class BigQuerySettings: + + def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id): + self.run_id = run_id + self.dataset_id = dataset_id + self.summary_table_id = summary_table_id + self.qps_table_id = qps_table_id + + +class KubernetesProxy: + """ Class to start a proxy on localhost to the Kubernetes API server """ + + def __init__(self, api_port): + self.port = api_port + self.p = None + self.started = False + + def start(self): + cmd = ['kubectl', 'proxy', '--port=%d' % self.port] + self.p = subprocess.Popen(args=cmd) + self.started = True + time.sleep(2) + print '..Started' + + def get_port(self): + return self.port + + def is_started(self): + return self.started + + def __del__(self): + if self.p is not None: + self.p.kill() + + +def _build_docker_image(image_name, tag_name): + """ Build the docker image and add a tag """ + os.environ['INTEROP_IMAGE'] = image_name + # Note that 'BASE_NAME' HAS to be 'grpc_interop_stress_cxx' since the script + # build_interop_stress_image.sh invokes the following script: + # tools/dockerfile/$BASE_NAME/build_interop_stress.sh + os.environ['BASE_NAME'] = 'grpc_interop_stress_cxx' + cmd = ['tools/jenkins/build_interop_stress_image.sh'] + p = subprocess.Popen(args=cmd) + retcode = p.wait() + if retcode != 0: + print 'Error in building docker image' + return False + + cmd = ['docker', 'tag', '-f', image_name, tag_name] + p = subprocess.Popen(args=cmd) + retcode = p.wait() + if retcode != 0: + print 'Error in creating the tag %s for %s' % (tag_name, image_name) + return False + + return True + + +def _push_docker_image_to_gke_registry(docker_tag_name): + """Executes 'gcloud docker push ' to push the image to GKE registry""" + cmd = ['gcloud', 'docker', 'push', docker_tag_name] + print 'Pushing %s to GKE registry..' % docker_tag_name + p = subprocess.Popen(args=cmd) + retcode = p.wait() + if retcode != 0: + print 'Error in pushing docker image %s to the GKE registry' % docker_tag_name + return False + return True + + +def _launch_image_on_gke(kubernetes_api_server, kubernetes_api_port, namespace, + pod_name, image_name, port_list, cmd_list, arg_list, + env_dict, is_headless_service): + """Creates a GKE Pod and a Service object for a given image by calling Kubernetes API""" + is_success = kubernetes_api.create_pod( + kubernetes_api_server, + kubernetes_api_port, + namespace, + pod_name, + image_name, + port_list, # The ports to be exposed on this container/pod + cmd_list, # The command that launches the stress server + arg_list, + env_dict # Environment variables to be passed to the pod + ) + if not is_success: + print 'Error in creating Pod' + return False + + is_success = kubernetes_api.create_service( + kubernetes_api_server, + kubernetes_api_port, + namespace, + pod_name, # Use the pod name for service name as well + pod_name, + port_list, # Service port list + port_list, # Container port list (same as service port list) + is_headless_service) + if not is_success: + print 'Error in creating Service' + return False + + print 'Successfully created the pod/service %s' % pod_name + return True + + +def _delete_image_on_gke(kubernetes_proxy, pod_name_list): + """Deletes a GKE Pod and Service object for given list of Pods by calling Kubernetes API""" + if not kubernetes_proxy.is_started: + print 'Kubernetes proxy must be started before calling this function' + return False + + is_success = True + for pod_name in pod_name_list: + is_success = kubernetes_api.delete_pod( + 'localhost', kubernetes_proxy.get_port(), 'default', pod_name) + if not is_success: + print 'Error in deleting pod %s' % pod_name + break + + is_success = kubernetes_api.delete_service( + 'localhost', kubernetes_proxy.get_port(), 'default', + pod_name) # service name same as pod name + if not is_success: + print 'Error in deleting service %s' % pod_name + break + + if is_success: + print 'Successfully deleted the Pods/Services: %s' % ','.join(pod_name_list) + + return is_success + + +def _launch_server(gcp_project_id, docker_image_name, bq_settings, + kubernetes_proxy, server_pod_name, server_port): + """ Launches a stress test server instance in GKE cluster """ + if not kubernetes_proxy.is_started: + print 'Kubernetes proxy must be started before calling this function' + return False + + server_cmd_list = [ + '/var/local/git/grpc/tools/run_tests/stress_test/run_server.py' + ] # Process that is launched + server_arg_list = [] # run_server.py does not take any args (for now) + + # == Parameters to the server process launched in GKE == + server_env = { + 'STRESS_TEST_IMAGE_TYPE': 'SERVER', + 'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server', + 'STRESS_TEST_ARGS_STR': '--port=%s' % server_port, + 'RUN_ID': bq_settings.run_id, + 'POD_NAME': server_pod_name, + 'GCP_PROJECT_ID': gcp_project_id, + 'DATASET_ID': bq_settings.dataset_id, + 'SUMMARY_TABLE_ID': bq_settings.summary_table_id, + 'QPS_TABLE_ID': bq_settings.qps_table_id + } + + # Launch Server + is_success = _launch_image_on_gke( + 'localhost', + kubernetes_proxy.get_port(), + 'default', + server_pod_name, + docker_image_name, + [server_port], # Port that should be exposed on the container + server_cmd_list, + server_arg_list, + server_env, + True # Headless = True for server. Since we want DNS records to be greated by GKE + ) + + return is_success + + +def _launch_client(gcp_project_id, docker_image_name, bq_settings, + kubernetes_proxy, num_instances, client_pod_name_prefix, + server_pod_name, server_port): + """ Launches a configurable number of stress test clients on GKE cluster """ + if not kubernetes_proxy.is_started: + print 'Kubernetes proxy must be started before calling this function' + return False + + server_address = '%s.default.svc.cluster.local:%d' % (server_pod_name, + server_port) + #TODO(sree) Make the whole client args configurable + test_cases_str = 'empty_unary:1,large_unary:1' + stress_client_arg_list = [ + '--server_addresses=%s' % server_address, + '--test_cases=%s' % test_cases_str, '--num_stubs_per_channel=10' + ] + + client_cmd_list = [ + '/var/local/git/grpc/tools/run_tests/stress_test/run_client.py' + ] + # run_client.py takes no args. All args are passed as env variables + client_arg_list = [] + + # TODO(sree) Make this configurable (and also less frequent) + poll_interval_secs = 5 + + metrics_port = 8081 + metrics_server_address = 'localhost:%d' % metrics_port + metrics_client_arg_list = [ + '--metrics_server_address=%s' % metrics_server_address, + '--total_only=true' + ] + + client_env = { + 'STRESS_TEST_IMAGE_TYPE': 'CLIENT', + 'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test', + 'STRESS_TEST_ARGS_STR': ' '.join(stress_client_arg_list), + 'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client', + 'METRICS_CLIENT_ARGS_STR': ' '.join(metrics_client_arg_list), + 'RUN_ID': bq_settings.run_id, + 'POLL_INTERVAL_SECS': str(poll_interval_secs), + 'GCP_PROJECT_ID': gcp_project_id, + 'DATASET_ID': bq_settings.dataset_id, + 'SUMMARY_TABLE_ID': bq_settings.summary_table_id, + 'QPS_TABLE_ID': bq_settings.qps_table_id + } + + for i in range(1, num_instances + 1): + pod_name = '%s-%d' % (client_pod_name_prefix, i) + client_env['POD_NAME'] = pod_name + is_success = _launch_image_on_gke( + 'localhost', + kubernetes_proxy.get_port(), + 'default', + pod_name, + docker_image_name, + [metrics_port], # Client pods expose metrics port + client_cmd_list, + client_arg_list, + client_env, + False # Client is not a headless service. + ) + if not is_success: + print 'Error in launching client %s' % pod_name + return False + + return True + + +def _launch_server_and_client(gcp_project_id, docker_image_name, + num_client_instances): + # == Big Query tables related settings (Common for both server and client) == + + # Create a unique id for this run (Note: Using timestamp instead of UUID to + # make it easier to deduce the date/time of the run just by looking at the run + # run id. This is useful in debugging when looking at records in Biq query) + run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + + dataset_id = 'stress_test_%s' % run_id + summary_table_id = 'summary' + qps_table_id = 'qps' + + bq_settings = BigQuerySettings(run_id, dataset_id, summary_table_id, + qps_table_id) + + # Start kubernetes proxy + kubernetes_api_port = 9001 + kubernetes_proxy = KubernetesProxy(kubernetes_api_port) + kubernetes_proxy.start() + + server_pod_name = 'stress-server' + server_port = 8080 + is_success = _launch_server(gcp_project_id, docker_image_name, bq_settings, + kubernetes_proxy, server_pod_name, server_port) + if not is_success: + print 'Error in launching server' + return False + + # Server takes a while to start. + # TODO(sree) Use Kubernetes API to query the status of the server instead of + # sleeping + time.sleep(60) + + # Launch client + server_address = '%s.default.svc.cluster.local:%d' % (server_pod_name, + server_port) + client_pod_name_prefix = 'stress-client' + is_success = _launch_client(gcp_project_id, docker_image_name, bq_settings, + kubernetes_proxy, num_client_instances, + client_pod_name_prefix, server_pod_name, + server_port) + if not is_success: + print 'Error in launching client(s)' + return False + + return True + + +def _delete_server_and_client(num_client_instances): + kubernetes_api_port = 9001 + kubernetes_proxy = KubernetesProxy(kubernetes_api_port) + kubernetes_proxy.start() + + # Delete clients first + client_pod_names = ['stress-client-%d' % i + for i in range(1, num_client_instances + 1)] + + is_success = _delete_image_on_gke(kubernetes_proxy, client_pod_names) + if not is_success: + return False + + # Delete server + server_pod_name = 'stress-server' + return _delete_image_on_gke(kubernetes_proxy, [server_pod_name]) + + +def _build_and_push_docker_image(gcp_project_id, docker_image_name, tag_name): + is_success = _build_docker_image(docker_image_name, tag_name) + if not is_success: + return False + return _push_docker_image_to_gke_registry(tag_name) + + +# TODO(sree): This is just to test the above APIs. Rewrite this to make +# everything configurable (like image names / number of instances etc) +def test_run(): + image_name = 'grpc_stress_test' + gcp_project_id = 'sree-gce' + tag_name = 'gcr.io/%s/%s' % (gcp_project_id, image_name) + num_client_instances = 3 + + is_success = _build_docker_image(image_name, tag_name) + if not is_success: + return + + is_success = _push_docker_image_to_gke_registry(tag_name) + if not is_success: + return + + is_success = _launch_server_and_client(gcp_project_id, tag_name, + num_client_instances) + + # Run the test for 2 mins + time.sleep(120) + + is_success = _delete_server_and_client(num_client_instances) + + if not is_success: + return + + +if __name__ == '__main__': + test_run() diff --git a/tools/run_tests/stress_test/run_client.py b/tools/run_tests/stress_test/run_client.py new file mode 100755 index 0000000000..33958bce49 --- /dev/null +++ b/tools/run_tests/stress_test/run_client.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import datetime +import os +import re +import select +import subprocess +import sys +import time + +from stress_test_utils import EventType +from stress_test_utils import BigQueryHelper + + +# TODO (sree): Write a python grpc client to directly query the metrics instead +# of calling metrics_client +def _get_qps(metrics_cmd): + qps = 0 + try: + # Note: gpr_log() writes even non-error messages to stderr stream. So it is + # important that we set stderr=subprocess.STDOUT + p = subprocess.Popen(args=metrics_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + retcode = p.wait() + (out_str, err_str) = p.communicate() + if retcode != 0: + print 'Error in reading metrics information' + print 'Output: ', out_str + else: + # The overall qps is printed at the end of the line + m = re.search('\d+$', out_str) + qps = int(m.group()) if m else 0 + except Exception as ex: + print 'Exception while reading metrics information: ' + str(ex) + return qps + + +def run_client(): + """This is a wrapper around the stress test client and performs the following: + 1) Create the following two tables in Big Query: + (i) Summary table: To record events like the test started, completed + successfully or failed + (ii) Qps table: To periodically record the QPS sent by this client + 2) Start the stress test client and add a row in the Big Query summary + table + 3) Once every few seconds (as specificed by the poll_interval_secs) poll + the status of the stress test client process and perform the + following: + 3.1) If the process is still running, get the current qps by invoking + the metrics client program and add a row in the Big Query + Qps table. Sleep for a duration specified by poll_interval_secs + 3.2) If the process exited successfully, add a row in the Big Query + Summary table and exit + 3.3) If the process failed, add a row in Big Query summary table and + wait forever. + NOTE: This script typically runs inside a GKE pod which means + that the pod gets destroyed when the script exits. However, in + case the stress test client fails, we would not want the pod to + be destroyed (since we might want to connect to the pod for + examining logs). This is the reason why the script waits forever + in case of failures + """ + env = dict(os.environ) + image_type = env['STRESS_TEST_IMAGE_TYPE'] + image_name = env['STRESS_TEST_IMAGE'] + args_str = env['STRESS_TEST_ARGS_STR'] + metrics_client_image = env['METRICS_CLIENT_IMAGE'] + metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR'] + run_id = env['RUN_ID'] + pod_name = env['POD_NAME'] + logfile_name = env.get('LOGFILE_NAME') + poll_interval_secs = float(env['POLL_INTERVAL_SECS']) + project_id = env['GCP_PROJECT_ID'] + dataset_id = env['DATASET_ID'] + summary_table_id = env['SUMMARY_TABLE_ID'] + qps_table_id = env['QPS_TABLE_ID'] + + bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id, + dataset_id, summary_table_id, qps_table_id) + bq_helper.initialize() + + # Create BigQuery Dataset and Tables: Summary Table and Metrics Table + if not bq_helper.setup_tables(): + print 'Error in creating BigQuery tables' + return + + start_time = datetime.datetime.now() + + logfile = None + details = 'Logging to stdout' + if logfile_name is not None: + print 'Opening logfile: %s ...' % logfile_name + details = 'Logfile: %s' % logfile_name + logfile = open(logfile_name, 'w') + + # Update status that the test is starting (in the status table) + bq_helper.insert_summary_row(EventType.STARTING, details) + + metrics_cmd = [metrics_client_image + ] + [x for x in metrics_client_args_str.split()] + stress_cmd = [image_name] + [x for x in args_str.split()] + + print 'Launching process %s ...' % stress_cmd + stress_p = subprocess.Popen(args=stress_cmd, + stdout=logfile, + stderr=subprocess.STDOUT) + + qps_history = [1, 1, 1] # Maintain the last 3 qps readings + qps_history_idx = 0 # Index into the qps_history list + + is_error = False + while True: + # Check if stress_client is still running. If so, collect metrics and upload + # to BigQuery status table + if stress_p.poll() is not None: + # TODO(sree) Upload completion status to BigQuery + end_time = datetime.datetime.now().isoformat() + event_type = EventType.SUCCESS + details = 'End time: %s' % end_time + if stress_p.returncode != 0: + event_type = EventType.FAILURE + details = 'Return code = %d. End time: %s' % (stress_p.returncode, + end_time) + is_error = True + bq_helper.insert_summary_row(event_type, details) + print details + break + + # Stress client still running. Get metrics + qps = _get_qps(metrics_cmd) + qps_recorded_at = datetime.datetime.now().isoformat() + print 'qps: %d at %s' % (qps, qps_recorded_at) + + # If QPS has been zero for the last 3 iterations, flag it as error and exit + qps_history[qps_history_idx] = qps + qps_history_idx = (qps_history_idx + 1) % len(qps_history) + if sum(qps_history) == 0: + details = 'QPS has been zero for the last %d seconds - as of : %s' % ( + poll_interval_secs * 3, qps_recorded_at) + is_error = True + bq_helper.insert_summary_row(EventType.FAILURE, details) + print details + break + + # Upload qps metrics to BiqQuery + bq_helper.insert_qps_row(qps, qps_recorded_at) + + time.sleep(poll_interval_secs) + + if is_error: + print 'Waiting indefinitely..' + select.select([], [], []) + + print 'Completed' + return + + +if __name__ == '__main__': + run_client() diff --git a/tools/run_tests/stress_test/run_server.py b/tools/run_tests/stress_test/run_server.py new file mode 100755 index 0000000000..9ad8d63638 --- /dev/null +++ b/tools/run_tests/stress_test/run_server.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import datetime +import os +import select +import subprocess +import sys +import time + +from stress_test_utils import BigQueryHelper +from stress_test_utils import EventType + + +def run_server(): + """This is a wrapper around the interop server and performs the following: + 1) Create a 'Summary table' in Big Query to record events like the server + started, completed successfully or failed. NOTE: This also creates + another table called the QPS table which is currently NOT needed on the + server (it is needed on the stress test clients) + 2) Start the server process and add a row in Big Query summary table + 3) Wait for the server process to terminate. The server process does not + terminate unless there is an error. + If the server process terminated with a failure, add a row in Big Query + and wait forever. + NOTE: This script typically runs inside a GKE pod which means that the + pod gets destroyed when the script exits. However, in case the server + process fails, we would not want the pod to be destroyed (since we + might want to connect to the pod for examining logs). This is the + reason why the script waits forever in case of failures. + """ + + # Read the parameters from environment variables + env = dict(os.environ) + + run_id = env['RUN_ID'] # The unique run id for this test + image_type = env['STRESS_TEST_IMAGE_TYPE'] + image_name = env['STRESS_TEST_IMAGE'] + args_str = env['STRESS_TEST_ARGS_STR'] + pod_name = env['POD_NAME'] + project_id = env['GCP_PROJECT_ID'] + dataset_id = env['DATASET_ID'] + summary_table_id = env['SUMMARY_TABLE_ID'] + qps_table_id = env['QPS_TABLE_ID'] + + logfile_name = env.get('LOGFILE_NAME') + + bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id, + dataset_id, summary_table_id, qps_table_id) + bq_helper.initialize() + + # Create BigQuery Dataset and Tables: Summary Table and Metrics Table + if not bq_helper.setup_tables(): + print 'Error in creating BigQuery tables' + return + + start_time = datetime.datetime.now() + + logfile = None + details = 'Logging to stdout' + if logfile_name is not None: + print 'Opening log file: ', logfile_name + logfile = open(logfile_name, 'w') + details = 'Logfile: %s' % logfile_name + + # Update status that the test is starting (in the status table) + bq_helper.insert_summary_row(EventType.STARTING, details) + + stress_cmd = [image_name] + [x for x in args_str.split()] + + print 'Launching process %s ...' % stress_cmd + stress_p = subprocess.Popen(args=stress_cmd, + stdout=logfile, + stderr=subprocess.STDOUT) + + returncode = stress_p.wait() + if returncode != 0: + end_time = datetime.datetime.now().isoformat() + event_type = EventType.FAILURE + details = 'Returncode: %d; End time: %s' % (returncode, end_time) + bq_helper.insert_summary_row(event_type, details) + print 'Waiting indefinitely..' + select.select([], [], []) + return returncode + + +if __name__ == '__main__': + run_server() diff --git a/tools/run_tests/stress_test/stress_test_utils.py b/tools/run_tests/stress_test/stress_test_utils.py new file mode 100755 index 0000000000..a0626ce3ac --- /dev/null +++ b/tools/run_tests/stress_test/stress_test_utils.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import datetime +import json +import os +import re +import select +import subprocess +import sys +import time + +# Import big_query_utils module +bq_utils_dir = os.path.abspath(os.path.join( + os.path.dirname(__file__), '../../big_query')) +sys.path.append(bq_utils_dir) +import big_query_utils as bq_utils + +class EventType: + STARTING = 'STARTING' + SUCCESS = 'SUCCESS' + FAILURE = 'FAILURE' + +class BigQueryHelper: + """Helper class for the stress test wrappers to interact with BigQuery. + """ + + def __init__(self, run_id, image_type, pod_name, project_id, dataset_id, + summary_table_id, qps_table_id): + self.run_id = run_id + self.image_type = image_type + self.pod_name = pod_name + self.project_id = project_id + self.dataset_id = dataset_id + self.summary_table_id = summary_table_id + self.qps_table_id = qps_table_id + + def initialize(self): + self.bq = bq_utils.create_big_query() + + def setup_tables(self): + return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \ + and self.__create_summary_table() \ + and self.__create_qps_table() + + def insert_summary_row(self, event_type, details): + row_values_dict = { + 'run_id': self.run_id, + 'image_type': self.image_type, + 'pod_name': self.pod_name, + 'event_date': datetime.datetime.now().isoformat(), + 'event_type': event_type, + 'details': details + } + # Something that uniquely identifies the row (Biquery needs it for duplicate + # detection). + row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type) + + row = bq_utils.make_row(row_unique_id, row_values_dict) + return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, + self.summary_table_id, [row]) + + def insert_qps_row(self, qps, recorded_at): + row_values_dict = { + 'run_id': self.run_id, + 'pod_name': self.pod_name, + 'recorded_at': recorded_at, + 'qps': qps + } + + row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at) + row = bq_utils.make_row(row_unique_id, row_values_dict) + return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, + self.qps_table_id, [row]) + + def check_if_any_tests_failed(self, num_query_retries=3): + query = ('SELECT event_type FROM %s.%s WHERE run_id = %s AND ' + 'event_type="%s"') % (self.dataset_id, self.summary_table_id, + self.run_id, EventType.FAILURE) + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) + page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute( + num_retries=num_query_retries) + print page + num_failures = int(page['totalRows']) + print 'num rows: ', num_failures + return num_failures > 0 + + def print_summary_records(self, num_query_retries=3): + line = '-' * 120 + print line + print 'Summary records' + print 'Run Id', self.run_id + print line + query = ('SELECT pod_name, image_type, event_type, event_date, details' + ' FROM %s.%s WHERE run_id = %s ORDER by event_date;') % ( + self.dataset_id, self.summary_table_id, self.run_id) + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) + + print '{:<25} {:<12} {:<12} {:<30} {}'.format( + 'Pod name', 'Image type', 'Event type', 'Date', 'Details') + print line + page_token = None + while True: + page = self.bq.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=num_query_retries) + rows = page.get('rows', []) + for row in rows: + print '{:<25} {:<12} {:<12} {:<30} {}'.format( + row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'], + row['f'][3]['v'], row['f'][4]['v']) + page_token = page.get('pageToken') + if not page_token: + break + + def print_qps_records(self, num_query_retries=3): + line = '-' * 80 + print line + print 'QPS Summary' + print 'Run Id: ', self.run_id + print line + query = ( + 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = %s ORDER ' + 'by recorded_at;') % (self.dataset_id, self.qps_table_id, self.run_id) + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) + print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps') + print line + page_token = None + while True: + page = self.bq.jobs().getQueryResults( + pageToken=page_token, + **query_job['jobReference']).execute(num_retries=num_query_retries) + rows = page.get('rows', []) + for row in rows: + print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'], + row['f'][2]['v']) + page_token = page.get('pageToken') + if not page_token: + break + + def __create_summary_table(self): + summary_table_schema = [ + ('run_id', 'INTEGER', 'Test run id'), + ('image_type', 'STRING', 'Client or Server?'), + ('pod_name', 'STRING', 'GKE pod hosting this image'), + ('event_date', 'STRING', 'The date of this event'), + ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'), + ('details', 'STRING', 'Any other relevant details') + ] + desc = ('The table that contains START/SUCCESS/FAILURE events for ' + ' the stress test clients and servers') + return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, + self.summary_table_id, summary_table_schema, + desc) + + def __create_qps_table(self): + qps_table_schema = [ + ('run_id', 'INTEGER', 'Test run id'), + ('pod_name', 'STRING', 'GKE pod hosting this image'), + ('recorded_at', 'STRING', 'Metrics recorded at time'), + ('qps', 'INTEGER', 'Queries per second') + ] + desc = 'The table that cointains the qps recorded at various intervals' + return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, + self.qps_table_id, qps_table_schema, desc) diff --git a/tools/run_tests/stress_test_wrapper.py b/tools/run_tests/stress_test_wrapper.py deleted file mode 100755 index 8f1bd2024e..0000000000 --- a/tools/run_tests/stress_test_wrapper.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python2.7 -import os -import re -import select -import subprocess -import sys -import time - -GRPC_ROOT = '/usr/local/google/home/sreek/workspace/grpc/' -STRESS_TEST_IMAGE = GRPC_ROOT + 'bins/opt/stress_test' -STRESS_TEST_ARGS_STR = ' '.join([ - '--server_addresses=localhost:8000', - '--test_cases=empty_unary:1,large_unary:1', '--num_stubs_per_channel=10', - '--test_duration_secs=10']) -METRICS_CLIENT_IMAGE = GRPC_ROOT + 'bins/opt/metrics_client' -METRICS_CLIENT_ARGS_STR = ' '.join([ - '--metrics_server_address=localhost:8081', '--total_only=true']) -LOGFILE_NAME = 'stress_test.log' - - -# TODO (sree): Write a python grpc client to directly query the metrics instead -# of calling metrics_client -def get_qps(metrics_cmd): - qps = 0 - try: - # Note: gpr_log() writes even non-error messages to stderr stream. So it is - # important that we set stderr=subprocess.STDOUT - p = subprocess.Popen(args=metrics_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - retcode = p.wait() - (out_str, err_str) = p.communicate() - if retcode != 0: - print 'Error in reading metrics information' - print 'Output: ', out_str - else: - # The overall qps is printed at the end of the line - m = re.search('\d+$', out_str) - qps = int(m.group()) if m else 0 - except Exception as ex: - print 'Exception while reading metrics information: ' + str(ex) - return qps - -def main(argv): - # TODO(sree) Create BigQuery Tables - # (Summary table), (Metrics table) - - # TODO(sree) Update status that the test is starting (in the status table) - # - - metrics_cmd = [METRICS_CLIENT_IMAGE - ] + [x for x in METRICS_CLIENT_ARGS_STR.split()] - - stress_cmd = [STRESS_TEST_IMAGE] + [x for x in STRESS_TEST_ARGS_STR.split()] - # TODO(sree): Add an option to print to stdout if logfilename is absent - logfile = open(LOGFILE_NAME, 'w') - stress_p = subprocess.Popen(args=arg_list, - stdout=logfile, - stderr=subprocess.STDOUT) - - qps_history = [1, 1, 1] # Maintain the last 3 qps - qps_history_idx = 0 # Index into the qps_history list - - is_error = False - while True: - # Check if stress_client is still running. If so, collect metrics and upload - # to BigQuery status table - # - if stress_p is not None: - # TODO(sree) Upload completion status to BiqQuery - is_error = (stress_p.returncode != 0) - break - - # Stress client still running. Get metrics - qps = get_qps(metrics_cmd) - - # If QPS has been zero for the last 3 iterations, flag it as error and exit - qps_history[qps_history_idx] = qps - qps_history_idx = (qps_histor_idx + 1) % len(qps_history) - if sum(a) == 0: - print ('QPS has been zero for the last 3 iterations. Not monitoring ' - 'anymore. The stress test client may be stalled.') - is_error = True - break - - #TODO(sree) Upload qps metrics to BiqQuery - - if is_error: - print 'Waiting indefinitely..' - select.select([],[],[]) - - return 1 - - -if __name__ == '__main__': - main(sys.argv[1:]) -- cgit v1.2.3 From 1b5a264eb80817a6fc362637a7a0497cbc0611ec Mon Sep 17 00:00:00 2001 From: Dan Born Date: Wed, 24 Feb 2016 18:52:39 -0800 Subject: Allow new credential types to be added to tests. --- test/cpp/util/test_credentials_provider.cc | 84 ++++++++++++++++++++---------- test/cpp/util/test_credentials_provider.h | 19 +++---- 2 files changed, 67 insertions(+), 36 deletions(-) (limited to 'test') diff --git a/test/cpp/util/test_credentials_provider.cc b/test/cpp/util/test_credentials_provider.cc index cfd3ebbb11..65d3205767 100644 --- a/test/cpp/util/test_credentials_provider.cc +++ b/test/cpp/util/test_credentials_provider.cc @@ -34,6 +34,8 @@ #include "test/cpp/util/test_credentials_provider.h" +#include + #include #include @@ -48,12 +50,36 @@ using grpc::InsecureServerCredentials; using grpc::ServerCredentials; using grpc::SslCredentialsOptions; using grpc::SslServerCredentialsOptions; -using grpc::testing::CredentialsProvider; +using grpc::testing::CredentialTypeProvider; + +// Provide test credentials. Thread-safe. +class CredentialsProvider { + public: + virtual ~CredentialsProvider() {} + + virtual void AddSecureType( + const grpc::string& type, + std::unique_ptr type_provider) = 0; + virtual std::shared_ptr GetChannelCredentials( + const grpc::string& type, ChannelArguments* args) = 0; + virtual std::shared_ptr GetServerCredentials( + const grpc::string& type) = 0; + virtual std::vector GetSecureCredentialsTypeList() = 0; +}; class DefaultCredentialsProvider : public CredentialsProvider { public: ~DefaultCredentialsProvider() override {} + void AddSecureType( + const grpc::string& type, + std::unique_ptr type_provider) override { + // This clobbers any existing entry for type, except the defaults, which + // can't be clobbered. + grpc::unique_lock lock(mu_); + added_secure_types_[type] = std::move(type_provider); + } + std::shared_ptr GetChannelCredentials( const grpc::string& type, ChannelArguments* args) override { if (type == grpc::testing::kInsecureCredentialsType) { @@ -63,9 +89,14 @@ class DefaultCredentialsProvider : public CredentialsProvider { args->SetSslTargetNameOverride("foo.test.google.fr"); return SslCredentials(ssl_opts); } else { - gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); + grpc::unique_lock lock(mu_); + auto it(added_secure_types_.find(type)); + if (it == added_secure_types_.end()) { + gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); + return nullptr; + } + return it->second->GetChannelCredentials(args); } - return nullptr; } std::shared_ptr GetServerCredentials( @@ -80,35 +111,40 @@ class DefaultCredentialsProvider : public CredentialsProvider { ssl_opts.pem_key_cert_pairs.push_back(pkcp); return SslServerCredentials(ssl_opts); } else { - gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); + grpc::unique_lock lock(mu_); + auto it(added_secure_types_.find(type)); + if (it == added_secure_types_.end()) { + gpr_log(GPR_ERROR, "Unsupported credentials type %s.", type.c_str()); + return nullptr; + } + return it->second->GetServerCredentials(); } - return nullptr; } std::vector GetSecureCredentialsTypeList() override { std::vector types; types.push_back(grpc::testing::kTlsCredentialsType); + grpc::unique_lock lock(mu_); + for (const auto& type_pair : added_secure_types_) { + types.push_back(type_pair.first); + } return types; } + + private: + grpc::mutex mu_; + std::unordered_map > + added_secure_types_; }; -gpr_once g_once_init_provider_mu = GPR_ONCE_INIT; -grpc::mutex* g_provider_mu = nullptr; +gpr_once g_once_init_provider = GPR_ONCE_INIT; CredentialsProvider* g_provider = nullptr; -void InitProviderMu() { - g_provider_mu = new grpc::mutex; -} - -grpc::mutex& GetMu() { - gpr_once_init(&g_once_init_provider_mu, &InitProviderMu); - return *g_provider_mu; +void CreateDefaultProvider() { + g_provider = new DefaultCredentialsProvider; } CredentialsProvider* GetProvider() { - grpc::unique_lock lock(GetMu()); - if (g_provider == nullptr) { - g_provider = new DefaultCredentialsProvider; - } + gpr_once_init(&g_once_init_provider, &CreateDefaultProvider); return g_provider; } @@ -117,15 +153,9 @@ CredentialsProvider* GetProvider() { namespace grpc { namespace testing { -// Note that it is not thread-safe to set a provider while concurrently using -// the previously set provider, as this deletes and replaces it. nullptr may be -// given to reset to the default. -void SetTestCredentialsProvider(std::unique_ptr provider) { - grpc::unique_lock lock(GetMu()); - if (g_provider != nullptr) { - delete g_provider; - } - g_provider = provider.release(); +void AddSecureType(const grpc::string& type, + std::unique_ptr type_provider) { + GetProvider()->AddSecureType(type, std::move(type_provider)); } std::shared_ptr GetChannelCredentials( diff --git a/test/cpp/util/test_credentials_provider.h b/test/cpp/util/test_credentials_provider.h index a6b547cb07..50fadb53a2 100644 --- a/test/cpp/util/test_credentials_provider.h +++ b/test/cpp/util/test_credentials_provider.h @@ -46,20 +46,21 @@ namespace testing { const char kInsecureCredentialsType[] = "INSECURE_CREDENTIALS"; const char kTlsCredentialsType[] = "TLS_CREDENTIALS"; -class CredentialsProvider { +// Provide test credentials of a particular type. +class CredentialTypeProvider { public: - virtual ~CredentialsProvider() {} + virtual ~CredentialTypeProvider() {} virtual std::shared_ptr GetChannelCredentials( - const grpc::string& type, ChannelArguments* args) = 0; - virtual std::shared_ptr GetServerCredentials( - const grpc::string& type) = 0; - virtual std::vector GetSecureCredentialsTypeList() = 0; + ChannelArguments* args) = 0; + virtual std::shared_ptr GetServerCredentials() = 0; }; -// Set the CredentialsProvider used by the other functions in this file. If this -// is not set, a default provider will be used. -void SetTestCredentialsProvider(std::unique_ptr provider); +// Add a secure type in addition to the defaults above +// (kInsecureCredentialsType, kTlsCredentialsType) that can be returned from the +// functions below. +void AddSecureType(const grpc::string& type, + std::unique_ptr type_provider); // Provide channel credentials according to the given type. Alter the channel // arguments if needed. -- cgit v1.2.3 From 188563f474372ca41400d8ff28c44959a1083075 Mon Sep 17 00:00:00 2001 From: Alistair Veitch Date: Thu, 25 Feb 2016 14:27:44 -0800 Subject: eliminate binary tags --- include/grpc/census.h | 82 +++++++++++---------------- src/core/census/context.c | 121 +++++++++++++--------------------------- test/core/census/context_test.c | 121 ++++++++++++++++++---------------------- 3 files changed, 127 insertions(+), 197 deletions(-) (limited to 'test') diff --git a/include/grpc/census.h b/include/grpc/census.h index dfa3bd7e0d..442a754f0a 100644 --- a/include/grpc/census.h +++ b/include/grpc/census.h @@ -80,18 +80,18 @@ CENSUSAPI int census_enabled(void); metrics will be recorded. Keys are unique within a context. */ typedef struct census_context census_context; -/* A tag is a key:value pair. The key is a non-empty, printable (UTF-8 - encoded), nil-terminated string. The value is a binary string, that may be - printable. There are limits on the sizes of both keys and values (see - CENSUS_MAX_TAG_KB_LEN definition below), and the number of tags that can be - propagated (CENSUS_MAX_PROPAGATED_TAGS). Users should also remember that - some systems may have limits on, e.g., the number of bytes that can be - transmitted as metadata, and that larger tags means more memory consumed - and time in processing. */ +/* A tag is a key:value pair. Both keys and values are nil-terminated strings, + containing printable ASCII characters (decimal 32-126). Keys must be at + least one character in length. Both keys and values can have at most + CENSUS_MAX_TAG_KB_LEN characters (including the terminating nil). The + maximum number of tags that can be propagated is + CENSUS_MAX_PROPAGATED_TAGS. Users should also remember that some systems + may have limits on, e.g., the number of bytes that can be transmitted as + metadata, and that larger tags means more memory consumed and time in + processing. */ typedef struct { const char *key; const char *value; - size_t value_len; uint8_t flags; } census_tag; @@ -103,28 +103,25 @@ typedef struct { /* Tag flags. */ #define CENSUS_TAG_PROPAGATE 1 /* Tag should be propagated over RPC */ #define CENSUS_TAG_STATS 2 /* Tag will be used for statistics aggregation */ -#define CENSUS_TAG_BINARY 4 /* Tag value is not printable */ -#define CENSUS_TAG_RESERVED 8 /* Reserved for internal use. */ -/* Flag values 8,16,32,64,128 are reserved for future/internal use. Clients +#define CENSUS_TAG_RESERVED 4 /* Reserved for internal use. */ +/* Flag values 4,8,16,32,64,128 are reserved for future/internal use. Clients should not use or rely on their values. */ #define CENSUS_TAG_IS_PROPAGATED(flags) (flags & CENSUS_TAG_PROPAGATE) #define CENSUS_TAG_IS_STATS(flags) (flags & CENSUS_TAG_STATS) -#define CENSUS_TAG_IS_BINARY(flags) (flags & CENSUS_TAG_BINARY) /* An instance of this structure is kept by every context, and records the basic information associated with the creation of that context. */ typedef struct { - int n_propagated_tags; /* number of propagated printable tags */ - int n_propagated_binary_tags; /* number of propagated binary tags */ - int n_local_tags; /* number of non-propagated (local) tags */ - int n_deleted_tags; /* number of tags that were deleted */ - int n_added_tags; /* number of tags that were added */ - int n_modified_tags; /* number of tags that were modified */ - int n_invalid_tags; /* number of tags with bad keys or values (e.g. - longer than CENSUS_MAX_TAG_KV_LEN) */ - int n_ignored_tags; /* number of tags ignored because of - CENSUS_MAX_PROPAGATED_TAGS limit. */ + int n_propagated_tags; /* number of propagated tags */ + int n_local_tags; /* number of non-propagated (local) tags */ + int n_deleted_tags; /* number of tags that were deleted */ + int n_added_tags; /* number of tags that were added */ + int n_modified_tags; /* number of tags that were modified */ + int n_invalid_tags; /* number of tags with bad keys or values (e.g. + longer than CENSUS_MAX_TAG_KV_LEN) */ + int n_ignored_tags; /* number of tags ignored because of + CENSUS_MAX_PROPAGATED_TAGS limit. */ } census_context_status; /* Create a new context, adding and removing tags from an existing context. @@ -132,10 +129,10 @@ typedef struct { to add as many tags in a single operation as is practical for the client. @param base Base context to build upon. Can be NULL. @param tags A set of tags to be added/changed/deleted. Tags with keys that - are in 'tags', but not 'base', are added to the tag set. Keys that are in + are in 'tags', but not 'base', are added to the context. Keys that are in both 'tags' and 'base' will have their value/flags modified. Tags with keys - in both, but with NULL or zero-length values, will be deleted from the tag - set. Tags with invalid (too long or short) keys or values will be ignored. + in both, but with NULL values, will be deleted from the context. Tags with + invalid (too long or short) keys or values will be ignored. If adding a tag will result in more than CENSUS_MAX_PROPAGATED_TAGS in either binary or non-binary tags, they will be ignored, as will deletions of tags that don't exist. @@ -185,32 +182,19 @@ CENSUSAPI int census_context_get_tag(const census_context *context, for use by RPC systems only, for purposes of transmitting/receiving contexts. */ -/* Encode a context into a buffer. The propagated tags are encoded into the - buffer in two regions: one for printable tags, and one for binary tags. +/* Encode a context into a buffer. @param context context to be encoded - @param buffer pointer to buffer. This address will be used to encode the - printable tags. + @param buffer buffer into which the context will be encoded. @param buf_size number of available bytes in buffer. - @param print_buf_size Will be set to the number of bytes consumed by - printable tags. - @param bin_buf_size Will be set to the number of bytes used to encode the - binary tags. - @return A pointer to the binary tag's encoded, or NULL if the buffer was - insufficiently large to hold the encoded tags. Thus, if successful, - printable tags are encoded into - [buffer, buffer + *print_buf_size) and binary tags into - [returned-ptr, returned-ptr + *bin_buf_size) (and the returned - pointer should be buffer + *print_buf_size) */ -CENSUSAPI char *census_context_encode(const census_context *context, - char *buffer, size_t buf_size, - size_t *print_buf_size, - size_t *bin_buf_size); - -/* Decode context buffers encoded with census_context_encode(). Returns NULL + @return The number of buffer bytes consumed for the encoded context, or + zero if the buffer was of insufficient size. */ +CENSUSAPI size_t census_context_encode(const census_context *context, + char *buffer, size_t buf_size); + +/* Decode context buffer encoded with census_context_encode(). Returns NULL if there is an error in parsing either buffer. */ -CENSUSAPI census_context *census_context_decode(const char *buffer, size_t size, - const char *bin_buffer, - size_t bin_size); +CENSUSAPI census_context *census_context_decode(const char *buffer, + size_t size); /* Distributed traces can have a number of options. */ enum census_trace_mask_values { diff --git a/src/core/census/context.c b/src/core/census/context.c index e60330de64..441d3b89a6 100644 --- a/src/core/census/context.c +++ b/src/core/census/context.c @@ -60,10 +60,6 @@ // limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS. // * Keep all tag information (keys/values/flags) in a single memory buffer, // that can be directly copied to the wire. -// * Binary tags share the same structure as, but are encoded separately from, -// non-binary tags. This is primarily because non-binary tags are far more -// likely to be repeated across multiple RPC calls, so are more efficiently -// cached and compressed in any metadata schemes. // Structure representing a set of tags. Essentially a count of number of tags // present, and pointer to a chunk of memory that contains the per-tag details. @@ -77,7 +73,7 @@ struct tag_set { char *kvm; // key/value memory. Consists of repeated entries of: // Offset Size Description // 0 1 Key length, including trailing 0. (K) - // 1 1 Value length. (V) + // 1 1 Value length, including trailing 0 (V) // 2 1 Flags // 3 K Key bytes // 3 + K V Value bytes @@ -108,19 +104,18 @@ struct raw_tag { #define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED #define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED) -// Primary (external) representation of a context. Composed of 3 underlying -// tag_set structs, one for each of the binary/printable propagated tags, and -// one for everything else. This is to efficiently support tag -// encoding/decoding. +// Primary representation of a context. Composed of 2 underlying tag_set +// structs, one each for propagated and local (non-propagated) tags. This is +// to efficiently support tag encoding/decoding. +// TODO(aveitch): need to add tracing id's/structure. struct census_context { - struct tag_set tags[3]; + struct tag_set tags[2]; census_context_status status; }; // Indices into the tags member of census_context #define PROPAGATED_TAGS 0 -#define PROPAGATED_BINARY_TAGS 1 -#define LOCAL_TAGS 2 +#define LOCAL_TAGS 1 // Extract a raw tag given a pointer (raw) to the tag header. Allow for some // extra bytes in the tag header (see encode/decode functions for usage: this @@ -166,9 +161,7 @@ static bool context_delete_tag(census_context *context, const census_tag *tag, size_t key_len) { return ( tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) || - tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len) || - tag_set_delete_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag->key, - key_len)); + tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len)); } // Add a tag to a tag_set. Return true on success, false if the tag could @@ -176,11 +169,11 @@ static bool context_delete_tag(census_context *context, const census_tag *tag, // not be called if the tag may already exist (in a non-deleted state) in // the tag_set, as that would result in two tags with the same key. static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, - size_t key_len) { + size_t key_len, size_t value_len) { if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) { return false; } - const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE; + const size_t tag_size = key_len + value_len + TAG_HEADER_SIZE; if (tags->kvm_used + tag_size > tags->kvm_size) { // allocate new memory if needed tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE; @@ -191,13 +184,12 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, } char *kvp = tags->kvm + tags->kvm_used; *kvp++ = (char)key_len; - *kvp++ = (char)tag->value_len; + *kvp++ = (char)value_len; // ensure reserved flags are not used. - *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS | - CENSUS_TAG_BINARY)); + *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS)); memcpy(kvp, tag->key, key_len); kvp += key_len; - memcpy(kvp, tag->value, tag->value_len); + memcpy(kvp, tag->value, value_len); tags->kvm_used += tag_size; tags->ntags++; tags->ntags_alloc++; @@ -207,30 +199,20 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, // Add/modify/delete a tag to/in a context. Caller must validate that tag key // etc. are valid. static void context_modify_tag(census_context *context, const census_tag *tag, - size_t key_len) { + size_t key_len, size_t value_len) { // First delete the tag if it is already present. bool deleted = context_delete_tag(context, tag, key_len); - // Determine if we need to add it back. - bool call_add = tag->value != NULL && tag->value_len != 0; bool added = false; - if (call_add) { - if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { - if (CENSUS_TAG_IS_BINARY(tag->flags)) { - added = tag_set_add_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag, - key_len); - } else { - added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len); - } - } else { - added = tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len); - } + if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { + added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len, + value_len); + } else { + added = + tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len, value_len); } + if (deleted) { - if (call_add) { - context->status.n_modified_tags++; - } else { - context->status.n_deleted_tags++; - } + context->status.n_modified_tags++; } else { if (added) { context->status.n_added_tags++; @@ -292,8 +274,6 @@ census_context *census_context_create(const census_context *base, memset(context, 0, sizeof(census_context)); } else { tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]); - tag_set_copy(&context->tags[PROPAGATED_BINARY_TAGS], - &base->tags[PROPAGATED_BINARY_TAGS]); tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]); memset(&context->status, 0, sizeof(context->status)); } @@ -303,20 +283,27 @@ census_context *census_context_create(const census_context *base, const census_tag *tag = &tags[i]; size_t key_len = strlen(tag->key) + 1; // ignore the tag if it is too long/short. - if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN && - tag->value_len <= CENSUS_MAX_TAG_KV_LEN) { - context_modify_tag(context, tag, key_len); + if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN) { + if (tag->value != NULL) { + size_t value_len = strlen(tag->value) + 1; + if (value_len <= CENSUS_MAX_TAG_KV_LEN) { + context_modify_tag(context, tag, key_len, value_len); + } else { + context->status.n_invalid_tags++; + } + } else { + if (context_delete_tag(context, tag, key_len)) { + context->status.n_deleted_tags++; + } + } } else { context->status.n_invalid_tags++; } } // Remove any deleted tags, update status if needed, and return. tag_set_flatten(&context->tags[PROPAGATED_TAGS]); - tag_set_flatten(&context->tags[PROPAGATED_BINARY_TAGS]); tag_set_flatten(&context->tags[LOCAL_TAGS]); context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; - context->status.n_propagated_binary_tags = - context->tags[PROPAGATED_BINARY_TAGS].ntags; context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags; if (status) { *status = &context->status; @@ -331,7 +318,6 @@ const census_context_status *census_context_get_status( void census_context_destroy(census_context *context) { gpr_free(context->tags[PROPAGATED_TAGS].kvm); - gpr_free(context->tags[PROPAGATED_BINARY_TAGS].kvm); gpr_free(context->tags[LOCAL_TAGS].kvm); gpr_free(context); } @@ -343,9 +329,6 @@ void census_context_initialize_iterator(const census_context *context, if (context->tags[PROPAGATED_TAGS].ntags != 0) { iterator->base = PROPAGATED_TAGS; iterator->kvm = context->tags[PROPAGATED_TAGS].kvm; - } else if (context->tags[PROPAGATED_BINARY_TAGS].ntags != 0) { - iterator->base = PROPAGATED_BINARY_TAGS; - iterator->kvm = context->tags[PROPAGATED_BINARY_TAGS].kvm; } else if (context->tags[LOCAL_TAGS].ntags != 0) { iterator->base = LOCAL_TAGS; iterator->kvm = context->tags[LOCAL_TAGS].kvm; @@ -363,7 +346,6 @@ int census_context_next_tag(census_context_iterator *iterator, iterator->kvm = decode_tag(&raw, iterator->kvm, 0); tag->key = raw.key; tag->value = raw.value; - tag->value_len = raw.value_len; tag->flags = raw.flags; if (++iterator->index == iterator->context->tags[iterator->base].ntags) { do { @@ -388,7 +370,6 @@ static bool tag_set_get_tag(const struct tag_set *tags, const char *key, if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) { tag->key = raw.key; tag->value = raw.value; - tag->value_len = raw.value_len; tag->flags = raw.flags; return true; } @@ -403,8 +384,6 @@ int census_context_get_tag(const census_context *context, const char *key, return 0; } if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) || - tag_set_get_tag(&context->tags[PROPAGATED_BINARY_TAGS], key, key_len, - tag) || tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) { return 1; } @@ -447,21 +426,9 @@ static size_t tag_set_encode(const struct tag_set *tags, char *buffer, return ENCODED_HEADER_SIZE + tags->kvm_used; } -char *census_context_encode(const census_context *context, char *buffer, - size_t buf_size, size_t *print_buf_size, - size_t *bin_buf_size) { - *print_buf_size = - tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); - if (*print_buf_size == 0) { - return NULL; - } - char *b_buffer = buffer + *print_buf_size; - *bin_buf_size = tag_set_encode(&context->tags[PROPAGATED_BINARY_TAGS], - b_buffer, buf_size - *print_buf_size); - if (*bin_buf_size == 0) { - return NULL; - } - return b_buffer; +size_t census_context_encode(const census_context *context, char *buffer, + size_t buf_size) { + return tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); } // Decode a tag set. @@ -506,8 +473,7 @@ static void tag_set_decode(struct tag_set *tags, const char *buffer, } } -census_context *census_context_decode(const char *buffer, size_t size, - const char *bin_buffer, size_t bin_size) { +census_context *census_context_decode(const char *buffer, size_t size) { census_context *context = gpr_malloc(sizeof(census_context)); memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set)); if (buffer == NULL) { @@ -515,16 +481,7 @@ census_context *census_context_decode(const char *buffer, size_t size, } else { tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size); } - if (bin_buffer == NULL) { - memset(&context->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set)); - } else { - tag_set_decode(&context->tags[PROPAGATED_BINARY_TAGS], bin_buffer, - bin_size); - } memset(&context->status, 0, sizeof(context->status)); context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; - context->status.n_propagated_binary_tags = - context->tags[PROPAGATED_BINARY_TAGS].ntags; - // TODO(aveitch): check that BINARY flag is correct for each type. return context; } diff --git a/test/core/census/context_test.c b/test/core/census/context_test.c index 63e7103ddc..b59ac7c094 100644 --- a/test/core/census/context_test.c +++ b/test/core/census/context_test.c @@ -42,60 +42,48 @@ #include #include "test/core/util/test_config.h" -static uint8_t one_byte_val = 7; -static uint32_t four_byte_val = 0x12345678; -static uint64_t eight_byte_val = 0x1234567890abcdef; - -// A set of tags Used to create a basic context for testing. Each tag has a -// unique set of flags. Note that replace_add_delete_test() relies on specific -// offsets into this array - if you add or delete entries, you will also need -// to change the test. +// A set of tags Used to create a basic context for testing. Note that +// replace_add_delete_test() relies on specific offsets into this array - if +// you add or delete entries, you will also need to change the test. #define BASIC_TAG_COUNT 8 static census_tag basic_tags[BASIC_TAG_COUNT] = { - /* 0 */ {"key0", "printable", 10, 0}, - /* 1 */ {"k1", "a", 2, CENSUS_TAG_PROPAGATE}, - /* 2 */ {"k2", "longer printable string", 24, CENSUS_TAG_STATS}, - /* 3 */ {"key_three", (char *)&one_byte_val, 1, CENSUS_TAG_BINARY}, - /* 4 */ {"really_long_key_4", "random", 7, + /* 0 */ {"key0", "tag value", 0}, + /* 1 */ {"k1", "a", CENSUS_TAG_PROPAGATE}, + /* 2 */ {"k2", "a longer tag value supercalifragilisticexpialiadocious", + CENSUS_TAG_STATS}, + /* 3 */ {"key_three", "", 0}, + /* 4 */ {"a_really_really_really_really_long_key_4", "random", CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS}, - /* 5 */ {"k5", (char *)&four_byte_val, 4, - CENSUS_TAG_PROPAGATE | CENSUS_TAG_BINARY}, - /* 6 */ {"k6", (char *)&eight_byte_val, 8, - CENSUS_TAG_STATS | CENSUS_TAG_BINARY}, - /* 7 */ {"k7", (char *)&four_byte_val, 4, - CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS | CENSUS_TAG_BINARY}}; + /* 5 */ {"k5", "v5", CENSUS_TAG_PROPAGATE}, + /* 6 */ {"k6", "v6", CENSUS_TAG_STATS}, + /* 7 */ {"k7", "v7", CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS}}; // Set of tags used to modify the basic context. Note that // replace_add_delete_test() relies on specific offsets into this array - if // you add or delete entries, you will also need to change the test. Other // tests that rely on specific instances have XXX_XXX_OFFSET definitions (also // change the defines below if you add/delete entires). -#define MODIFY_TAG_COUNT 11 +#define MODIFY_TAG_COUNT 10 static census_tag modify_tags[MODIFY_TAG_COUNT] = { #define REPLACE_VALUE_OFFSET 0 - /* 0 */ {"key0", "replace printable", 18, 0}, // replaces tag value only + /* 0 */ {"key0", "replace key0", 0}, // replaces tag value only #define ADD_TAG_OFFSET 1 - /* 1 */ {"new_key", "xyzzy", 6, CENSUS_TAG_STATS}, // new tag + /* 1 */ {"new_key", "xyzzy", CENSUS_TAG_STATS}, // new tag #define DELETE_TAG_OFFSET 2 - /* 2 */ {"k5", NULL, 5, - 0}, // should delete tag, despite bogus value length - /* 3 */ {"k6", "foo", 0, 0}, // should delete tag, despite bogus value - /* 4 */ {"k6", "foo", 0, 0}, // try deleting already-deleted tag - /* 5 */ {"non-existent", NULL, 0, 0}, // another non-existent tag -#define REPLACE_FLAG_OFFSET 6 - /* 6 */ {"k1", "a", 2, 0}, // change flags only - /* 7 */ {"k7", "bar", 4, CENSUS_TAG_STATS}, // change flags and value - /* 8 */ {"k2", (char *)&eight_byte_val, 8, - CENSUS_TAG_BINARY | CENSUS_TAG_PROPAGATE}, // more flags change - // non-binary -> binary - /* 9 */ {"k6", "bar", 4, 0}, // add back tag, with different value - /* 10 */ {"foo", "bar", 4, CENSUS_TAG_PROPAGATE}, // another new tag + /* 2 */ {"k5", NULL, 0}, // should delete tag + /* 3 */ {"k5", NULL, 0}, // try deleting already-deleted tag + /* 4 */ {"non-existent", NULL, 0}, // delete non-existent tag +#define REPLACE_FLAG_OFFSET 5 + /* 5 */ {"k1", "a", 0}, // change flags only + /* 6 */ {"k7", "bar", CENSUS_TAG_STATS}, // change flags and value + /* 7 */ {"k2", "", CENSUS_TAG_PROPAGATE}, // more value and flags change + /* 8 */ {"k5", "bar", 0}, // add back tag, with different value + /* 9 */ {"foo", "bar", CENSUS_TAG_PROPAGATE}, // another new tag }; // Utility function to compare tags. Returns true if all fields match. static bool compare_tag(const census_tag *t1, const census_tag *t2) { - return (strcmp(t1->key, t2->key) == 0 && t1->value_len == t2->value_len && - memcmp(t1->value, t2->value, t1->value_len) == 0 && + return (strcmp(t1->key, t2->key) == 0 && strcmp(t1->value, t2->value) == 0 && t1->flags == t2->flags); } @@ -111,7 +99,7 @@ static void empty_test(void) { struct census_context *context = census_context_create(NULL, NULL, 0, NULL); GPR_ASSERT(context != NULL); const census_context_status *status = census_context_get_status(context); - census_context_status expected = {0, 0, 0, 0, 0, 0, 0, 0}; + census_context_status expected = {0, 0, 0, 0, 0, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_context_destroy(context); } @@ -121,7 +109,7 @@ static void basic_test(void) { const census_context_status *status; struct census_context *context = census_context_create(NULL, basic_tags, BASIC_TAG_COUNT, &status); - census_context_status expected = {2, 2, 4, 0, 8, 0, 0, 0}; + census_context_status expected = {4, 4, 0, 8, 0, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_context_iterator it; census_context_initialize_iterator(context, &it); @@ -161,15 +149,18 @@ static void invalid_test(void) { memset(key, 'k', 299); key[299] = 0; char value[300]; - memset(value, 'v', 300); - census_tag tag = {key, value, 3, CENSUS_TAG_BINARY}; + memset(value, 'v', 299); + value[299] = 0; + census_tag tag = {key, value, 0}; // long keys, short value. Key lengths (including terminator) should be // <= 255 (CENSUS_MAX_TAG_KV_LEN) + value[3] = 0; + GPR_ASSERT(strlen(value) == 3); GPR_ASSERT(strlen(key) == 299); const census_context_status *status; struct census_context *context = census_context_create(NULL, &tag, 1, &status); - census_context_status expected = {0, 0, 0, 0, 0, 0, 1, 0}; + census_context_status expected = {0, 0, 0, 0, 0, 1, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_context_destroy(context); key[CENSUS_MAX_TAG_KV_LEN] = 0; @@ -180,24 +171,28 @@ static void invalid_test(void) { key[CENSUS_MAX_TAG_KV_LEN - 1] = 0; GPR_ASSERT(strlen(key) == CENSUS_MAX_TAG_KV_LEN - 1); context = census_context_create(NULL, &tag, 1, &status); - census_context_status expected2 = {0, 0, 1, 0, 1, 0, 0, 0}; + census_context_status expected2 = {0, 1, 0, 1, 0, 0, 0}; GPR_ASSERT(memcmp(status, &expected2, sizeof(expected2)) == 0); census_context_destroy(context); // now try with long values - tag.value_len = 300; + value[3] = 'v'; + GPR_ASSERT(strlen(value) == 299); context = census_context_create(NULL, &tag, 1, &status); GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_context_destroy(context); - tag.value_len = CENSUS_MAX_TAG_KV_LEN + 1; + value[CENSUS_MAX_TAG_KV_LEN] = 0; + GPR_ASSERT(strlen(value) == CENSUS_MAX_TAG_KV_LEN); context = census_context_create(NULL, &tag, 1, &status); GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_context_destroy(context); - tag.value_len = CENSUS_MAX_TAG_KV_LEN; + value[CENSUS_MAX_TAG_KV_LEN - 1] = 0; + GPR_ASSERT(strlen(value) == CENSUS_MAX_TAG_KV_LEN - 1); context = census_context_create(NULL, &tag, 1, &status); GPR_ASSERT(memcmp(status, &expected2, sizeof(expected2)) == 0); census_context_destroy(context); // 0 length key. key[0] = 0; + GPR_ASSERT(strlen(key) == 0); context = census_context_create(NULL, &tag, 1, &status); GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_context_destroy(context); @@ -210,7 +205,7 @@ static void copy_test(void) { const census_context_status *status; struct census_context *context2 = census_context_create(context, NULL, 0, &status); - census_context_status expected = {2, 2, 4, 0, 0, 0, 0, 0}; + census_context_status expected = {4, 4, 0, 0, 0, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); for (int i = 0; i < BASIC_TAG_COUNT; i++) { census_tag tag; @@ -228,7 +223,7 @@ static void replace_value_test(void) { const census_context_status *status; struct census_context *context2 = census_context_create( context, modify_tags + REPLACE_VALUE_OFFSET, 1, &status); - census_context_status expected = {2, 2, 4, 0, 0, 1, 0, 0}; + census_context_status expected = {4, 4, 0, 0, 1, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_tag tag; GPR_ASSERT(census_context_get_tag( @@ -245,7 +240,7 @@ static void replace_flags_test(void) { const census_context_status *status; struct census_context *context2 = census_context_create( context, modify_tags + REPLACE_FLAG_OFFSET, 1, &status); - census_context_status expected = {1, 2, 5, 0, 0, 1, 0, 0}; + census_context_status expected = {3, 5, 0, 0, 1, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_tag tag; GPR_ASSERT(census_context_get_tag( @@ -262,7 +257,7 @@ static void delete_tag_test(void) { const census_context_status *status; struct census_context *context2 = census_context_create( context, modify_tags + DELETE_TAG_OFFSET, 1, &status); - census_context_status expected = {2, 1, 4, 1, 0, 0, 0, 0}; + census_context_status expected = {3, 4, 1, 0, 0, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_tag tag; GPR_ASSERT(census_context_get_tag( @@ -278,7 +273,7 @@ static void add_tag_test(void) { const census_context_status *status; struct census_context *context2 = census_context_create(context, modify_tags + ADD_TAG_OFFSET, 1, &status); - census_context_status expected = {2, 2, 5, 0, 1, 0, 0, 0}; + census_context_status expected = {4, 5, 0, 1, 0, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_tag tag; GPR_ASSERT(census_context_get_tag(context2, modify_tags[ADD_TAG_OFFSET].key, @@ -295,24 +290,24 @@ static void replace_add_delete_test(void) { const census_context_status *status; struct census_context *context2 = census_context_create(context, modify_tags, MODIFY_TAG_COUNT, &status); - census_context_status expected = {2, 1, 6, 2, 3, 4, 0, 2}; + census_context_status expected = {3, 7, 1, 3, 4, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); // validate context contents. Use specific indices into the two arrays // holding tag values. GPR_ASSERT(validate_tag(context2, &basic_tags[3])); GPR_ASSERT(validate_tag(context2, &basic_tags[4])); + GPR_ASSERT(validate_tag(context2, &basic_tags[6])); GPR_ASSERT(validate_tag(context2, &modify_tags[0])); GPR_ASSERT(validate_tag(context2, &modify_tags[1])); + GPR_ASSERT(validate_tag(context2, &modify_tags[5])); GPR_ASSERT(validate_tag(context2, &modify_tags[6])); GPR_ASSERT(validate_tag(context2, &modify_tags[7])); GPR_ASSERT(validate_tag(context2, &modify_tags[8])); GPR_ASSERT(validate_tag(context2, &modify_tags[9])); - GPR_ASSERT(validate_tag(context2, &modify_tags[10])); GPR_ASSERT(!validate_tag(context2, &basic_tags[0])); GPR_ASSERT(!validate_tag(context2, &basic_tags[1])); GPR_ASSERT(!validate_tag(context2, &basic_tags[2])); GPR_ASSERT(!validate_tag(context2, &basic_tags[5])); - GPR_ASSERT(!validate_tag(context2, &basic_tags[6])); GPR_ASSERT(!validate_tag(context2, &basic_tags[7])); census_context_destroy(context); census_context_destroy(context2); @@ -325,21 +320,15 @@ static void encode_decode_test(void) { char buffer[BUF_SIZE]; struct census_context *context = census_context_create(NULL, basic_tags, BASIC_TAG_COUNT, NULL); - size_t print_bsize; - size_t bin_bsize; // Test with too small a buffer - GPR_ASSERT(census_context_encode(context, buffer, 2, &print_bsize, - &bin_bsize) == NULL); - char *b_buffer = census_context_encode(context, buffer, BUF_SIZE, - &print_bsize, &bin_bsize); - GPR_ASSERT(b_buffer != NULL && print_bsize > 0 && bin_bsize > 0 && - print_bsize + bin_bsize <= BUF_SIZE && - b_buffer == buffer + print_bsize); - census_context *context2 = - census_context_decode(buffer, print_bsize, b_buffer, bin_bsize); + GPR_ASSERT(census_context_encode(context, buffer, 2) == 0); + // Test with sufficient buffer + size_t buf_used = census_context_encode(context, buffer, BUF_SIZE); + GPR_ASSERT(buf_used != 0); + census_context *context2 = census_context_decode(buffer, buf_used); GPR_ASSERT(context2 != NULL); const census_context_status *status = census_context_get_status(context2); - census_context_status expected = {2, 2, 0, 0, 0, 0, 0, 0}; + census_context_status expected = {4, 0, 0, 0, 0, 0, 0}; GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); for (int i = 0; i < BASIC_TAG_COUNT; i++) { census_tag tag; -- cgit v1.2.3 From 600e993e7a6cfa1b1c4d5b2d6e248cc6bc9ad0e0 Mon Sep 17 00:00:00 2001 From: Alistair Veitch Date: Fri, 26 Feb 2016 09:04:19 -0800 Subject: add checking of character values --- src/core/census/context.c | 36 +++++++++++++++++++++++++++++------- test/core/census/context_test.c | 16 ++++++++++++++++ 2 files changed, 45 insertions(+), 7 deletions(-) (limited to 'test') diff --git a/src/core/census/context.c b/src/core/census/context.c index 441d3b89a6..89b8ee0b39 100644 --- a/src/core/census/context.c +++ b/src/core/census/context.c @@ -61,6 +61,10 @@ // * Keep all tag information (keys/values/flags) in a single memory buffer, // that can be directly copied to the wire. +// min and max valid chars in tag keys and values. All printable ASCII is OK. +#define MIN_VALID_TAG_CHAR 32 // ' ' +#define MAX_VALID_TAG_CHAR 126 // '~' + // Structure representing a set of tags. Essentially a count of number of tags // present, and pointer to a chunk of memory that contains the per-tag details. struct tag_set { @@ -117,6 +121,24 @@ struct census_context { #define PROPAGATED_TAGS 0 #define LOCAL_TAGS 1 +// Validate (check all characters are in range and size is less than limit) a +// key or value string. Returns 0 if the string is invalid, or the length +// (including terminator) if valid. +static size_t validate_tag(const char *kv) { + size_t len = 1; + char ch; + while ((ch = *kv++) != 0) { + if (ch < MIN_VALID_TAG_CHAR || ch > MAX_VALID_TAG_CHAR) { + return 0; + } + len++; + } + if (len > CENSUS_MAX_TAG_KV_LEN) { + return 0; + } + return len; +} + // Extract a raw tag given a pointer (raw) to the tag header. Allow for some // extra bytes in the tag header (see encode/decode functions for usage: this // allows for future expansion of the tag header). @@ -281,12 +303,14 @@ census_context *census_context_create(const census_context *base, // the context to add/replace/delete as required. for (int i = 0; i < ntags; i++) { const census_tag *tag = &tags[i]; - size_t key_len = strlen(tag->key) + 1; - // ignore the tag if it is too long/short. - if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN) { + size_t key_len = validate_tag(tag->key); + // ignore the tag if it is invalid or too short. + if (key_len <= 1) { + context->status.n_invalid_tags++; + } else { if (tag->value != NULL) { - size_t value_len = strlen(tag->value) + 1; - if (value_len <= CENSUS_MAX_TAG_KV_LEN) { + size_t value_len = validate_tag(tag->value); + if (value_len != 0) { context_modify_tag(context, tag, key_len, value_len); } else { context->status.n_invalid_tags++; @@ -296,8 +320,6 @@ census_context *census_context_create(const census_context *base, context->status.n_deleted_tags++; } } - } else { - context->status.n_invalid_tags++; } } // Remove any deleted tags, update status if needed, and return. diff --git a/test/core/census/context_test.c b/test/core/census/context_test.c index b59ac7c094..ad4c337465 100644 --- a/test/core/census/context_test.c +++ b/test/core/census/context_test.c @@ -196,6 +196,22 @@ static void invalid_test(void) { context = census_context_create(NULL, &tag, 1, &status); GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); census_context_destroy(context); + // invalid key character + key[0] = 31; // 32 (' ') is the first valid character value + key[1] = 0; + GPR_ASSERT(strlen(key) == 1); + context = census_context_create(NULL, &tag, 1, &status); + GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); + census_context_destroy(context); + // invalid value character + key[0] = ' '; + value[5] = 127; // 127 (DEL) is ('~' + 1) + value[8] = 0; + GPR_ASSERT(strlen(key) == 1); + GPR_ASSERT(strlen(value) == 8); + context = census_context_create(NULL, &tag, 1, &status); + GPR_ASSERT(memcmp(status, &expected, sizeof(expected)) == 0); + census_context_destroy(context); } // Make a copy of a context -- cgit v1.2.3 From fa4b163feac008bf7147cdad4f2dd88ef778ad2c Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 26 Feb 2016 17:14:01 -0800 Subject: windows C# distribtest --- test/distrib/csharp/DistribTest.sln | 6 ++++++ test/distrib/csharp/DistribTest/DistribTest.csproj | 20 ++++++++++++++++++++ test/distrib/csharp/run_distrib_test.bat | 21 +++++++++++++++++++++ test/distrib/csharp/run_distrib_test.sh | 4 +--- test/distrib/csharp/update_version.sh | 10 +++++++++- tools/run_tests/distribtest_targets.py | 11 ++++++++++- 6 files changed, 67 insertions(+), 5 deletions(-) create mode 100644 test/distrib/csharp/run_distrib_test.bat (limited to 'test') diff --git a/test/distrib/csharp/DistribTest.sln b/test/distrib/csharp/DistribTest.sln index 0eca35c30f..78d5397ca9 100644 --- a/test/distrib/csharp/DistribTest.sln +++ b/test/distrib/csharp/DistribTest.sln @@ -8,13 +8,19 @@ EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Debug|x64.ActiveCfg = Debug|x64 + {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Debug|x64.Build.0 = Debug|x64 {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Release|Any CPU.ActiveCfg = Release|Any CPU {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Release|Any CPU.Build.0 = Release|Any CPU + {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Release|x64.ActiveCfg = Release|x64 + {A3E61CC3-3710-49A3-A830-A0066EDBCE2F}.Release|x64.Build.0 = Release|x64 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/test/distrib/csharp/DistribTest/DistribTest.csproj b/test/distrib/csharp/DistribTest/DistribTest.csproj index 124fc1bdf0..7605495f0f 100644 --- a/test/distrib/csharp/DistribTest/DistribTest.csproj +++ b/test/distrib/csharp/DistribTest/DistribTest.csproj @@ -32,6 +32,26 @@ prompt 4 + + true + bin\x64\Debug\ + DEBUG;TRACE + full + x64 + prompt + MinimumRecommendedRules.ruleset + true + + + bin\x64\Release\ + TRACE + true + pdbonly + x64 + prompt + MinimumRecommendedRules.ruleset + true + ..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll diff --git a/test/distrib/csharp/run_distrib_test.bat b/test/distrib/csharp/run_distrib_test.bat new file mode 100644 index 0000000000..950894f668 --- /dev/null +++ b/test/distrib/csharp/run_distrib_test.bat @@ -0,0 +1,21 @@ + +@rem enter this directory +cd /d %~dp0 + +@rem extract input artifacts +powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::ExtractToDirectory('../../../input_artifacts/csharp_nugets.zip', 'TestNugetFeed');" + +update_version.sh auto + +set NUGET=C:\nuget\nuget.exe +%NUGET% restore || goto :error + +@call build_vs2015.bat DistribTest.sln %MSBUILD_EXTRA_ARGS% || goto :error + +%DISTRIBTEST_OUTPATH%\DistribTest.exe || goto :error + +goto :EOF + +:error +echo Failed! +exit /b %errorlevel% diff --git a/test/distrib/csharp/run_distrib_test.sh b/test/distrib/csharp/run_distrib_test.sh index 1de62041b3..934174a9a4 100755 --- a/test/distrib/csharp/run_distrib_test.sh +++ b/test/distrib/csharp/run_distrib_test.sh @@ -34,9 +34,7 @@ cd $(dirname $0) unzip -o "$EXTERNAL_GIT_ROOT/input_artifacts/csharp_nugets.zip" -d TestNugetFeed -# Extract the version number from Grpc nuget package name. -CSHARP_VERSION=$(ls TestNugetFeed | grep '^Grpc\.[0-9].*\.nupkg$' | sed s/^Grpc\.// | sed s/\.nupkg$//) -./update_version.sh $CSHARP_VERSION +./update_version.sh auto nuget restore diff --git a/test/distrib/csharp/update_version.sh b/test/distrib/csharp/update_version.sh index f2554e8998..b0d07721f6 100755 --- a/test/distrib/csharp/update_version.sh +++ b/test/distrib/csharp/update_version.sh @@ -32,5 +32,13 @@ set -e cd $(dirname $0) +CSHARP_VERSION="$1" +if [ "$CSHARP_VERSION" == "auto" ] +then + # autodetect C# version + CSHARP_VERSION=$(ls TestNugetFeed | grep '^Grpc\.[0-9].*\.nupkg$' | sed s/^Grpc\.// | sed s/\.nupkg$//) + echo "Autodetected nuget ${CSHARP_VERSION}" +fi + # Replaces version placeholder with value provided as first argument. -sed -ibak "s/__GRPC_NUGET_VERSION__/$1/g" DistribTest/packages.config DistribTest/DistribTest.csproj +sed -ibak "s/__GRPC_NUGET_VERSION__/${CSHARP_VERSION}/g" DistribTest/packages.config DistribTest/DistribTest.csproj diff --git a/tools/run_tests/distribtest_targets.py b/tools/run_tests/distribtest_targets.py index 933103f0a0..fb951b68f9 100644 --- a/tools/run_tests/distribtest_targets.py +++ b/tools/run_tests/distribtest_targets.py @@ -97,7 +97,14 @@ class CSharpDistribTest(object): ['test/distrib/csharp/run_distrib_test.sh'], environ={'EXTERNAL_GIT_ROOT': '../../..'}) else: - raise Exception("Not supported yet.") + if self.arch == 'x64': + environ={'MSBUILD_EXTRA_ARGS': '/p:Platform=x64', + 'DISTRIBTEST_OUTPATH': 'DistribTest\\bin\\x64\\Debug'} + else: + environ={'DISTRIBTEST_OUTPATH': 'DistribTest\\bin\\\Debug'} + return create_jobspec(self.name, + ['test\\distrib\\csharp\\run_distrib_test.bat'], + environ=environ) def __str__(self): return self.name @@ -240,6 +247,8 @@ def targets(): CSharpDistribTest('linux', 'x64', 'ubuntu1510'), CSharpDistribTest('linux', 'x64', 'ubuntu1604'), CSharpDistribTest('macos', 'x86'), + CSharpDistribTest('windows', 'x86'), + CSharpDistribTest('windows', 'x64'), PythonDistribTest('linux', 'x64', 'wheezy'), PythonDistribTest('linux', 'x64', 'jessie'), PythonDistribTest('linux', 'x86', 'jessie'), -- cgit v1.2.3 From e1dd18a945ea11d1eba412ea0483b3996a222c3b Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 29 Feb 2016 13:28:55 -0800 Subject: Fix copyright --- test/cpp/interop/metrics_client.cc | 2 +- tools/gcp/utils/big_query_utils.py | 2 +- tools/gcp/utils/kubernetes_api.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'test') diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc index cc304f2e89..bd48c7d4ef 100644 --- a/test/cpp/interop/metrics_client.cc +++ b/test/cpp/interop/metrics_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/tools/gcp/utils/big_query_utils.py b/tools/gcp/utils/big_query_utils.py index e2379fd1aa..7bb1e14354 100755 --- a/tools/gcp/utils/big_query_utils.py +++ b/tools/gcp/utils/big_query_utils.py @@ -1,5 +1,5 @@ #!/usr/bin/env python2.7 -# Copyright 2015-2016 Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/gcp/utils/kubernetes_api.py b/tools/gcp/utils/kubernetes_api.py index 2d3f771e93..e8ddd2f1b3 100755 --- a/tools/gcp/utils/kubernetes_api.py +++ b/tools/gcp/utils/kubernetes_api.py @@ -1,5 +1,5 @@ #!/usr/bin/env python2.7 -# Copyright 2015-2016 Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without -- cgit v1.2.3 From 3f1aa9b99ae6752cc22cab2707b1d8c3846f21be Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 29 Feb 2016 15:15:23 -0800 Subject: add copyright and cleanup python code --- test/distrib/csharp/run_distrib_test.bat | 28 ++++++++++++++++++++++++++++ tools/run_tests/distribtest_targets.py | 4 +++- 2 files changed, 31 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/distrib/csharp/run_distrib_test.bat b/test/distrib/csharp/run_distrib_test.bat index 950894f668..67bfc58ac8 100644 --- a/test/distrib/csharp/run_distrib_test.bat +++ b/test/distrib/csharp/run_distrib_test.bat @@ -1,3 +1,31 @@ +@rem Copyright 2016, Google Inc. +@rem All rights reserved. +@rem +@rem Redistribution and use in source and binary forms, with or without +@rem modification, are permitted provided that the following conditions are +@rem met: +@rem +@rem * Redistributions of source code must retain the above copyright +@rem notice, this list of conditions and the following disclaimer. +@rem * Redistributions in binary form must reproduce the above +@rem copyright notice, this list of conditions and the following disclaimer +@rem in the documentation and/or other materials provided with the +@rem distribution. +@rem * Neither the name of Google Inc. nor the names of its +@rem contributors may be used to endorse or promote products derived from +@rem this software without specific prior written permission. +@rem +@rem THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +@rem "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +@rem LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +@rem A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +@rem OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +@rem SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +@rem LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +@rem DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +@rem THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +@rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. @rem enter this directory cd /d %~dp0 diff --git a/tools/run_tests/distribtest_targets.py b/tools/run_tests/distribtest_targets.py index fb951b68f9..34cc1cd710 100644 --- a/tools/run_tests/distribtest_targets.py +++ b/tools/run_tests/distribtest_targets.py @@ -96,7 +96,7 @@ class CSharpDistribTest(object): return create_jobspec(self.name, ['test/distrib/csharp/run_distrib_test.sh'], environ={'EXTERNAL_GIT_ROOT': '../../..'}) - else: + elif self.platform == 'windows': if self.arch == 'x64': environ={'MSBUILD_EXTRA_ARGS': '/p:Platform=x64', 'DISTRIBTEST_OUTPATH': 'DistribTest\\bin\\x64\\Debug'} @@ -105,6 +105,8 @@ class CSharpDistribTest(object): return create_jobspec(self.name, ['test\\distrib\\csharp\\run_distrib_test.bat'], environ=environ) + else: + raise Exception("Not supported yet.") def __str__(self): return self.name -- cgit v1.2.3 From 8d543e8e309fb95cc423c19724197ffde2f5dd28 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 29 Feb 2016 18:22:25 -0800 Subject: Fix ResponseStreamServerCancelAfter test flake --- test/cpp/end2end/end2end_test.cc | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'test') diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 42757974b2..dc2c4f6426 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -437,9 +437,10 @@ class End2endServerTryCancelTest : public End2endTest { break; case CANCEL_AFTER_PROCESSING: - // Server cancelled after writing all messages. Client must have read - // all messages - EXPECT_EQ(num_msgs_read, kNumResponseStreamsMsgs); + // Even though the Server cancelled after writing all messages, the RPC + // may be cancelled before the Client got a chance to read all the + // messages. + EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); break; default: { @@ -519,7 +520,11 @@ class End2endServerTryCancelTest : public End2endTest { case CANCEL_AFTER_PROCESSING: EXPECT_EQ(num_msgs_sent, num_messages); - EXPECT_EQ(num_msgs_read, num_msgs_sent); + + // The Server cancelled after reading the last message and after writing + // the message to the client. However, the RPC cancellation might have + // taken effect before the client actually read the response. + EXPECT_LE(num_msgs_read, num_msgs_sent); break; default: -- cgit v1.2.3 From 7fe08a23f17701266e69a3d3d3cab642482e7cdf Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 29 Feb 2016 20:17:48 -0800 Subject: clang-format --- test/cpp/util/test_credentials_provider.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'test') diff --git a/test/cpp/util/test_credentials_provider.cc b/test/cpp/util/test_credentials_provider.cc index 65d3205767..e314fd6d75 100644 --- a/test/cpp/util/test_credentials_provider.cc +++ b/test/cpp/util/test_credentials_provider.cc @@ -139,9 +139,7 @@ class DefaultCredentialsProvider : public CredentialsProvider { gpr_once g_once_init_provider = GPR_ONCE_INIT; CredentialsProvider* g_provider = nullptr; -void CreateDefaultProvider() { - g_provider = new DefaultCredentialsProvider; -} +void CreateDefaultProvider() { g_provider = new DefaultCredentialsProvider; } CredentialsProvider* GetProvider() { gpr_once_init(&g_once_init_provider, &CreateDefaultProvider); -- cgit v1.2.3 From 7c075b39538e047f4dd4d114b5cb7647ef4ce01a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 29 Feb 2016 21:46:06 -0800 Subject: Remove broken test This test is inherently flaky and I don't see any way to make it not so. Historically this test has not (in my memory) given any signal that something is actually broken. Let's save maintenance and just nuke it. --- test/core/iomgr/tcp_client_posix_test.c | 93 --------------------------------- 1 file changed, 93 deletions(-) (limited to 'test') diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 1e6fa5d45a..746dfd85be 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -178,98 +178,6 @@ void test_fails(void) { grpc_exec_ctx_finish(&exec_ctx); } -void test_times_out(void) { - struct sockaddr_in addr; - socklen_t addr_len = sizeof(addr); - int svr_fd; -#define NUM_CLIENT_CONNECTS 100 - int client_fd[NUM_CLIENT_CONNECTS]; - int i; - int r; - int connections_complete_before; - gpr_timespec connect_deadline; - grpc_closure done; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - gpr_log(GPR_DEBUG, "test_times_out"); - - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - - /* create a dummy server */ - svr_fd = socket(AF_INET, SOCK_STREAM, 0); - GPR_ASSERT(svr_fd >= 0); - GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); - GPR_ASSERT(0 == listen(svr_fd, 1)); - /* Get its address */ - GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); - - /* tie up the listen buffer, which is somewhat arbitrarily sized. */ - for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { - client_fd[i] = socket(AF_INET, SOCK_STREAM, 0); - grpc_set_socket_nonblocking(client_fd[i], 1); - do { - r = connect(client_fd[i], (struct sockaddr *)&addr, addr_len); - } while (r == -1 && errno == EINTR); - GPR_ASSERT(r < 0); - GPR_ASSERT(errno == EWOULDBLOCK || errno == EINPROGRESS); - } - - /* connect to dummy server address */ - - connect_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); - - gpr_mu_lock(g_mu); - connections_complete_before = g_connections_complete; - gpr_mu_unlock(g_mu); - - grpc_closure_init(&done, must_fail, NULL); - grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, - (struct sockaddr *)&addr, addr_len, connect_deadline); - - /* Make sure the event doesn't trigger early */ - gpr_mu_lock(g_mu); - for (;;) { - grpc_pollset_worker *worker = NULL; - gpr_timespec now = gpr_now(connect_deadline.clock_type); - gpr_timespec continue_verifying_time = - gpr_time_from_seconds(5, GPR_TIMESPAN); - gpr_timespec grace_time = gpr_time_from_seconds(3, GPR_TIMESPAN); - gpr_timespec finish_time = - gpr_time_add(connect_deadline, continue_verifying_time); - gpr_timespec restart_verifying_time = - gpr_time_add(connect_deadline, grace_time); - int is_after_deadline = gpr_time_cmp(now, connect_deadline) > 0; - if (gpr_time_cmp(now, finish_time) > 0) { - break; - } - gpr_log(GPR_DEBUG, "now=%lld.%09d connect_deadline=%lld.%09d", - (long long)now.tv_sec, (int)now.tv_nsec, - (long long)connect_deadline.tv_sec, (int)connect_deadline.tv_nsec); - if (is_after_deadline && gpr_time_cmp(now, restart_verifying_time) <= 0) { - /* allow some slack before insisting that things be done */ - } else { - GPR_ASSERT(g_connections_complete == - connections_complete_before + is_after_deadline); - } - gpr_timespec polling_deadline = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10); - if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { - grpc_pollset_work(&exec_ctx, g_pollset, &worker, now, polling_deadline); - } - gpr_mu_unlock(g_mu); - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(g_mu); - } - gpr_mu_unlock(g_mu); - - grpc_exec_ctx_finish(&exec_ctx); - - close(svr_fd); - for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { - close(client_fd[i]); - } -} - static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } @@ -287,7 +195,6 @@ int main(int argc, char **argv) { test_succeeds(); gpr_log(GPR_ERROR, "End of first test"); test_fails(); - test_times_out(); grpc_pollset_set_destroy(g_pollset_set); grpc_closure_init(&destroyed, destroy_pollset, g_pollset); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); -- cgit v1.2.3 From 9e5a05af8a322991ce4f909ce84a58889b1f151b Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 1 Mar 2016 09:40:12 -0800 Subject: Fix clang format issue --- test/cpp/util/test_credentials_provider.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'test') diff --git a/test/cpp/util/test_credentials_provider.cc b/test/cpp/util/test_credentials_provider.cc index 65d3205767..e314fd6d75 100644 --- a/test/cpp/util/test_credentials_provider.cc +++ b/test/cpp/util/test_credentials_provider.cc @@ -139,9 +139,7 @@ class DefaultCredentialsProvider : public CredentialsProvider { gpr_once g_once_init_provider = GPR_ONCE_INIT; CredentialsProvider* g_provider = nullptr; -void CreateDefaultProvider() { - g_provider = new DefaultCredentialsProvider; -} +void CreateDefaultProvider() { g_provider = new DefaultCredentialsProvider; } CredentialsProvider* GetProvider() { gpr_once_init(&g_once_init_provider, &CreateDefaultProvider); -- cgit v1.2.3 From 6c016efa34113916810cf16a0a0981c637204f50 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 1 Mar 2016 16:27:53 -0800 Subject: ServerTryCancel was not actually respecting the API since it could be an arbitrary amount of time between when the cancel is tried and actually observable. --- test/cpp/end2end/test_service_impl.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 66d11d0dfc..7c3e514eff 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -326,7 +326,11 @@ void TestServiceImpl::ServerTryCancel(ServerContext* context) { EXPECT_FALSE(context->IsCancelled()); context->TryCancel(); gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - EXPECT_TRUE(context->IsCancelled()); + // Now wait until it's really canceled + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN))); + } } } // namespace testing -- cgit v1.2.3 From bd50f305a3af3c45aadb2b7eb7f4752475f6d820 Mon Sep 17 00:00:00 2001 From: makdharma Date: Wed, 2 Mar 2016 13:49:51 -0800 Subject: Update reconnect_interop_client.cc --- test/cpp/interop/reconnect_interop_client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test') diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 1f6b352db1..3ad733e111 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -30,7 +30,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - +// Test description at doc/connection-backoff-interop-test-description.md #include #include -- cgit v1.2.3 From 38a560b6ba45bb8db16fac609a72a6c235469841 Mon Sep 17 00:00:00 2001 From: makdharma Date: Wed, 2 Mar 2016 13:50:58 -0800 Subject: Update reconnect_interop_server.cc --- test/cpp/interop/reconnect_interop_server.cc | 2 ++ 1 file changed, 2 insertions(+) (limited to 'test') diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc index 3602b8c2b0..785f9c7ad5 100644 --- a/test/cpp/interop/reconnect_interop_server.cc +++ b/test/cpp/interop/reconnect_interop_server.cc @@ -30,6 +30,8 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ + +// Test description at doc/connection-backoff-interop-test-description.md #include #include -- cgit v1.2.3 From c6611efb67e8f5eae0451963e98df1890dcdb3d5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 2 Mar 2016 17:43:09 -0800 Subject: Revert "Update reconnect_interop_client.cc" --- test/cpp/interop/reconnect_interop_client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test') diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 3ad733e111..1f6b352db1 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -30,7 +30,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ -// Test description at doc/connection-backoff-interop-test-description.md + #include #include -- cgit v1.2.3 From 98990726a02829c29674f143ab6e02df5415514d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 2 Mar 2016 22:04:00 -0800 Subject: Revert "Update reconnect_interop_server.cc" --- test/cpp/interop/reconnect_interop_server.cc | 2 -- 1 file changed, 2 deletions(-) (limited to 'test') diff --git a/test/cpp/interop/reconnect_interop_server.cc b/test/cpp/interop/reconnect_interop_server.cc index 785f9c7ad5..3602b8c2b0 100644 --- a/test/cpp/interop/reconnect_interop_server.cc +++ b/test/cpp/interop/reconnect_interop_server.cc @@ -30,8 +30,6 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - -// Test description at doc/connection-backoff-interop-test-description.md #include #include -- cgit v1.2.3 From 0cb803d9ca4286601e9e6a3240cfa3488b662b7c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 2 Mar 2016 22:17:24 -0800 Subject: Always ref writable streams We suffered a bug whereby doing a follow-up write to another write could resurrect a deleted stream, causing all sorts of crash. Fix: when a stream becomes writable (vs when we start writing) take a ref on the stream, and only relinquish it once we're done writing. --- grpc.def | 1 + include/grpc/impl/codegen/sync.h | 4 ++ src/core/iomgr/iomgr.c | 16 ++++++++ src/core/iomgr/iomgr_internal.h | 6 ++- src/core/support/sync.c | 7 +++- src/core/transport/chttp2/internal.h | 18 +++++---- src/core/transport/chttp2/parsing.c | 6 +-- src/core/transport/chttp2/stream_lists.c | 38 ++++++++++--------- src/core/transport/chttp2/writing.c | 37 +++++++------------ src/core/transport/chttp2_transport.c | 43 +++++++++++++--------- src/core/transport/metadata.c | 8 ++++ src/core/transport/transport.c | 2 +- src/python/grpcio/grpc/_cython/imports.generated.c | 2 + src/python/grpcio/grpc/_cython/imports.generated.h | 3 ++ src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 + src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 ++ test/cpp/interop/reconnect_interop_client.cc | 2 +- 17 files changed, 127 insertions(+), 71 deletions(-) (limited to 'test') diff --git a/grpc.def b/grpc.def index bd0bc85a7c..f81aa1b05a 100644 --- a/grpc.def +++ b/grpc.def @@ -182,6 +182,7 @@ EXPORTS gpr_event_wait gpr_ref_init gpr_ref + gpr_ref_non_zero gpr_refn gpr_unref gpr_stats_init diff --git a/include/grpc/impl/codegen/sync.h b/include/grpc/impl/codegen/sync.h index d2f19d37d6..6fd7d64b29 100644 --- a/include/grpc/impl/codegen/sync.h +++ b/include/grpc/impl/codegen/sync.h @@ -182,6 +182,10 @@ GPRAPI void gpr_ref_init(gpr_refcount *r, int n); /* Increment the reference count *r. Requires *r initialized. */ GPRAPI void gpr_ref(gpr_refcount *r); +/* Increment the reference count *r. Requires *r initialized. + Crashes if refcount is zero */ +GPRAPI void gpr_ref_non_zero(gpr_refcount *r); + /* Increment the reference count *r by n. Requires *r initialized, n > 0. */ GPRAPI void gpr_refn(gpr_refcount *r, int n); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 04580150f3..9c89c2c08a 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -41,9 +41,11 @@ #include #include #include +#include #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/timer.h" +#include "src/core/support/env.h" #include "src/core/support/string.h" static gpr_mu g_mu; @@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } break; } @@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_unlock(&g_mu); gpr_free(obj->name); } + +bool grpc_iomgr_abort_on_leaks(void) { + char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS"); + if (env == NULL) return false; + static const char *truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) return true; + } + return false; +} diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index e372c18e8a..ac2c46ebe6 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H +#include + #include "src/core/iomgr/iomgr.h" #include @@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void); /** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); +bool grpc_iomgr_abort_on_leaks(void); + #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/support/sync.c b/src/core/support/sync.c index d368422d9e..69e3e39c5c 100644 --- a/src/core/support/sync.c +++ b/src/core/support/sync.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -98,6 +98,11 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); } void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); } +void gpr_ref_non_zero(gpr_refcount *r) { + gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1); + GPR_ASSERT(prior > 0); +} + void gpr_refn(gpr_refcount *r, int n) { gpr_atm_no_barrier_fetch_add(&r->count, n); } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index d76d31be23..891aad6ef2 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -417,7 +417,7 @@ typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ uint32_t id; uint8_t fetching; - uint8_t sent_initial_metadata; + bool sent_initial_metadata; uint8_t sent_message; uint8_t sent_trailing_metadata; uint8_t read_closed; @@ -509,7 +509,7 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); -void grpc_chttp2_list_add_writable_stream( +bool grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); /** Get a writable stream @@ -519,14 +519,13 @@ int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_remove_writable_stream( +bool grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); + grpc_chttp2_stream_global *stream_global) GRPC_MUST_USE_RESULT; -/* returns 1 if stream added, 0 if it was already present */ -int grpc_chttp2_list_add_writing_stream( +void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT; + grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_have_writing_streams( grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_list_pop_writing_stream( @@ -770,4 +769,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *parsing, const uint8_t *opaque_8bytes); +/** add a ref to the stream and add it to the writable list; + ref will be dropped in writing.c */ +void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); + #endif diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8fdebd7f13..0516f39fa9 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -149,7 +149,7 @@ void grpc_chttp2_publish_reads( if (was_zero && !is_zero) { while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, &stream_global)) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -178,7 +178,7 @@ void grpc_chttp2_publish_reads( outgoing_window); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index b284c78818..60fe735cfc 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -100,11 +100,14 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, } } -static void stream_list_maybe_remove(grpc_chttp2_transport *t, +static bool stream_list_maybe_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) { if (s->included[id]) { stream_list_remove(t, s, id); + return true; + } else { + return false; } } @@ -125,23 +128,24 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, s->included[id] = 1; } -static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_list_id id) { +static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_stream_list_id id) { if (s->included[id]) { - return 0; + return false; } stream_list_add_tail(t, s, id); - return 1; + return true; } /* wrappers for specializations */ -void grpc_chttp2_list_add_writable_stream( +bool grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { GPR_ASSERT(stream_global->id != 0); - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); + return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); } int grpc_chttp2_list_pop_writable_stream( @@ -159,20 +163,20 @@ int grpc_chttp2_list_pop_writable_stream( return r; } -void grpc_chttp2_list_remove_writable_stream( +bool grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_WRITABLE); + return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); } -int grpc_chttp2_list_add_writing_stream( +void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), - STREAM_FROM_WRITING(stream_writing), - GRPC_CHTTP2_LIST_WRITING); + GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_WRITING)); } int grpc_chttp2_list_have_writing_streams( @@ -332,7 +336,7 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport( while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { if (is_window_available) { - grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); + grpc_chttp2_become_writable(&transport->global, &stream->global); } else { grpc_chttp2_list_add_stalled_by_transport(transport_writing, &stream->writing); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 356fd8174a..107725cbc7 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -83,7 +83,8 @@ int grpc_chttp2_unlocking_check_writes( (according to available window sizes) and add to the output buffer */ while (grpc_chttp2_list_pop_writable_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - uint8_t sent_initial_metadata; + bool sent_initial_metadata = stream_writing->sent_initial_metadata; + bool become_writable = false; stream_writing->id = stream_global->id; stream_writing->read_closed = stream_global->read_closed; @@ -92,16 +93,12 @@ int grpc_chttp2_unlocking_check_writes( outgoing_window, stream_global, outgoing_window); - sent_initial_metadata = stream_writing->sent_initial_metadata; if (!sent_initial_metadata && stream_global->send_initial_metadata) { stream_writing->send_initial_metadata = stream_global->send_initial_metadata; stream_global->send_initial_metadata = NULL; - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } - sent_initial_metadata = 1; + become_writable = true; + sent_initial_metadata = true; } if (sent_initial_metadata) { if (stream_global->send_message != NULL) { @@ -128,10 +125,7 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->flow_controlled_buffer.length > 0) && stream_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) { - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; } else { grpc_chttp2_list_add_stalled_by_transport(transport_writing, stream_writing); @@ -141,10 +135,7 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->send_trailing_metadata = stream_global->send_trailing_metadata; stream_global->send_trailing_metadata = NULL; - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; } } @@ -153,10 +144,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, announce_window, stream_global, unannounced_incoming_window_for_writing); - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; + } + + if (become_writable) { + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); + } else { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } } @@ -310,10 +304,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, (stream_writing->send_message && !stream_writing->fetching)) && stream_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) { - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - /* do nothing - already reffed */ - } + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } else { grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, stream_writing); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index b9f511e946..a1ca78d4c4 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -142,7 +142,7 @@ static void incoming_byte_stream_update_flow_control( static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global); -/* +/******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -521,7 +521,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.id) == NULL); } - grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); @@ -583,7 +582,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( return &accepting->parsing; } -/* +/******************************************************************************* * LOCK MANAGEMENT */ @@ -611,10 +610,18 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_END("unlock", 0); } -/* +/******************************************************************************* * OUTPUT PROCESSING */ +void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && + grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { + GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + } +} + static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = @@ -732,7 +739,7 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = 1; transport_global->concurrent_stream_count++; - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -821,7 +828,7 @@ static void perform_stream_op_locked( maybe_start_some_streams(exec_ctx, transport_global); } else { GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } else { grpc_chttp2_complete_closure_step( @@ -838,7 +845,7 @@ static void perform_stream_op_locked( exec_ctx, &stream_global->send_message_finished, 0); } else if (stream_global->id != 0) { stream_global->send_message = op->send_message; - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -858,7 +865,7 @@ static void perform_stream_op_locked( } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -999,7 +1006,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } } -/* +/******************************************************************************* * INPUT PROCESSING */ @@ -1064,7 +1071,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (!s) { s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id); } - grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); GPR_ASSERT(s); s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { @@ -1080,6 +1086,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { close_transport_locked(exec_ctx, t); } + if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing"); + } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); @@ -1331,7 +1340,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) { is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -1426,7 +1435,7 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { GPR_TIMER_END("recv_data", 0); } -/* +/******************************************************************************* * CALLBACK LOOP */ @@ -1440,7 +1449,7 @@ static void connectivity_state_set( state, reason); } -/* +/******************************************************************************* * POLLSET STUFF */ @@ -1468,7 +1477,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, unlock(exec_ctx, t); } -/* +/******************************************************************************* * BYTE STREAM */ @@ -1508,7 +1517,7 @@ static void incoming_byte_stream_update_flow_control( add_max_recv_bytes); grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -1623,7 +1632,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( return incoming_byte_stream; } -/* +/******************************************************************************* * TRACING */ @@ -1709,7 +1718,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, gpr_free(prefix); } -/* +/******************************************************************************* * INTEGRATION GLUE */ diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 14912af7df..807ae071a3 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -43,11 +43,13 @@ #include #include #include + #include "src/core/profiling/timers.h" #include "src/core/support/murmur_hash.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/bin_encoder.h" #include "src/core/transport/static_metadata.h" +#include "src/core/iomgr/iomgr_internal.h" /* There are two kinds of mdelem and mdstr instances. * Static instances are declared in static_metadata.{h,c} and @@ -227,6 +229,9 @@ void grpc_mdctx_global_shutdown(void) { if (shard->count != 0) { gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked", shard->count); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } gpr_free(shard->elems); } @@ -237,6 +242,9 @@ void grpc_mdctx_global_shutdown(void) { if (shard->count != 0) { gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked", shard->count); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } gpr_free(shard->strs); } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 6e154b629a..3b555fa933 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -45,7 +45,7 @@ void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { #else void grpc_stream_ref(grpc_stream_refcount *refcount) { #endif - gpr_ref(&refcount->refs); + gpr_ref_non_zero(&refcount->refs); } #ifdef GRPC_STREAM_REFCOUNT_DEBUG diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c index 4b1860ce8c..8bd6ae6372 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.c +++ b/src/python/grpcio/grpc/_cython/imports.generated.c @@ -220,6 +220,7 @@ gpr_event_get_type gpr_event_get_import; gpr_event_wait_type gpr_event_wait_import; gpr_ref_init_type gpr_ref_init_import; gpr_ref_type gpr_ref_import; +gpr_ref_non_zero_type gpr_ref_non_zero_import; gpr_refn_type gpr_refn_import; gpr_unref_type gpr_unref_import; gpr_stats_init_type gpr_stats_init_import; @@ -485,6 +486,7 @@ void pygrpc_load_imports(HMODULE library) { gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait"); gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init"); gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref"); + gpr_ref_non_zero_import = (gpr_ref_non_zero_type) GetProcAddress(library, "gpr_ref_non_zero"); gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn"); gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref"); gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init"); diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index a395dce7d6..b70dcccd17 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -610,6 +610,9 @@ extern gpr_ref_init_type gpr_ref_init_import; typedef void(*gpr_ref_type)(gpr_refcount *r); extern gpr_ref_type gpr_ref_import; #define gpr_ref gpr_ref_import +typedef void(*gpr_ref_non_zero_type)(gpr_refcount *r); +extern gpr_ref_non_zero_type gpr_ref_non_zero_import; +#define gpr_ref_non_zero gpr_ref_non_zero_import typedef void(*gpr_refn_type)(gpr_refcount *r, int n); extern gpr_refn_type gpr_refn_import; #define gpr_refn gpr_refn_import diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 1af34d97fb..56db4ec686 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -220,6 +220,7 @@ gpr_event_get_type gpr_event_get_import; gpr_event_wait_type gpr_event_wait_import; gpr_ref_init_type gpr_ref_init_import; gpr_ref_type gpr_ref_import; +gpr_ref_non_zero_type gpr_ref_non_zero_import; gpr_refn_type gpr_refn_import; gpr_unref_type gpr_unref_import; gpr_stats_init_type gpr_stats_init_import; @@ -481,6 +482,7 @@ void grpc_rb_load_imports(HMODULE library) { gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait"); gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init"); gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref"); + gpr_ref_non_zero_import = (gpr_ref_non_zero_type) GetProcAddress(library, "gpr_ref_non_zero"); gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn"); gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref"); gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 38aabfaca8..b972f60fc3 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -610,6 +610,9 @@ extern gpr_ref_init_type gpr_ref_init_import; typedef void(*gpr_ref_type)(gpr_refcount *r); extern gpr_ref_type gpr_ref_import; #define gpr_ref gpr_ref_import +typedef void(*gpr_ref_non_zero_type)(gpr_refcount *r); +extern gpr_ref_non_zero_type gpr_ref_non_zero_import; +#define gpr_ref_non_zero gpr_ref_non_zero_import typedef void(*gpr_refn_type)(gpr_refcount *r, int n); extern gpr_refn_type gpr_refn_import; #define gpr_refn gpr_refn_import diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 1f6b352db1..79a60cc860 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without -- cgit v1.2.3 From 13ee2f2df3ded81d096a1440a0f273eeece6a2cc Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 3 Mar 2016 13:57:32 -0800 Subject: Properly integrate async API with server-side cancellations. There is a comment above IsCancelled that says when it is ok to use this. --- include/grpc++/impl/codegen/completion_queue.h | 1 + include/grpc++/impl/codegen/server_context.h | 3 + src/cpp/server/server_context.cc | 26 ++- test/cpp/end2end/async_end2end_test.cc | 218 +++++++++++++++++++------ 4 files changed, 187 insertions(+), 61 deletions(-) (limited to 'test') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 102831e1c9..928ab2db31 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -184,6 +184,7 @@ class CompletionQueue : private GrpcLibrary { bool Pluck(CompletionQueueTag* tag); /// Performs a single polling pluck on \a tag. + /// \warning Must not be mixed with calls to \a Next. void TryPluck(CompletionQueueTag* tag); grpc_completion_queue* cq_; // owned diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index ad08b8210d..91ebe574b1 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -103,6 +103,9 @@ class ServerContext { void AddInitialMetadata(const grpc::string& key, const grpc::string& value); void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); + // IsCancelled is always safe to call when using sync API + // When using async API, it is only safe to call IsCancelled after + // the AsyncNotifyWhenDone tag has been delivered bool IsCancelled() const; // Cancel the Call from the server. This is a best-effort API and depending on diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index e205a1969b..eb49b21037 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -62,7 +62,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - bool CheckCancelled(CompletionQueue* cq); + bool CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this); + return CheckCancelledNoPluck(); + } + bool CheckCancelledAsync() { return CheckCancelledNoPluck(); } void set_tag(void* tag) { has_tag_ = true; @@ -72,6 +76,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void Unref(); private: + bool CheckCancelledNoPluck() { + grpc::lock_guard g(mu_); + return finalized_ ? (cancelled_ != 0) : false; + } + bool has_tag_; void* tag_; grpc::mutex mu_; @@ -88,12 +97,6 @@ void ServerContext::CompletionOp::Unref() { } } -bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { - cq->TryPluck(this); - grpc::lock_guard g(mu_); - return finalized_ ? cancelled_ != 0 : false; -} - void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; @@ -182,7 +185,14 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - return completion_op_ && completion_op_->CheckCancelled(cq_); + if (has_notify_when_done_tag_) { + // when using async API, but the result is only valid + // if the tag has already been delivered at the completion queue + return completion_op_ && completion_op_->CheckCancelledAsync(); + } else { + // when using sync API + return completion_op_ && completion_op_->CheckCancelled(cq_); + } } void ServerContext::set_compression_level(grpc_compression_level level) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 9ca3bf98f8..09e973f28d 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -68,6 +68,7 @@ namespace testing { namespace { void* tag(int i) { return (void*)(intptr_t)i; } +int detag(void* p) { return static_cast(reinterpret_cast(p)); } #ifdef GPR_POSIX_SOCKET static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, @@ -111,30 +112,35 @@ class Verifier { return *this; } + int Next(CompletionQueue* cq, bool ignore_ok) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + } + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + if (!ignore_ok) { + EXPECT_EQ(it->second, ok); + } + expectations_.erase(it); + return detag(got_tag); + } + void Verify(CompletionQueue* cq) { Verify(cq, false); } void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { - bool ok; - void* got_tag; - if (spin_) { - for (;;) { - auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); - if (r == CompletionQueue::TIMEOUT) continue; - if (r == CompletionQueue::GOT_EVENT) break; - gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); - abort(); - } - } else { - EXPECT_TRUE(cq->Next(&got_tag, &ok)); - } - auto it = expectations_.find(got_tag); - EXPECT_TRUE(it != expectations_.end()); - if (!ignore_ok) { - EXPECT_EQ(it->second, ok); - } - expectations_.erase(it); + Next(cq, ignore_ok); } } void Verify(CompletionQueue* cq, @@ -793,7 +799,8 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { } // This class is for testing scenarios where RPCs are cancelled on the server -// by calling ServerContext::TryCancel() +// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone +// API to check for cancellation class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { protected: typedef enum { @@ -803,13 +810,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { CANCEL_AFTER_PROCESSING } ServerTryCancelRequestPhase; - void ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel()"); - EXPECT_TRUE(context->IsCancelled()); - } - // Helper for testing client-streaming RPCs which are cancelled on the server. // Depending on the value of server_try_cancel parameter, this will test one // of the following three scenarios: @@ -843,6 +843,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -858,9 +859,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_server_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // Since cancellation is done before server reads any results, we know // for sure that all cq results will return false from this point forward @@ -868,22 +872,39 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + &ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while reading the // requests from the client. Since the cancellation can happen at anytime, // some of the cq results (i.e those until cancellation) might be true but // its non deterministic. So better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } // Server reads 3 messages (tags 6, 7 and 8) + // But if want_done_tag is true, we might also see tag 11 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { srv_stream.Read(&recv_request, tag(tag_idx)); - Verifier(GetParam()) - .Expect(tag_idx, expected_server_cq_result) - .Verify(cq_.get(), ignore_cq_result); + // Note that we'll add something to the verifier and verify that + // something was seen, but it might be tag 11 and not what we + // just added + int got_tag = verif.Expect(tag_idx, expected_server_cq_result) + .Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); + } } if (server_try_cancel_thd != NULL) { @@ -892,7 +913,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); + } + + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; } // The RPC has been cancelled at this point for sure (i.e irrespective of @@ -945,6 +974,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -952,9 +982,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -962,24 +995,41 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + &ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of // the cq results (i.e those until cancellation) might be true but it is // non deterministic. So better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } // Server sends three messages (tags 3, 4 and 5) + // But if want_done tag is true, we might also see tag 11 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_response.set_message("Pong " + std::to_string(tag_idx)); srv_stream.Write(send_response, tag(tag_idx)); - Verifier(GetParam()) - .Expect(tag_idx, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + // Note that we'll add something to the verifier and verify that + // something was seen, but it might be tag 11 and not what we + // just added + int got_tag = verif.Expect(tag_idx, expected_cq_result) + .Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); + } } if (server_try_cancel_thd != NULL) { @@ -988,13 +1038,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); // Client reads may fail bacause it is notified that the stream is // cancelled. ignore_cq_result = true; } + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + } + // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); @@ -1052,6 +1110,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client + srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -1063,9 +1122,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; + bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -1073,42 +1135,84 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; + + auto verif = Verifier(GetParam()); + if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread( - &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); + &ServerContext::TryCancel, &srv_ctx); // Since server is going to cancel the RPC in a parallel thread, some of // the cq results (i.e those until the cancellation) might be true. Since // that number is non-deterministic, it is better to ignore the cq results ignore_cq_result = true; + // Expect that we might possibly see the done tag that + // indicates cancellation completion in this case + want_done_tag = true; + verif.Expect(11, true); } + int got_tag; srv_stream.Read(&recv_request, tag(4)); - Verifier(GetParam()) - .Expect(4, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(4, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4); + } send_response.set_message("Pong"); srv_stream.Write(send_response, tag(5)); - Verifier(GetParam()) - .Expect(5, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(5, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5); + } cli_stream->Read(&recv_response, tag(6)); - Verifier(GetParam()) - .Expect(6, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); + verif.Expect(6, expected_cq_result); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6); + } // This is expected to succeed in all cases cli_stream->WritesDone(tag(7)); - Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); + verif.Expect(7, true); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7); + } // This is expected to fail in all cases i.e for all values of // server_try_cancel. This is because at this point, either there are no // more msgs from the client (because client called WritesDone) or the RPC // is cancelled on the server srv_stream.Read(&recv_request, tag(8)); - Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); + verif.Expect(8, false); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag)); + if (got_tag == 11) { + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; + // Now get the other entry that we were waiting on + EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8); + } if (server_try_cancel_thd != NULL) { server_try_cancel_thd->join(); @@ -1116,7 +1220,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(&srv_ctx); + srv_ctx.TryCancel(); + want_done_tag = true; + verif.Expect(11, true); + } + + if (want_done_tag) { + verif.Verify(cq_.get()); + EXPECT_TRUE(srv_ctx.IsCancelled()); + want_done_tag = false; } // The RPC has been cancelled at this point for sure (i.e irrespective of -- cgit v1.2.3 From 2e729387f7943971de2591d0db09a0a87026cfd7 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 3 Mar 2016 14:22:12 -0800 Subject: clang-format --- test/cpp/end2end/async_end2end_test.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'test') diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 09e973f28d..30028dd4aa 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -876,8 +876,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { auto verif = Verifier(GetParam()); if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while reading the // requests from the client. Since the cancellation can happen at anytime, // some of the cq results (i.e those until cancellation) might be true but @@ -897,7 +897,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // something was seen, but it might be tag 11 and not what we // just added int got_tag = verif.Expect(tag_idx, expected_server_cq_result) - .Next(cq_.get(), ignore_cq_result); + .Next(cq_.get(), ignore_cq_result); GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); if (got_tag == 11) { EXPECT_TRUE(srv_ctx.IsCancelled()); @@ -999,8 +999,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { auto verif = Verifier(GetParam()); if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of @@ -1022,7 +1022,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // something was seen, but it might be tag 11 and not what we // just added int got_tag = verif.Expect(tag_idx, expected_cq_result) - .Next(cq_.get(), ignore_cq_result); + .Next(cq_.get(), ignore_cq_result); GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); if (got_tag == 11) { EXPECT_TRUE(srv_ctx.IsCancelled()); @@ -1139,8 +1139,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { auto verif = Verifier(GetParam()); if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = new std::thread( - &ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = + new std::thread(&ServerContext::TryCancel, &srv_ctx); // Since server is going to cancel the RPC in a parallel thread, some of // the cq results (i.e those until the cancellation) might be true. Since -- cgit v1.2.3 From dbf47fabd406b51e8d3d03c5eb06c5fe6f7d0d5d Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 3 Mar 2016 16:25:06 -0800 Subject: Better comments. --- test/cpp/end2end/async_end2end_test.cc | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'test') diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 30028dd4aa..dc8c2bb6e5 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -107,11 +107,14 @@ class PollingOverrider { class Verifier { public: explicit Verifier(bool spin) : spin_(spin) {} + // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { expectations_[tag(i)] = expect_ok; return *this; } + // Next waits for 1 async tag to complete, checks its + // expectations, and returns the tag int Next(CompletionQueue* cq, bool ignore_ok) { bool ok; void* got_tag; @@ -135,14 +138,19 @@ class Verifier { return detag(got_tag); } + // Verify keeps calling Next until all currently set + // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } + // This version of Verify allows optionally ignoring the + // outcome of the expectation void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { if (expectations_.empty()) { -- cgit v1.2.3 From edd96e4926373644bac1c169431c213d361bed21 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 4 Mar 2016 14:32:50 -0500 Subject: Revert "Properly integrate async API with server-side cancellations." --- include/grpc++/impl/codegen/completion_queue.h | 1 - include/grpc++/impl/codegen/server_context.h | 3 - src/cpp/server/server_context.cc | 26 +-- test/cpp/end2end/async_end2end_test.cc | 232 ++++++------------------- 4 files changed, 64 insertions(+), 198 deletions(-) (limited to 'test') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 928ab2db31..102831e1c9 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -184,7 +184,6 @@ class CompletionQueue : private GrpcLibrary { bool Pluck(CompletionQueueTag* tag); /// Performs a single polling pluck on \a tag. - /// \warning Must not be mixed with calls to \a Next. void TryPluck(CompletionQueueTag* tag); grpc_completion_queue* cq_; // owned diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 91ebe574b1..ad08b8210d 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -103,9 +103,6 @@ class ServerContext { void AddInitialMetadata(const grpc::string& key, const grpc::string& value); void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); - // IsCancelled is always safe to call when using sync API - // When using async API, it is only safe to call IsCancelled after - // the AsyncNotifyWhenDone tag has been delivered bool IsCancelled() const; // Cancel the Call from the server. This is a best-effort API and depending on diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index eb49b21037..e205a1969b 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -62,11 +62,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - bool CheckCancelled(CompletionQueue* cq) { - cq->TryPluck(this); - return CheckCancelledNoPluck(); - } - bool CheckCancelledAsync() { return CheckCancelledNoPluck(); } + bool CheckCancelled(CompletionQueue* cq); void set_tag(void* tag) { has_tag_ = true; @@ -76,11 +72,6 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void Unref(); private: - bool CheckCancelledNoPluck() { - grpc::lock_guard g(mu_); - return finalized_ ? (cancelled_ != 0) : false; - } - bool has_tag_; void* tag_; grpc::mutex mu_; @@ -97,6 +88,12 @@ void ServerContext::CompletionOp::Unref() { } } +bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this); + grpc::lock_guard g(mu_); + return finalized_ ? cancelled_ != 0 : false; +} + void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; @@ -185,14 +182,7 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - if (has_notify_when_done_tag_) { - // when using async API, but the result is only valid - // if the tag has already been delivered at the completion queue - return completion_op_ && completion_op_->CheckCancelledAsync(); - } else { - // when using sync API - return completion_op_ && completion_op_->CheckCancelled(cq_); - } + return completion_op_ && completion_op_->CheckCancelled(cq_); } void ServerContext::set_compression_level(grpc_compression_level level) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index dc8c2bb6e5..9ca3bf98f8 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -68,7 +68,6 @@ namespace testing { namespace { void* tag(int i) { return (void*)(intptr_t)i; } -int detag(void* p) { return static_cast(reinterpret_cast(p)); } #ifdef GPR_POSIX_SOCKET static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, @@ -107,50 +106,37 @@ class PollingOverrider { class Verifier { public: explicit Verifier(bool spin) : spin_(spin) {} - // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { expectations_[tag(i)] = expect_ok; return *this; } - // Next waits for 1 async tag to complete, checks its - // expectations, and returns the tag - int Next(CompletionQueue* cq, bool ignore_ok) { - bool ok; - void* got_tag; - if (spin_) { - for (;;) { - auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); - if (r == CompletionQueue::TIMEOUT) continue; - if (r == CompletionQueue::GOT_EVENT) break; - gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); - abort(); - } - } else { - EXPECT_TRUE(cq->Next(&got_tag, &ok)); - } - auto it = expectations_.find(got_tag); - EXPECT_TRUE(it != expectations_.end()); - if (!ignore_ok) { - EXPECT_EQ(it->second, ok); - } - expectations_.erase(it); - return detag(got_tag); - } - - // Verify keeps calling Next until all currently set - // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } - // This version of Verify allows optionally ignoring the - // outcome of the expectation void Verify(CompletionQueue* cq, bool ignore_ok) { GPR_ASSERT(!expectations_.empty()); while (!expectations_.empty()) { - Next(cq, ignore_ok); + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + } + auto it = expectations_.find(got_tag); + EXPECT_TRUE(it != expectations_.end()); + if (!ignore_ok) { + EXPECT_EQ(it->second, ok); + } + expectations_.erase(it); } } - // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { if (expectations_.empty()) { @@ -807,8 +793,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) { } // This class is for testing scenarios where RPCs are cancelled on the server -// by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone -// API to check for cancellation +// by calling ServerContext::TryCancel() class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { protected: typedef enum { @@ -818,6 +803,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { CANCEL_AFTER_PROCESSING } ServerTryCancelRequestPhase; + void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel()"); + EXPECT_TRUE(context->IsCancelled()); + } + // Helper for testing client-streaming RPCs which are cancelled on the server. // Depending on the value of server_try_cancel parameter, this will test one // of the following three scenarios: @@ -851,7 +843,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client - srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -867,12 +858,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_server_cq_result = true; bool ignore_cq_result = false; - bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); + ServerTryCancel(&srv_ctx); // Since cancellation is done before server reads any results, we know // for sure that all cq results will return false from this point forward @@ -880,39 +868,22 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; - - auto verif = Verifier(GetParam()); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); // Server will cancel the RPC in a parallel thread while reading the // requests from the client. Since the cancellation can happen at anytime, // some of the cq results (i.e those until cancellation) might be true but // its non deterministic. So better to ignore the cq results ignore_cq_result = true; - // Expect that we might possibly see the done tag that - // indicates cancellation completion in this case - want_done_tag = true; - verif.Expect(11, true); } // Server reads 3 messages (tags 6, 7 and 8) - // But if want_done_tag is true, we might also see tag 11 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { srv_stream.Read(&recv_request, tag(tag_idx)); - // Note that we'll add something to the verifier and verify that - // something was seen, but it might be tag 11 and not what we - // just added - int got_tag = verif.Expect(tag_idx, expected_server_cq_result) - .Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); - } + Verifier(GetParam()) + .Expect(tag_idx, expected_server_cq_result) + .Verify(cq_.get(), ignore_cq_result); } if (server_try_cancel_thd != NULL) { @@ -921,15 +892,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - srv_ctx.TryCancel(); - want_done_tag = true; - verif.Expect(11, true); - } - - if (want_done_tag) { - verif.Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; + ServerTryCancel(&srv_ctx); } // The RPC has been cancelled at this point for sure (i.e irrespective of @@ -982,7 +945,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client - srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -990,12 +952,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; - bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); + ServerTryCancel(&srv_ctx); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -1003,41 +962,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; - - auto verif = Verifier(GetParam()); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); // Server will cancel the RPC in a parallel thread while writing responses // to the client. Since the cancellation can happen at anytime, some of // the cq results (i.e those until cancellation) might be true but it is // non deterministic. So better to ignore the cq results ignore_cq_result = true; - // Expect that we might possibly see the done tag that - // indicates cancellation completion in this case - want_done_tag = true; - verif.Expect(11, true); } // Server sends three messages (tags 3, 4 and 5) - // But if want_done tag is true, we might also see tag 11 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { send_response.set_message("Pong " + std::to_string(tag_idx)); srv_stream.Write(send_response, tag(tag_idx)); - // Note that we'll add something to the verifier and verify that - // something was seen, but it might be tag 11 and not what we - // just added - int got_tag = verif.Expect(tag_idx, expected_cq_result) - .Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx); - } + Verifier(GetParam()) + .Expect(tag_idx, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); } if (server_try_cancel_thd != NULL) { @@ -1046,21 +988,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - srv_ctx.TryCancel(); - want_done_tag = true; - verif.Expect(11, true); + ServerTryCancel(&srv_ctx); // Client reads may fail bacause it is notified that the stream is // cancelled. ignore_cq_result = true; } - if (want_done_tag) { - verif.Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - } - // Client attemts to read the three messages from the server for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { cli_stream->Read(&recv_response, tag(tag_idx)); @@ -1118,7 +1052,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client - srv_ctx.AsyncNotifyWhenDone(tag(11)); service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); @@ -1130,12 +1063,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { bool expected_cq_result = true; bool ignore_cq_result = false; - bool want_done_tag = false; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - srv_ctx.TryCancel(); - Verifier(GetParam()).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); + ServerTryCancel(&srv_ctx); // We know for sure that all cq results will be false from this point // since the server cancelled the RPC @@ -1143,84 +1073,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } std::thread* server_try_cancel_thd = NULL; - - auto verif = Verifier(GetParam()); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread(&ServerContext::TryCancel, &srv_ctx); + server_try_cancel_thd = new std::thread( + &AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); // Since server is going to cancel the RPC in a parallel thread, some of // the cq results (i.e those until the cancellation) might be true. Since // that number is non-deterministic, it is better to ignore the cq results ignore_cq_result = true; - // Expect that we might possibly see the done tag that - // indicates cancellation completion in this case - want_done_tag = true; - verif.Expect(11, true); } - int got_tag; srv_stream.Read(&recv_request, tag(4)); - verif.Expect(4, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4); - } + Verifier(GetParam()) + .Expect(4, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); send_response.set_message("Pong"); srv_stream.Write(send_response, tag(5)); - verif.Expect(5, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5); - } + Verifier(GetParam()) + .Expect(5, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); cli_stream->Read(&recv_response, tag(6)); - verif.Expect(6, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6); - } + Verifier(GetParam()) + .Expect(6, expected_cq_result) + .Verify(cq_.get(), ignore_cq_result); // This is expected to succeed in all cases cli_stream->WritesDone(tag(7)); - verif.Expect(7, true); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7); - } + Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); // This is expected to fail in all cases i.e for all values of // server_try_cancel. This is because at this point, either there are no // more msgs from the client (because client called WritesDone) or the RPC // is cancelled on the server srv_stream.Read(&recv_request, tag(8)); - verif.Expect(8, false); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8); - } + Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); if (server_try_cancel_thd != NULL) { server_try_cancel_thd->join(); @@ -1228,15 +1116,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - srv_ctx.TryCancel(); - want_done_tag = true; - verif.Expect(11, true); - } - - if (want_done_tag) { - verif.Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; + ServerTryCancel(&srv_ctx); } // The RPC has been cancelled at this point for sure (i.e irrespective of -- cgit v1.2.3