diff options
Diffstat (limited to 'test')
32 files changed, 745 insertions, 955 deletions
diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py deleted file mode 100644 index 0e58d912b9..0000000000 --- a/test/compiler/python_plugin_test.py +++ /dev/null @@ -1,537 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse -import contextlib -import errno -import itertools -import os -import shutil -import subprocess -import sys -import tempfile -import threading -import time -import unittest - -from grpc.framework.alpha import exceptions -from grpc.framework.foundation import future - -# Identifiers of entities we expect to find in the generated module. -SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer' -SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer' -STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' -SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' -STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' - -# The timeout used in tests of RPCs that are supposed to expire. -SHORT_TIMEOUT = 2 -# The timeout used in tests of RPCs that are not supposed to expire. The -# absurdly large value doesn't matter since no passing execution of this test -# module will ever wait the duration. -LONG_TIMEOUT = 600 -NO_DELAY = 0 - -# Build mode environment variable set by tools/run_tests/run_tests.py. -_build_mode = os.environ['CONFIG'] - - -class _ServicerMethods(object): - - def __init__(self, test_pb2, delay): - self._condition = threading.Condition() - self._delay = delay - self._paused = False - self._fail = False - self._test_pb2 = test_pb2 - - @contextlib.contextmanager - def pause(self): # pylint: disable=invalid-name - with self._condition: - self._paused = True - yield - with self._condition: - self._paused = False - self._condition.notify_all() - - @contextlib.contextmanager - def fail(self): # pylint: disable=invalid-name - with self._condition: - self._fail = True - yield - with self._condition: - self._fail = False - - def _control(self): # pylint: disable=invalid-name - with self._condition: - if self._fail: - raise ValueError() - while self._paused: - self._condition.wait() - time.sleep(self._delay) - - def UnaryCall(self, request, unused_rpc_context): - response = self._test_pb2.SimpleResponse() - response.payload.payload_type = self._test_pb2.COMPRESSABLE - response.payload.payload_compressable = 'a' * request.response_size - self._control() - return response - - def StreamingOutputCall(self, request, unused_rpc_context): - for parameter in request.response_parameters: - response = self._test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self._test_pb2.COMPRESSABLE - response.payload.payload_compressable = 'a' * parameter.size - self._control() - yield response - - def StreamingInputCall(self, request_iter, unused_rpc_context): - response = self._test_pb2.StreamingInputCallResponse() - aggregated_payload_size = 0 - for request in request_iter: - aggregated_payload_size += len(request.payload.payload_compressable) - response.aggregated_payload_size = aggregated_payload_size - self._control() - return response - - def FullDuplexCall(self, request_iter, unused_rpc_context): - for request in request_iter: - for parameter in request.response_parameters: - response = self._test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self._test_pb2.COMPRESSABLE - response.payload.payload_compressable = 'a' * parameter.size - self._control() - yield response - - def HalfDuplexCall(self, request_iter, unused_rpc_context): - responses = [] - for request in request_iter: - for parameter in request.response_parameters: - response = self._test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self._test_pb2.COMPRESSABLE - response.payload.payload_compressable = 'a' * parameter.size - self._control() - responses.append(response) - for response in responses: - yield response - - -@contextlib.contextmanager -def _CreateService(test_pb2, delay): - """Provides a servicer backend and a stub. - - The servicer is just the implementation - of the actual servicer passed to the face player of the python RPC - implementation; the two are detached. - - Non-zero delay puts a delay on each call to the servicer, representative of - communication latency. Timeout is the default timeout for the stub while - waiting for the service. - - Args: - test_pb2: The test_pb2 module generated by this test. - delay: Delay in seconds per response from the servicer. - - Yields: - A (servicer_methods, servicer, stub) three-tuple where servicer_methods is - the back-end of the service bound to the stub and the server and stub - are both activated and ready for use. - """ - servicer_methods = _ServicerMethods(test_pb2, delay) - - class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)): - - def UnaryCall(self, request, context): - return servicer_methods.UnaryCall(request, context) - - def StreamingOutputCall(self, request, context): - return servicer_methods.StreamingOutputCall(request, context) - - def StreamingInputCall(self, request_iter, context): - return servicer_methods.StreamingInputCall(request_iter, context) - - def FullDuplexCall(self, request_iter, context): - return servicer_methods.FullDuplexCall(request_iter, context) - - def HalfDuplexCall(self, request_iter, context): - return servicer_methods.HalfDuplexCall(request_iter, context) - - servicer = Servicer() - server = getattr( - test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0) - with server: - port = server.port() - stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port) - with stub: - yield servicer_methods, stub, server - - -def _streaming_input_request_iterator(test_pb2): - for _ in range(3): - request = test_pb2.StreamingInputCallRequest() - request.payload.payload_type = test_pb2.COMPRESSABLE - request.payload.payload_compressable = 'a' - yield request - - -def _streaming_output_request(test_pb2): - request = test_pb2.StreamingOutputCallRequest() - sizes = [1, 2, 3] - request.response_parameters.add(size=sizes[0], interval_us=0) - request.response_parameters.add(size=sizes[1], interval_us=0) - request.response_parameters.add(size=sizes[2], interval_us=0) - return request - - -def _full_duplex_request_iterator(test_pb2): - request = test_pb2.StreamingOutputCallRequest() - request.response_parameters.add(size=1, interval_us=0) - yield request - request = test_pb2.StreamingOutputCallRequest() - request.response_parameters.add(size=2, interval_us=0) - request.response_parameters.add(size=3, interval_us=0) - yield request - - -class PythonPluginTest(unittest.TestCase): - """Test case for the gRPC Python protoc-plugin. - - While reading these tests, remember that the futures API - (`stub.method.async()`) only gives futures for the *non-streaming* responses, - else it behaves like its blocking cousin. - """ - - def setUp(self): - protoc_command = '../../bins/%s/protobuf/protoc' % _build_mode - protoc_plugin_filename = '../../bins/%s/grpc_python_plugin' % _build_mode - test_proto_filename = './test.proto' - if not os.path.isfile(protoc_command): - # Assume that if we haven't built protoc that it's on the system. - protoc_command = 'protoc' - - # Ensure that the output directory exists. - self.outdir = tempfile.mkdtemp() - - # Invoke protoc with the plugin. - cmd = [ - protoc_command, - '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename, - '-I %s' % os.path.dirname(test_proto_filename), - '--python_out=%s' % self.outdir, - '--python-grpc_out=%s' % self.outdir, - os.path.basename(test_proto_filename), - ] - subprocess.call(' '.join(cmd), shell=True) - sys.path.append(self.outdir) - - def tearDown(self): - try: - shutil.rmtree(self.outdir) - except OSError as exc: - if exc.errno != errno.ENOENT: - raise - - # TODO(atash): Figure out which of these tests is hanging flakily with small - # probability. - - def testImportAttributes(self): - # check that we can access the generated module and its members. - import test_pb2 # pylint: disable=g-import-not-at-top - self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None)) - self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None)) - self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None)) - self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None)) - self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None)) - - def testUpDown(self): - import test_pb2 - with _CreateService( - test_pb2, NO_DELAY) as (servicer, stub, unused_server): - request = test_pb2.SimpleRequest(response_size=13) - - def testUnaryCall(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. - request = test_pb2.SimpleRequest(response_size=13) - response = stub.UnaryCall(request, timeout) - expected_response = methods.UnaryCall(request, 'not a real RpcContext!') - self.assertEqual(expected_response, response) - - def testUnaryCallAsync(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - # Check that the call does not block waiting for the server to respond. - with methods.pause(): - response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) - response = response_future.result() - expected_response = methods.UnaryCall(request, 'not a real RpcContext!') - self.assertEqual(expected_response, response) - - def testUnaryCallAsyncExpired(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - request = test_pb2.SimpleRequest(response_size=13) - with methods.pause(): - response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT) - with self.assertRaises(exceptions.ExpirationError): - response_future.result() - - @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' - 'forever and fix.') - def testUnaryCallAsyncCancelled(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.pause(): - response_future = stub.UnaryCall.async(request, 1) - response_future.cancel() - self.assertTrue(response_future.cancelled()) - - def testUnaryCallAsyncFailed(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.fail(): - response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) - self.assertIsNotNone(response_future.exception()) - - def testStreamingOutputCall(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) - expected_responses = methods.StreamingOutputCall( - request, 'not a real RpcContext!') - for expected_response, response in itertools.izip_longest( - expected_responses, responses): - self.assertEqual(expected_response, response) - - @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' - 'forever and fix.') - def testStreamingOutputCallExpired(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.pause(): - responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) - with self.assertRaises(exceptions.ExpirationError): - list(responses) - - @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' - 'forever and fix.') - def testStreamingOutputCallCancelled(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as ( - unused_methods, stub, unused_server): - responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) - next(responses) - responses.cancel() - with self.assertRaises(future.CancelledError): - next(responses) - - @unittest.skip('TODO(atash,nathaniel): figure out why this times out ' - 'instead of raising the proper error.') - def testStreamingOutputCallFailed(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.fail(): - responses = stub.StreamingOutputCall(request, 1) - self.assertIsNotNone(responses) - with self.assertRaises(exceptions.ServicerError): - next(responses) - - @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' - 'forever and fix.') - def testStreamingInputCall(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - response = stub.StreamingInputCall( - _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) - expected_response = methods.StreamingInputCall( - _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') - self.assertEqual(expected_response, response) - - def testStreamingInputCallAsync(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.pause(): - response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) - response = response_future.result() - expected_response = methods.StreamingInputCall( - _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') - self.assertEqual(expected_response, response) - - def testStreamingInputCallAsyncExpired(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.pause(): - response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) - with self.assertRaises(exceptions.ExpirationError): - response_future.result() - self.assertIsInstance( - response_future.exception(), exceptions.ExpirationError) - - def testStreamingInputCallAsyncCancelled(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.pause(): - timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. - response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), timeout) - response_future.cancel() - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - - def testStreamingInputCallAsyncFailed(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.fail(): - response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) - self.assertIsNotNone(response_future.exception()) - - def testFullDuplexCall(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - responses = stub.FullDuplexCall( - _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) - expected_responses = methods.FullDuplexCall( - _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') - for expected_response, response in itertools.izip_longest( - expected_responses, responses): - self.assertEqual(expected_response, response) - - @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' - 'forever and fix.') - def testFullDuplexCallExpired(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request_iterator = _full_duplex_request_iterator(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.pause(): - responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) - with self.assertRaises(exceptions.ExpirationError): - list(responses) - - @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' - 'forever and fix.') - def testFullDuplexCallCancelled(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - request_iterator = _full_duplex_request_iterator(test_pb2) - responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) - next(responses) - responses.cancel() - with self.assertRaises(future.CancelledError): - next(responses) - - @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever ' - 'and fix.') - def testFullDuplexCallFailed(self): - import test_pb2 # pylint: disable=g-import-not-at-top - request_iterator = _full_duplex_request_iterator(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - with methods.fail(): - responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) - self.assertIsNotNone(responses) - with self.assertRaises(exceptions.ServicerError): - next(responses) - - @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' - 'forever and fix.') - def testHalfDuplexCall(self): - import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as ( - methods, stub, unused_server): - def half_duplex_request_iterator(): - request = test_pb2.StreamingOutputCallRequest() - request.response_parameters.add(size=1, interval_us=0) - yield request - request = test_pb2.StreamingOutputCallRequest() - request.response_parameters.add(size=2, interval_us=0) - request.response_parameters.add(size=3, interval_us=0) - yield request - responses = stub.HalfDuplexCall( - half_duplex_request_iterator(), LONG_TIMEOUT) - expected_responses = methods.HalfDuplexCall( - half_duplex_request_iterator(), 'not a real RpcContext!') - for check in itertools.izip_longest(expected_responses, responses): - expected_response, response = check - self.assertEqual(expected_response, response) - - def testHalfDuplexCallWedged(self): - import test_pb2 # pylint: disable=g-import-not-at-top - condition = threading.Condition() - wait_cell = [False] - @contextlib.contextmanager - def wait(): # pylint: disable=invalid-name - # Where's Python 3's 'nonlocal' statement when you need it? - with condition: - wait_cell[0] = True - yield - with condition: - wait_cell[0] = False - condition.notify_all() - def half_duplex_request_iterator(): - request = test_pb2.StreamingOutputCallRequest() - request.response_parameters.add(size=1, interval_us=0) - yield request - with condition: - while wait_cell[0]: - condition.wait() - with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - with wait(): - responses = stub.HalfDuplexCall( - half_duplex_request_iterator(), SHORT_TIMEOUT) - # half-duplex waits for the client to send all info - with self.assertRaises(exceptions.ExpirationError): - next(responses) - - -if __name__ == '__main__': - os.chdir(os.path.dirname(sys.argv[0])) - unittest.main(verbosity=2) diff --git a/test/compiler/test.proto b/test/compiler/test.proto deleted file mode 100644 index ed7c6a7b79..0000000000 --- a/test/compiler/test.proto +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -// An integration test service that covers all the method signature permutations -// of unary/streaming requests/responses. -// This file is duplicated around the code base. See GitHub issue #526. -syntax = "proto2"; - -package grpc.testing; - -enum PayloadType { - // Compressable text format. - COMPRESSABLE= 1; - - // Uncompressable binary format. - UNCOMPRESSABLE = 2; - - // Randomly chosen from all other formats defined in this enum. - RANDOM = 3; -} - -message Payload { - required PayloadType payload_type = 1; - oneof payload_body { - string payload_compressable = 2; - bytes payload_uncompressable = 3; - } -} - -message SimpleRequest { - // Desired payload type in the response from the server. - // If response_type is RANDOM, server randomly chooses one from other formats. - optional PayloadType response_type = 1 [default=COMPRESSABLE]; - - // Desired payload size in the response from the server. - // If response_type is COMPRESSABLE, this denotes the size before compression. - optional int32 response_size = 2; - - // Optional input payload sent along with the request. - optional Payload payload = 3; -} - -message SimpleResponse { - optional Payload payload = 1; -} - -message StreamingInputCallRequest { - // Optional input payload sent along with the request. - optional Payload payload = 1; - - // Not expecting any payload from the response. -} - -message StreamingInputCallResponse { - // Aggregated size of payloads received from the client. - optional int32 aggregated_payload_size = 1; -} - -message ResponseParameters { - // Desired payload sizes in responses from the server. - // If response_type is COMPRESSABLE, this denotes the size before compression. - required int32 size = 1; - - // Desired interval between consecutive responses in the response stream in - // microseconds. - required int32 interval_us = 2; -} - -message StreamingOutputCallRequest { - // Desired payload type in the response from the server. - // If response_type is RANDOM, the payload from each response in the stream - // might be of different types. This is to simulate a mixed type of payload - // stream. - optional PayloadType response_type = 1 [default=COMPRESSABLE]; - - repeated ResponseParameters response_parameters = 2; - - // Optional input payload sent along with the request. - optional Payload payload = 3; -} - -message StreamingOutputCallResponse { - optional Payload payload = 1; -} - -service TestService { - // One request followed by one response. - // The server returns the client payload as-is. - rpc UnaryCall(SimpleRequest) returns (SimpleResponse); - - // One request followed by a sequence of responses (streamed download). - // The server returns the payload with client desired type and sizes. - rpc StreamingOutputCall(StreamingOutputCallRequest) - returns (stream StreamingOutputCallResponse); - - // A sequence of requests followed by one response (streamed upload). - // The server returns the aggregated size of client payload as the result. - rpc StreamingInputCall(stream StreamingInputCallRequest) - returns (StreamingInputCallResponse); - - // A sequence of requests with each request served by the server immediately. - // As one request could lead to multiple responses, this interface - // demonstrates the idea of full duplexing. - rpc FullDuplexCall(stream StreamingOutputCallRequest) - returns (stream StreamingOutputCallResponse); - - // A sequence of requests followed by a sequence of responses. - // The server buffers all the client requests and then serves them in order. A - // stream of responses are returned to the client when the server starts with - // first request. - rpc HalfDuplexCall(stream StreamingOutputCallRequest) - returns (stream StreamingOutputCallResponse); -} diff --git a/test/core/end2end/end2end_tests.h b/test/core/end2end/end2end_tests.h index a18c702951..3f1665613c 100644 --- a/test/core/end2end/end2end_tests.h +++ b/test/core/end2end/end2end_tests.h @@ -43,6 +43,8 @@ typedef struct grpc_end2end_test_config grpc_end2end_test_config; #define FEATURE_MASK_SUPPORTS_HOSTNAME_VERIFICATION 2 #define FEATURE_MASK_SUPPORTS_PER_CALL_CREDENTIALS 4 +#define FAIL_AUTH_CHECK_SERVER_ARG_NAME "fail_auth_check" + struct grpc_end2end_test_fixture { grpc_completion_queue *cq; grpc_server *server; diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c index f879b43f79..78b692a45d 100644 --- a/test/core/end2end/fixtures/chttp2_fake_security.c +++ b/test/core/end2end/fixtures/chttp2_fake_security.c @@ -65,6 +65,14 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( return f; } +static void process_auth_failure(void *state, grpc_auth_context *ctx, + const grpc_metadata *md, size_t md_count, + grpc_process_auth_metadata_done_cb cb, + void *user_data) { + GPR_ASSERT(state == NULL); + cb(user_data, NULL, 0, 0); +} + static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *client_args, grpc_credentials *creds) { @@ -102,10 +110,27 @@ static void chttp2_init_client_fake_secure_fullstack( chttp2_init_client_secure_fullstack(f, client_args, fake_ts_creds); } +static int fail_server_auth_check(grpc_channel_args *server_args) { + size_t i; + if (server_args == NULL) return 0; + for (i = 0; i < server_args->num_args; i++) { + if (strcmp(server_args->args[i].key, FAIL_AUTH_CHECK_SERVER_ARG_NAME) == + 0) { + return 1; + } + } + return 0; +} + static void chttp2_init_server_fake_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_server_credentials *fake_ts_creds = grpc_fake_transport_security_server_credentials_create(); + if (fail_server_auth_check(server_args)) { + grpc_auth_metadata_processor processor = {process_auth_failure, NULL}; + grpc_server_credentials_set_auth_metadata_processor(fake_ts_creds, + processor); + } chttp2_init_server_secure_fullstack(f, server_args, fake_ts_creds); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c index 6d5669d05a..9850aac69b 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c @@ -68,6 +68,14 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( return f; } +static void process_auth_failure(void *state, grpc_auth_context *ctx, + const grpc_metadata *md, size_t md_count, + grpc_process_auth_metadata_done_cb cb, + void *user_data) { + GPR_ASSERT(state == NULL); + cb(user_data, NULL, 0, 0); +} + static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *client_args, grpc_credentials *creds) { @@ -110,12 +118,28 @@ static void chttp2_init_client_simple_ssl_secure_fullstack( grpc_channel_args_destroy(new_client_args); } +static int fail_server_auth_check(grpc_channel_args *server_args) { + size_t i; + if (server_args == NULL) return 0; + for (i = 0; i < server_args->num_args; i++) { + if (strcmp(server_args->args[i].key, FAIL_AUTH_CHECK_SERVER_ARG_NAME) == + 0) { + return 1; + } + } + return 0; +} + static void chttp2_init_server_simple_ssl_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_ssl_pem_key_cert_pair pem_cert_key_pair = {test_server1_key, test_server1_cert}; grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create(NULL, &pem_cert_key_pair, 1, 0); + if (fail_server_auth_check(server_args)) { + grpc_auth_metadata_processor processor = {process_auth_failure, NULL}; + grpc_server_credentials_set_auth_metadata_processor(ssl_creds, processor); + } chttp2_init_server_secure_fullstack(f, server_args, ssl_creds); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c index d0cc3dd74a..3df2acd296 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_poll.c @@ -68,6 +68,14 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( return f; } +static void process_auth_failure(void *state, grpc_auth_context *ctx, + const grpc_metadata *md, size_t md_count, + grpc_process_auth_metadata_done_cb cb, + void *user_data) { + GPR_ASSERT(state == NULL); + cb(user_data, NULL, 0, 0); +} + static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *client_args, grpc_credentials *creds) { @@ -110,12 +118,28 @@ static void chttp2_init_client_simple_ssl_secure_fullstack( grpc_channel_args_destroy(new_client_args); } +static int fail_server_auth_check(grpc_channel_args *server_args) { + size_t i; + if (server_args == NULL) return 0; + for (i = 0; i < server_args->num_args; i++) { + if (strcmp(server_args->args[i].key, FAIL_AUTH_CHECK_SERVER_ARG_NAME) == + 0) { + return 1; + } + } + return 0; +} + static void chttp2_init_server_simple_ssl_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_ssl_pem_key_cert_pair pem_cert_key_pair = {test_server1_key, test_server1_cert}; grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create(NULL, &pem_cert_key_pair, 1, 0); + if (fail_server_auth_check(server_args)) { + grpc_auth_metadata_processor processor = {process_auth_failure, NULL}; + grpc_server_credentials_set_auth_metadata_processor(ssl_creds, processor); + } chttp2_init_server_secure_fullstack(f, server_args, ssl_creds); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c index 46a64de6c5..4d77039cac 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack_with_proxy.c @@ -96,6 +96,14 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( return f; } +static void process_auth_failure(void *state, grpc_auth_context *ctx, + const grpc_metadata *md, size_t md_count, + grpc_process_auth_metadata_done_cb cb, + void *user_data) { + GPR_ASSERT(state == NULL); + cb(user_data, NULL, 0, 0); +} + static void chttp2_init_client_secure_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *client_args, grpc_credentials *creds) { @@ -139,12 +147,28 @@ static void chttp2_init_client_simple_ssl_secure_fullstack( grpc_channel_args_destroy(new_client_args); } +static int fail_server_auth_check(grpc_channel_args *server_args) { + size_t i; + if (server_args == NULL) return 0; + for (i = 0; i < server_args->num_args; i++) { + if (strcmp(server_args->args[i].key, FAIL_AUTH_CHECK_SERVER_ARG_NAME) == + 0) { + return 1; + } + } + return 0; +} + static void chttp2_init_server_simple_ssl_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_ssl_pem_key_cert_pair pem_cert_key_pair = {test_server1_key, test_server1_cert}; grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create(NULL, &pem_cert_key_pair, 1, 0); + if (fail_server_auth_check(server_args)) { + grpc_auth_metadata_processor processor = {process_auth_failure, NULL}; + grpc_server_credentials_set_auth_metadata_processor(ssl_creds, processor); + } chttp2_init_server_secure_fullstack(f, server_args, ssl_creds); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c index f74ed9365f..284d5f07ae 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c @@ -46,10 +46,54 @@ #include "test/core/util/port.h" #include "test/core/end2end/data/ssl_test_data.h" +static const char oauth2_md[] = "Bearer aaslkfjs424535asdf"; +static const char *client_identity_property_name = "smurf_name"; +static const char *client_identity = "Brainy Smurf"; + typedef struct fullstack_secure_fixture_data { char *localaddr; } fullstack_secure_fixture_data; +static const grpc_metadata *find_metadata(const grpc_metadata *md, + size_t md_count, + const char *key, + const char *value) { + size_t i; + for (i = 0; i < md_count; i++) { + if (strcmp(key, md[i].key) == 0 && strlen(value) == md[i].value_length && + memcmp(md[i].value, value, md[i].value_length) == 0) { + return &md[i]; + } + } + return NULL; +} + +static void process_oauth2_success(void *state, grpc_auth_context *ctx, + const grpc_metadata *md, size_t md_count, + grpc_process_auth_metadata_done_cb cb, + void *user_data) { + const grpc_metadata *oauth2 = + find_metadata(md, md_count, "Authorization", oauth2_md); + GPR_ASSERT(state == NULL); + GPR_ASSERT(oauth2 != NULL); + grpc_auth_context_add_cstring_property(ctx, client_identity_property_name, + client_identity); + GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name( + ctx, client_identity_property_name) == 1); + cb(user_data, oauth2, 1, 1); +} + +static void process_oauth2_failure(void *state, grpc_auth_context *ctx, + const grpc_metadata *md, size_t md_count, + grpc_process_auth_metadata_done_cb cb, + void *user_data) { + const grpc_metadata *oauth2 = + find_metadata(md, md_count, "Authorization", oauth2_md); + GPR_ASSERT(state == NULL); + GPR_ASSERT(oauth2 != NULL); + cb(user_data, oauth2, 1, 0); +} + static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( grpc_channel_args *client_args, grpc_channel_args *server_args) { grpc_end2end_test_fixture f; @@ -101,7 +145,7 @@ static void chttp2_init_client_simple_ssl_with_oauth2_secure_fullstack( grpc_credentials *ssl_creds = grpc_ssl_credentials_create(test_root_cert, NULL); grpc_credentials *oauth2_creds = - grpc_fake_oauth2_credentials_create("Bearer aaslkfjs424535asdf", 1); + grpc_md_only_test_credentials_create("Authorization", oauth2_md, 1); grpc_credentials *ssl_oauth2_creds = grpc_composite_credentials_create(ssl_creds, oauth2_creds); grpc_arg ssl_name_override = {GRPC_ARG_STRING, @@ -115,12 +159,32 @@ static void chttp2_init_client_simple_ssl_with_oauth2_secure_fullstack( grpc_credentials_release(oauth2_creds); } +static int fail_server_auth_check(grpc_channel_args *server_args) { + size_t i; + if (server_args == NULL) return 0; + for (i = 0; i < server_args->num_args; i++) { + if (strcmp(server_args->args[i].key, FAIL_AUTH_CHECK_SERVER_ARG_NAME) == + 0) { + return 1; + } + } + return 0; +} + static void chttp2_init_server_simple_ssl_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key, test_server1_cert}; grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create(NULL, &pem_key_cert_pair, 1, 0); + grpc_auth_metadata_processor processor; + processor.state = NULL; + if (fail_server_auth_check(server_args)) { + processor.process = process_oauth2_failure; + } else { + processor.process = process_oauth2_success; + } + grpc_server_credentials_set_auth_metadata_processor(ssl_creds, processor); chttp2_init_server_secure_fullstack(f, server_args, ssl_creds); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c index c59628b959..9d798ad1d2 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c @@ -148,6 +148,11 @@ int main(int argc, char **argv) { /* force tracing on, with a value to force many code paths in trace.c to be taken */ gpr_setenv("GRPC_TRACE", "doesnt-exist,http,all"); +#ifdef GPR_POSIX_SOCKET + g_fixture_slowdown_factor = isatty(STDOUT_FILENO) ? 10.0 : 1.0; +#else + g_fixture_slowdown_factor = 10.0; +#endif grpc_test_init(argc, argv); grpc_init(); diff --git a/test/core/end2end/gen_build_json.py b/test/core/end2end/gen_build_json.py index c6e869364c..6f10b78dad 100755 --- a/test/core/end2end/gen_build_json.py +++ b/test/core/end2end/gen_build_json.py @@ -36,27 +36,27 @@ import simplejson import collections -FixtureOptions = collections.namedtuple('FixtureOptions', 'fullstack includes_proxy dns_resolver secure platforms') -default_unsecure_fixture_options = FixtureOptions(True, False, True, False, ['windows', 'posix']) +FixtureOptions = collections.namedtuple('FixtureOptions', 'fullstack includes_proxy dns_resolver secure platforms ci_mac') +default_unsecure_fixture_options = FixtureOptions(True, False, True, False, ['windows', 'linux', 'mac', 'posix'], True) socketpair_unsecure_fixture_options = default_unsecure_fixture_options._replace(fullstack=False, dns_resolver=False) default_secure_fixture_options = default_unsecure_fixture_options._replace(secure=True) -uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, platforms=['posix']) +uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, platforms=['linux', 'mac', 'posix']) # maps fixture name to whether it requires the security library END2END_FIXTURES = { - 'chttp2_fake_security': default_secure_fixture_options, + 'chttp2_fake_security': default_secure_fixture_options._replace(ci_mac=False), 'chttp2_fullstack': default_unsecure_fixture_options, 'chttp2_fullstack_compression': default_unsecure_fixture_options, 'chttp2_fullstack_uds_posix': uds_fixture_options, - 'chttp2_fullstack_uds_posix_with_poll': uds_fixture_options, - 'chttp2_fullstack_with_poll': default_unsecure_fixture_options._replace(platforms=['posix']), - 'chttp2_fullstack_with_proxy': default_unsecure_fixture_options._replace(includes_proxy=True), + 'chttp2_fullstack_uds_posix_with_poll': uds_fixture_options._replace(platforms=['linux']), + 'chttp2_fullstack_with_poll': default_unsecure_fixture_options._replace(platforms=['linux']), + 'chttp2_fullstack_with_proxy': default_unsecure_fixture_options._replace(includes_proxy=True, ci_mac=False), 'chttp2_simple_ssl_fullstack': default_secure_fixture_options, - 'chttp2_simple_ssl_fullstack_with_poll': default_secure_fixture_options._replace(platforms=['posix']), - 'chttp2_simple_ssl_fullstack_with_proxy': default_secure_fixture_options._replace(includes_proxy=True), - 'chttp2_simple_ssl_with_oauth2_fullstack': default_secure_fixture_options, - 'chttp2_socket_pair': socketpair_unsecure_fixture_options, - 'chttp2_socket_pair_one_byte_at_a_time': socketpair_unsecure_fixture_options, + 'chttp2_simple_ssl_fullstack_with_poll': default_secure_fixture_options._replace(platforms=['linux']), + 'chttp2_simple_ssl_fullstack_with_proxy': default_secure_fixture_options._replace(includes_proxy=True, ci_mac=False), + 'chttp2_simple_ssl_with_oauth2_fullstack': default_secure_fixture_options._replace(ci_mac=False), + 'chttp2_socket_pair': socketpair_unsecure_fixture_options._replace(ci_mac=False), + 'chttp2_socket_pair_one_byte_at_a_time': socketpair_unsecure_fixture_options._replace(ci_mac=False), 'chttp2_socket_pair_with_grpc_trace': socketpair_unsecure_fixture_options, } @@ -115,6 +115,12 @@ def compatible(f, t): return True +def without(l, e): + l = l[:] + l.remove(e) + return l + + def main(): sec_deps = [ 'end2end_certs', @@ -138,7 +144,7 @@ def main(): 'language': 'c', 'secure': 'check' if END2END_FIXTURES[f].secure else 'no', 'src': ['test/core/end2end/fixtures/%s.c' % f], - 'platforms': [ 'posix' ] if f.endswith('_posix') else END2END_FIXTURES[f].platforms, + 'platforms': [ 'linux', 'mac', 'posix' ] if f.endswith('_posix') else END2END_FIXTURES[f].platforms, 'deps': sec_deps if END2END_FIXTURES[f].secure else unsec_deps, 'headers': ['test/core/end2end/end2end_tests.h'], } @@ -173,6 +179,9 @@ def main(): 'src': [], 'flaky': END2END_TESTS[t].flaky, 'platforms': END2END_FIXTURES[f].platforms, + 'ci_platforms': (END2END_FIXTURES[f].platforms + if END2END_FIXTURES[f].ci_mac + else without(END2END_FIXTURES[f].platforms, 'mac')), 'deps': [ 'end2end_fixture_%s' % f, 'end2end_test_%s' % t] + sec_deps @@ -188,6 +197,9 @@ def main(): 'src': [], 'flaky': END2END_TESTS[t].flaky, 'platforms': END2END_FIXTURES[f].platforms, + 'ci_platforms': (END2END_FIXTURES[f].platforms + if END2END_FIXTURES[f].ci_mac + else without(END2END_FIXTURES[f].platforms, 'mac')), 'deps': [ 'end2end_fixture_%s' % f, 'end2end_test_%s' % t] + unsec_deps diff --git a/test/core/end2end/tests/bad_hostname.c b/test/core/end2end/tests/bad_hostname.c index 198ba46cd2..501db89b7b 100644 --- a/test/core/end2end/tests/bad_hostname.c +++ b/test/core/end2end/tests/bad_hostname.c @@ -146,7 +146,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); - GPR_ASSERT(status == GRPC_STATUS_UNAUTHENTICATED); + GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c index ba312d0d5c..342dfa03f6 100644 --- a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c +++ b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c @@ -59,13 +59,21 @@ static void *tag(gpr_intptr t) { return (void *)t; } static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, const char *test_name, - grpc_channel_args *client_args, - grpc_channel_args *server_args) { + int fail_server_auth_check) { grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); - f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); - config.init_server(&f, server_args); + f = config.create_fixture(NULL, NULL); + config.init_client(&f, NULL); + if (fail_server_auth_check) { + grpc_arg fail_auth_arg = { + GRPC_ARG_STRING, FAIL_AUTH_CHECK_SERVER_ARG_NAME, {NULL}}; + grpc_channel_args args; + args.num_args= 1; + args.args = &fail_auth_arg; + config.init_server(&f, &args); + } else { + config.init_server(&f, NULL); + } return f; } @@ -128,7 +136,7 @@ static void test_call_creds_failure(grpc_end2end_test_config config) { grpc_call *c; grpc_credentials *creds = NULL; grpc_end2end_test_fixture f = - begin_test(config, "test_call_creds_failure", NULL, NULL); + begin_test(config, "test_call_creds_failure", 0); gpr_timespec deadline = five_seconds_time(); c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo", "foo.test.google.fr", deadline); @@ -157,9 +165,8 @@ static void request_response_with_payload_and_call_creds( grpc_byte_buffer *response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); - - grpc_end2end_test_fixture f = begin_test(config, test_name, NULL, NULL); - cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_end2end_test_fixture f; + cq_verifier *cqv; grpc_op ops[6]; grpc_op *op; grpc_metadata_array initial_metadata_recv; @@ -174,6 +181,10 @@ static void request_response_with_payload_and_call_creds( int was_cancelled = 2; grpc_credentials *creds = NULL; grpc_auth_context *s_auth_context = NULL; + grpc_auth_context *c_auth_context = NULL; + + f = begin_test(config, test_name, 0); + cqv = cq_verifier_create(f.cq); c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo", "foo.test.google.fr", deadline); @@ -241,6 +252,11 @@ static void request_response_with_payload_and_call_creds( print_auth_context(0, s_auth_context); grpc_auth_context_release(s_auth_context); + c_auth_context = grpc_call_auth_context(c); + GPR_ASSERT(c_auth_context != NULL); + print_auth_context(1, c_auth_context); + grpc_auth_context_release(c_auth_context); + /* Cannot set creds on the server call object. */ GPR_ASSERT(grpc_call_set_credentials(s, NULL) != GRPC_CALL_OK); @@ -340,31 +356,120 @@ static void request_response_with_payload_and_call_creds( config.tear_down_data(&f); } -void test_request_response_with_payload_and_call_creds( +static void test_request_response_with_payload_and_call_creds( grpc_end2end_test_config config) { request_response_with_payload_and_call_creds( "test_request_response_with_payload_and_call_creds", config, NONE); } -void test_request_response_with_payload_and_overridden_call_creds( +static void test_request_response_with_payload_and_overridden_call_creds( grpc_end2end_test_config config) { request_response_with_payload_and_call_creds( "test_request_response_with_payload_and_overridden_call_creds", config, OVERRIDE); } -void test_request_response_with_payload_and_deleted_call_creds( +static void test_request_response_with_payload_and_deleted_call_creds( grpc_end2end_test_config config) { request_response_with_payload_and_call_creds( "test_request_response_with_payload_and_deleted_call_creds", config, DESTROY); } +static void test_request_with_server_rejecting_client_creds( + grpc_end2end_test_config config) { + grpc_op ops[6]; + grpc_op *op; + grpc_call *c; + grpc_end2end_test_fixture f; + gpr_timespec deadline = five_seconds_time(); + cq_verifier *cqv; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + char *details = NULL; + size_t details_capacity = 0; + grpc_byte_buffer *response_payload_recv = NULL; + gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); + grpc_byte_buffer *request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_credentials *creds; + + f = begin_test(config, "test_request_with_server_rejecting_client_creds", 1); + cqv = cq_verifier_create(f.cq); + + c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + "/foo", "foo.test.google.fr", deadline); + GPR_ASSERT(c); + + creds = grpc_iam_credentials_create(iam_token, iam_selector); + GPR_ASSERT(creds != NULL); + GPR_ASSERT(grpc_call_set_credentials(c, creds) == GRPC_CALL_OK); + grpc_credentials_release(creds); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op++; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = 0; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &response_payload_recv; + op->flags = 0; + op++; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); + + cq_expect_completion(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_UNAUTHENTICATED); + + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload_recv); + gpr_free(details); + + grpc_call_destroy(c); + + cq_verifier_destroy(cqv); + end_test(&f); + config.tear_down_data(&f); +} + void grpc_end2end_tests(grpc_end2end_test_config config) { if (config.feature_mask & FEATURE_MASK_SUPPORTS_PER_CALL_CREDENTIALS) { test_call_creds_failure(config); test_request_response_with_payload_and_call_creds(config); test_request_response_with_payload_and_overridden_call_creds(config); test_request_response_with_payload_and_deleted_call_creds(config); + test_request_with_server_rejecting_client_creds(config); } } diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index 390afcdf63..8dddfbee98 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -81,7 +81,7 @@ static void test_get(int use_ssl, int port) { memset(&req, 0, sizeof(req)); req.host = host; req.path = "/get"; - req.use_ssl = use_ssl; + req.handshaker = use_ssl ? &grpc_httpcli_ssl : &grpc_httpcli_plaintext; grpc_httpcli_get(&g_context, &g_pollset, &req, n_seconds_time(15), on_finish, (void *)42); @@ -107,7 +107,7 @@ static void test_post(int use_ssl, int port) { memset(&req, 0, sizeof(req)); req.host = host; req.path = "/post"; - req.use_ssl = use_ssl; + req.handshaker = use_ssl ? &grpc_httpcli_ssl : &grpc_httpcli_plaintext; grpc_httpcli_post(&g_context, &g_pollset, &req, "hello", 5, n_seconds_time(15), on_finish, (void *)42); diff --git a/test/core/security/auth_context_test.c b/test/core/security/auth_context_test.c index a30505a63b..d785eb6064 100644 --- a/test/core/security/auth_context_test.c +++ b/test/core/security/auth_context_test.c @@ -40,7 +40,7 @@ #include <grpc/support/log.h> static void test_empty_context(void) { - grpc_auth_context *ctx = grpc_auth_context_create(NULL, 0); + grpc_auth_context *ctx = grpc_auth_context_create(NULL); grpc_auth_property_iterator it; gpr_log(GPR_INFO, "test_empty_context"); @@ -52,87 +52,98 @@ static void test_empty_context(void) { GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); it = grpc_auth_context_find_properties_by_name(ctx, "foo"); GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); + GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(ctx, "bar") == + 0); + GPR_ASSERT(grpc_auth_context_peer_identity_property_name(ctx) == NULL); GRPC_AUTH_CONTEXT_UNREF(ctx, "test"); } static void test_simple_context(void) { - grpc_auth_context *ctx = grpc_auth_context_create(NULL, 3); + grpc_auth_context *ctx = grpc_auth_context_create(NULL); grpc_auth_property_iterator it; size_t i; gpr_log(GPR_INFO, "test_simple_context"); GPR_ASSERT(ctx != NULL); - GPR_ASSERT(ctx->property_count == 3); - ctx->properties[0] = grpc_auth_property_init_from_cstring("name", "chapi"); - ctx->properties[1] = grpc_auth_property_init_from_cstring("name", "chapo"); - ctx->properties[2] = grpc_auth_property_init_from_cstring("foo", "bar"); - ctx->peer_identity_property_name = ctx->properties[0].name; + grpc_auth_context_add_cstring_property(ctx, "name", "chapi"); + grpc_auth_context_add_cstring_property(ctx, "name", "chapo"); + grpc_auth_context_add_cstring_property(ctx, "foo", "bar"); + GPR_ASSERT(ctx->properties.count == 3); + GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(ctx, "name") == + 1); GPR_ASSERT( strcmp(grpc_auth_context_peer_identity_property_name(ctx), "name") == 0); it = grpc_auth_context_property_iterator(ctx); - for (i = 0; i < ctx->property_count; i++) { + for (i = 0; i < ctx->properties.count; i++) { const grpc_auth_property *p = grpc_auth_property_iterator_next(&it); - GPR_ASSERT(p == &ctx->properties[i]); + GPR_ASSERT(p == &ctx->properties.array[i]); } GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); it = grpc_auth_context_find_properties_by_name(ctx, "foo"); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &ctx->properties[2]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &ctx->properties.array[2]); GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); it = grpc_auth_context_peer_identity(ctx); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &ctx->properties[0]); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &ctx->properties[1]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &ctx->properties.array[0]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &ctx->properties.array[1]); GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); GRPC_AUTH_CONTEXT_UNREF(ctx, "test"); } static void test_chained_context(void) { - grpc_auth_context *chained = grpc_auth_context_create(NULL, 2); - grpc_auth_context *ctx = grpc_auth_context_create(chained, 3); + grpc_auth_context *chained = grpc_auth_context_create(NULL); + grpc_auth_context *ctx = grpc_auth_context_create(chained); grpc_auth_property_iterator it; size_t i; gpr_log(GPR_INFO, "test_chained_context"); GRPC_AUTH_CONTEXT_UNREF(chained, "chained"); - chained->properties[0] = - grpc_auth_property_init_from_cstring("name", "padapo"); - chained->properties[1] = grpc_auth_property_init_from_cstring("foo", "baz"); - ctx->properties[0] = grpc_auth_property_init_from_cstring("name", "chapi"); - ctx->properties[1] = grpc_auth_property_init_from_cstring("name", "chapo"); - ctx->properties[2] = grpc_auth_property_init_from_cstring("foo", "bar"); - ctx->peer_identity_property_name = ctx->properties[0].name; + grpc_auth_context_add_cstring_property(chained, "name", "padapo"); + grpc_auth_context_add_cstring_property(chained, "foo", "baz"); + grpc_auth_context_add_cstring_property(ctx, "name", "chapi"); + grpc_auth_context_add_cstring_property(ctx, "name", "chap0"); + grpc_auth_context_add_cstring_property(ctx, "foo", "bar"); + GPR_ASSERT(grpc_auth_context_set_peer_identity_property_name(ctx, "name") == + 1); GPR_ASSERT( strcmp(grpc_auth_context_peer_identity_property_name(ctx), "name") == 0); it = grpc_auth_context_property_iterator(ctx); - for (i = 0; i < ctx->property_count; i++) { + for (i = 0; i < ctx->properties.count; i++) { const grpc_auth_property *p = grpc_auth_property_iterator_next(&it); - GPR_ASSERT(p == &ctx->properties[i]); + GPR_ASSERT(p == &ctx->properties.array[i]); } - for (i = 0; i < chained->property_count; i++) { + for (i = 0; i < chained->properties.count; i++) { const grpc_auth_property *p = grpc_auth_property_iterator_next(&it); - GPR_ASSERT(p == &chained->properties[i]); + GPR_ASSERT(p == &chained->properties.array[i]); } GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); it = grpc_auth_context_find_properties_by_name(ctx, "foo"); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &ctx->properties[2]); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &chained->properties[1]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &ctx->properties.array[2]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &chained->properties.array[1]); GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); it = grpc_auth_context_peer_identity(ctx); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &ctx->properties[0]); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &ctx->properties[1]); - GPR_ASSERT(grpc_auth_property_iterator_next(&it) == &chained->properties[0]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &ctx->properties.array[0]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &ctx->properties.array[1]); + GPR_ASSERT(grpc_auth_property_iterator_next(&it) == + &chained->properties.array[0]); GPR_ASSERT(grpc_auth_property_iterator_next(&it) == NULL); GRPC_AUTH_CONTEXT_UNREF(ctx, "test"); } - int main(int argc, char **argv) { grpc_test_init(argc, argv); test_empty_context(); diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index dd6e0d7bb3..e4a8144eaf 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -373,8 +373,8 @@ static void test_ssl_oauth2_composite_creds(void) { grpc_credentials *ssl_creds = grpc_ssl_credentials_create(test_root_cert, NULL); const grpc_credentials_array *creds_array; - grpc_credentials *oauth2_creds = - grpc_fake_oauth2_credentials_create(test_oauth2_bearer_token, 0); + grpc_credentials *oauth2_creds = grpc_md_only_test_credentials_create( + "Authorization", test_oauth2_bearer_token, 0); grpc_credentials *composite_creds = grpc_composite_credentials_create(ssl_creds, oauth2_creds); grpc_credentials_unref(ssl_creds); @@ -424,8 +424,8 @@ static void test_ssl_oauth2_iam_composite_creds(void) { grpc_credentials *ssl_creds = grpc_ssl_credentials_create(test_root_cert, NULL); const grpc_credentials_array *creds_array; - grpc_credentials *oauth2_creds = - grpc_fake_oauth2_credentials_create(test_oauth2_bearer_token, 0); + grpc_credentials *oauth2_creds = grpc_md_only_test_credentials_create( + "Authorization", test_oauth2_bearer_token, 0); grpc_credentials *aux_creds = grpc_composite_credentials_create(ssl_creds, oauth2_creds); grpc_credentials *iam_creds = grpc_iam_credentials_create( @@ -477,7 +477,7 @@ static void on_oauth2_creds_get_metadata_failure( static void validate_compute_engine_http_request( const grpc_httpcli_request *request) { - GPR_ASSERT(!request->use_ssl); + GPR_ASSERT(request->handshaker != &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "metadata") == 0); GPR_ASSERT( strcmp(request->path, @@ -573,7 +573,7 @@ static void validate_refresh_token_http_request( GPR_ASSERT(strlen(expected_body) == body_size); GPR_ASSERT(memcmp(expected_body, body, body_size) == 0); gpr_free(expected_body); - GPR_ASSERT(request->use_ssl); + GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, GRPC_GOOGLE_OAUTH2_SERVICE_HOST) == 0); GPR_ASSERT(strcmp(request->path, GRPC_GOOGLE_OAUTH2_SERVICE_TOKEN_PATH) == 0); GPR_ASSERT(request->hdr_count == 1); @@ -697,7 +697,7 @@ static void validate_service_account_http_request( GPR_ASSERT(strlen(expected_body) == body_size); GPR_ASSERT(memcmp(expected_body, body, body_size) == 0); gpr_free(expected_body); - GPR_ASSERT(request->use_ssl); + GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, GRPC_GOOGLE_OAUTH2_SERVICE_HOST) == 0); GPR_ASSERT(strcmp(request->path, GRPC_GOOGLE_OAUTH2_SERVICE_TOKEN_PATH) == 0); GPR_ASSERT(request->hdr_count == 1); diff --git a/test/core/security/jwt_verifier_test.c b/test/core/security/jwt_verifier_test.c index 98db56c0ef..440286ea1a 100644 --- a/test/core/security/jwt_verifier_test.c +++ b/test/core/security/jwt_verifier_test.c @@ -286,7 +286,7 @@ static int httpcli_get_google_keys_for_email( const grpc_httpcli_request *request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { grpc_httpcli_response response = http_response(200, good_google_email_keys()); - GPR_ASSERT(request->use_ssl); + GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0); GPR_ASSERT(strcmp(request->path, "/robot/v1/metadata/x509/" @@ -331,7 +331,7 @@ static int httpcli_get_custom_keys_for_email( const grpc_httpcli_request *request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { grpc_httpcli_response response = http_response(200, gpr_strdup(good_jwk_set)); - GPR_ASSERT(request->use_ssl); + GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "keys.bar.com") == 0); GPR_ASSERT(strcmp(request->path, "/jwk/foo@bar.com") == 0); on_response(user_data, &response); @@ -363,7 +363,7 @@ static int httpcli_get_jwk_set( const grpc_httpcli_request *request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { grpc_httpcli_response response = http_response(200, gpr_strdup(good_jwk_set)); - GPR_ASSERT(request->use_ssl); + GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0); GPR_ASSERT(strcmp(request->path, "/oauth2/v3/certs") == 0); on_response(user_data, &response); @@ -377,7 +377,7 @@ static int httpcli_get_openid_config(const grpc_httpcli_request *request, void *user_data) { grpc_httpcli_response response = http_response(200, gpr_strdup(good_openid_config)); - GPR_ASSERT(request->use_ssl); + GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "accounts.google.com") == 0); GPR_ASSERT(strcmp(request->path, GRPC_OPENID_CONFIG_URL_SUFFIX) == 0); grpc_httpcli_set_override(httpcli_get_jwk_set, @@ -421,7 +421,7 @@ static int httpcli_get_bad_json(const grpc_httpcli_request *request, void *user_data) { grpc_httpcli_response response = http_response(200, gpr_strdup("{\"bad\": \"stuff\"}")); - GPR_ASSERT(request->use_ssl); + GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); on_response(user_data, &response); gpr_free(response.body); return 1; diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c index 9b334b3c3e..69bbc3cc0c 100644 --- a/test/core/security/verify_jwt.c +++ b/test/core/security/verify_jwt.c @@ -112,7 +112,7 @@ int main(int argc, char **argv) { while (!sync.is_done) { grpc_pollset_worker worker; grpc_pollset_work(&sync.pollset, &worker, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index b07df391f9..9bff18d311 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -44,9 +44,13 @@ #include <string.h> #include <unistd.h> +#include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/httpcli/httpcli.h" +#include "src/core/support/env.h" + #define NUM_RANDOM_PORTS_TO_PICK 100 static int *chosen_ports = NULL; @@ -126,6 +130,67 @@ static int is_port_available(int *port, int is_tcp) { return 1; } +typedef struct portreq { + grpc_pollset pollset; + int port; +} portreq; + +static void got_port_from_server(void *arg, + const grpc_httpcli_response *response) { + size_t i; + int port = 0; + portreq *pr = arg; + GPR_ASSERT(response); + GPR_ASSERT(response->status == 200); + for (i = 0; i < response->body_length; i++) { + GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9'); + port = port * 10 + response->body[i] - '0'; + } + GPR_ASSERT(port > 1024); + gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); + pr->port = port; + grpc_pollset_kick(&pr->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); +} + +static void destroy_pollset_and_shutdown(void *p) { + grpc_pollset_destroy(p); + grpc_shutdown(); +} + +static int pick_port_using_server(char *server) { + grpc_httpcli_context context; + grpc_httpcli_request req; + portreq pr; + + grpc_init(); + + memset(&pr, 0, sizeof(pr)); + memset(&req, 0, sizeof(req)); + grpc_pollset_init(&pr.pollset); + pr.port = -1; + + req.host = server; + req.path = "/get"; + + grpc_httpcli_context_init(&context); + grpc_httpcli_get(&context, &pr.pollset, &req, + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, + &pr); + gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); + while (pr.port == -1) { + grpc_pollset_worker worker; + grpc_pollset_work(&pr.pollset, &worker, + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); + + grpc_httpcli_context_destroy(&context); + grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset); + + return pr.port; +} + int grpc_pick_unused_port(void) { /* We repeatedly pick a port and then see whether or not it is available for use both as a TCP socket and a UDP socket. First, we @@ -143,6 +208,15 @@ int grpc_pick_unused_port(void) { int is_tcp = 1; int try = 0; + char *env = gpr_getenv("GRPC_TEST_PORT_SERVER"); + if (env) { + int port = pick_port_using_server(env); + gpr_free(env); + if (port != 0) { + return port; + } + } + for (;;) { int port; try++; diff --git a/test/core/util/test_config.c b/test/core/util/test_config.c index 225658f5e2..cadf88a7c6 100644 --- a/test/core/util/test_config.c +++ b/test/core/util/test_config.c @@ -38,6 +38,8 @@ #include <stdlib.h> #include <signal.h> +double g_fixture_slowdown_factor = 1.0; + #if GPR_GETPID_IN_UNISTD_H #include <unistd.h> static int seed(void) { return getpid(); } diff --git a/test/core/util/test_config.h b/test/core/util/test_config.h index 7028ade7b2..b2cc40bb47 100644 --- a/test/core/util/test_config.h +++ b/test/core/util/test_config.h @@ -48,8 +48,11 @@ extern "C" { #define GRPC_TEST_SLOWDOWN_MACHINE_FACTOR 1.0 #endif -#define GRPC_TEST_SLOWDOWN_FACTOR \ - (GRPC_TEST_SLOWDOWN_BUILD_FACTOR * GRPC_TEST_SLOWDOWN_MACHINE_FACTOR) +extern double g_fixture_slowdown_factor; + +#define GRPC_TEST_SLOWDOWN_FACTOR \ + (GRPC_TEST_SLOWDOWN_BUILD_FACTOR * GRPC_TEST_SLOWDOWN_MACHINE_FACTOR * \ + g_fixture_slowdown_factor) #define GRPC_TIMEOUT_SECONDS_TO_DEADLINE(x) \ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), \ diff --git a/test/cpp/common/auth_property_iterator_test.cc b/test/cpp/common/auth_property_iterator_test.cc index 3d983fa310..74b18ced0d 100644 --- a/test/cpp/common/auth_property_iterator_test.cc +++ b/test/cpp/common/auth_property_iterator_test.cc @@ -31,10 +31,14 @@ * */ +#include <grpc/grpc_security.h> #include <grpc++/auth_context.h> #include <gtest/gtest.h> #include "src/cpp/common/secure_auth_context.h" + +extern "C" { #include "src/core/security/security_context.h" +} namespace grpc { namespace { @@ -50,14 +54,15 @@ class TestAuthPropertyIterator : public AuthPropertyIterator { class AuthPropertyIteratorTest : public ::testing::Test { protected: void SetUp() GRPC_OVERRIDE { - ctx_ = grpc_auth_context_create(NULL, 3); - ctx_->properties[0] = grpc_auth_property_init_from_cstring("name", "chapi"); - ctx_->properties[1] = grpc_auth_property_init_from_cstring("name", "chapo"); - ctx_->properties[2] = grpc_auth_property_init_from_cstring("foo", "bar"); - ctx_->peer_identity_property_name = ctx_->properties[0].name; + ctx_ = grpc_auth_context_create(NULL); + grpc_auth_context_add_cstring_property(ctx_, "name", "chapi"); + grpc_auth_context_add_cstring_property(ctx_, "name", "chapo"); + grpc_auth_context_add_cstring_property(ctx_, "foo", "bar"); + EXPECT_EQ(1, + grpc_auth_context_set_peer_identity_property_name(ctx_, "name")); } void TearDown() GRPC_OVERRIDE { - GRPC_AUTH_CONTEXT_UNREF(ctx_, "AuthPropertyIteratorTest"); + grpc_auth_context_release(ctx_); } grpc_auth_context* ctx_; diff --git a/test/cpp/common/secure_auth_context_test.cc b/test/cpp/common/secure_auth_context_test.cc index d0243a5432..075d4ce8c9 100644 --- a/test/cpp/common/secure_auth_context_test.cc +++ b/test/cpp/common/secure_auth_context_test.cc @@ -31,10 +31,14 @@ * */ +#include <grpc/grpc_security.h> #include <grpc++/auth_context.h> #include <gtest/gtest.h> #include "src/cpp/common/secure_auth_context.h" + +extern "C" { #include "src/core/security/security_context.h" +} namespace grpc { namespace { @@ -52,11 +56,11 @@ TEST_F(SecureAuthContextTest, EmptyContext) { } TEST_F(SecureAuthContextTest, Properties) { - grpc_auth_context* ctx = grpc_auth_context_create(NULL, 3); - ctx->properties[0] = grpc_auth_property_init_from_cstring("name", "chapi"); - ctx->properties[1] = grpc_auth_property_init_from_cstring("name", "chapo"); - ctx->properties[2] = grpc_auth_property_init_from_cstring("foo", "bar"); - ctx->peer_identity_property_name = ctx->properties[0].name; + grpc_auth_context* ctx = grpc_auth_context_create(NULL); + grpc_auth_context_add_cstring_property(ctx, "name", "chapi"); + grpc_auth_context_add_cstring_property(ctx, "name", "chapo"); + grpc_auth_context_add_cstring_property(ctx, "foo", "bar"); + EXPECT_EQ(1, grpc_auth_context_set_peer_identity_property_name(ctx, "name")); SecureAuthContext context(ctx); std::vector<grpc::string> peer_identity = context.GetPeerIdentity(); @@ -70,11 +74,11 @@ TEST_F(SecureAuthContextTest, Properties) { } TEST_F(SecureAuthContextTest, Iterators) { - grpc_auth_context* ctx = grpc_auth_context_create(NULL, 3); - ctx->properties[0] = grpc_auth_property_init_from_cstring("name", "chapi"); - ctx->properties[1] = grpc_auth_property_init_from_cstring("name", "chapo"); - ctx->properties[2] = grpc_auth_property_init_from_cstring("foo", "bar"); - ctx->peer_identity_property_name = ctx->properties[0].name; + grpc_auth_context* ctx = grpc_auth_context_create(NULL); + grpc_auth_context_add_cstring_property(ctx, "name", "chapi"); + grpc_auth_context_add_cstring_property(ctx, "name", "chapo"); + grpc_auth_context_add_cstring_property(ctx, "foo", "bar"); + EXPECT_EQ(1, grpc_auth_context_set_peer_identity_property_name(ctx, "name")); SecureAuthContext context(ctx); AuthPropertyIterator iter = context.begin(); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 24d417d9e6..37669815c6 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -117,7 +117,7 @@ class Proxy : public ::grpc::cpp::test::util::TestService::Service { } private: - std::unique_ptr<::grpc::cpp::test::util::TestService::Stub> stub_; + std::unique_ptr< ::grpc::cpp::test::util::TestService::Stub> stub_; }; class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { @@ -869,7 +869,8 @@ TEST_P(End2endTest, HugeResponse) { } namespace { -void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, gpr_event *ev) { +void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, + gpr_event *ev) { EchoResponse resp; gpr_event_set(ev, (void*)1); while (stream->Read(&resp)) { @@ -908,6 +909,27 @@ TEST_P(End2endTest, Peer) { EXPECT_TRUE(CheckIsLocalhost(context.peer())); } +TEST_F(End2endTest, ChannelState) { + ResetStub(false); + // Start IDLE + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + + // Did not ask to connect, no state change. + CompletionQueue cq; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(10); + channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL); + void* tag; + bool ok = true; + cq.Next(&tag, &ok); + EXPECT_FALSE(ok); + + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true)); + EXPECT_TRUE(channel_->WaitForStateChange( + GRPC_CHANNEL_IDLE, gpr_inf_future(GPR_CLOCK_REALTIME))); + EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false)); +} + INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(false, true)); } // namespace testing diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 28cd32a197..1c4f46328f 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -41,6 +41,7 @@ #include <condition_variable> #include <mutex> +#include <grpc++/config.h> namespace grpc { @@ -67,10 +68,12 @@ typedef std::chrono::time_point<grpc_time_source> grpc_time; class Client { public: explicit Client(const ClientConfig& config) - : timer_(new Timer), interarrival_timer_() { + : channels_(config.client_channels()), + timer_(new Timer), + interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { - channels_.push_back(ClientChannelInfo( - config.server_targets(i % config.server_targets_size()), config)); + channels_[i].init(config.server_targets(i % config.server_targets_size()), + config); } request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); request_.set_response_size(config.payload_size()); @@ -79,7 +82,8 @@ class Client { ClientStats Mark() { Histogram latencies; - std::vector<Histogram> to_merge(threads_.size()); + // avoid std::vector for old compilers that expect a copy constructor + Histogram* to_merge = new Histogram[threads_.size()]; for (size_t i = 0; i < threads_.size(); i++) { threads_[i]->BeginSwap(&to_merge[i]); } @@ -89,6 +93,7 @@ class Client { threads_[i]->EndSwap(); latencies.Merge(&to_merge[i]); } + delete[] to_merge; auto timer_result = timer->Mark(); @@ -106,9 +111,20 @@ class Client { class ClientChannelInfo { public: - ClientChannelInfo(const grpc::string& target, const ClientConfig& config) - : channel_(CreateTestChannel(target, config.enable_ssl())), - stub_(TestService::NewStub(channel_)) {} + ClientChannelInfo() {} + ClientChannelInfo(const ClientChannelInfo& i) { + // The copy constructor is to satisfy old compilers + // that need it for using std::vector . It is only ever + // used for empty entries + GPR_ASSERT(!i.channel_ && !i.stub_); + } + void init(const grpc::string& target, const ClientConfig& config) { + // We have to use a 2-phase init like this with a default + // constructor followed by an initializer function to make + // old compilers happy with using this in std::vector + channel_ = CreateTestChannel(target, config.enable_ssl()); + stub_ = TestService::NewStub(channel_); + } ChannelInterface* get_channel() { return channel_.get(); } TestService::Stub* get_stub() { return stub_.get(); } @@ -189,27 +205,9 @@ class Client { Thread(Client* client, size_t idx) : done_(false), new_(nullptr), - impl_([this, idx, client]() { - for (;;) { - // run the loop body - bool thread_still_ok = client->ThreadFunc(&histogram_, idx); - // lock, see if we're done - std::lock_guard<std::mutex> g(mu_); - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - done_ = true; - } - if (done_) { - return; - } - // check if we're marking, swap out the histogram if so - if (new_) { - new_->Swap(&histogram_); - new_ = nullptr; - cv_.notify_one(); - } - } - }) {} + client_(client), + idx_(idx), + impl_(&Thread::ThreadFunc, this) {} ~Thread() { { @@ -226,13 +224,37 @@ class Client { void EndSwap() { std::unique_lock<std::mutex> g(mu_); - cv_.wait(g, [this]() { return new_ == nullptr; }); + while (new_ != nullptr) { + cv_.wait(g); + }; } private: Thread(const Thread&); Thread& operator=(const Thread&); + void ThreadFunc() { + for (;;) { + // run the loop body + const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); + // lock, see if we're done + std::lock_guard<std::mutex> g(mu_); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + done_ = true; + } + if (done_) { + return; + } + // check if we're marking, swap out the histogram if so + if (new_) { + new_->Swap(&histogram_); + new_ = nullptr; + cv_.notify_one(); + } + } + } + TestService::Stub* stub_; ClientConfig config_; std::mutex mu_; @@ -240,6 +262,8 @@ class Client { bool done_; Histogram* new_; Histogram histogram_; + Client* client_; + size_t idx_; std::thread impl_; }; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index e1e44f9ac0..a337610cbf 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -156,7 +156,7 @@ class AsyncClient : public Client { std::function<ClientRpcContext*(int, TestService::Stub*, const SimpleRequest&)> setup_ctx) : Client(config), - channel_lock_(config.client_channels()), + channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), channel_count_(config.client_channels()), @@ -208,6 +208,7 @@ class AsyncClient : public Client { delete ctx; } } + delete[] channel_lock_; } bool ThreadFunc(Histogram* histogram, @@ -316,23 +317,28 @@ class AsyncClient : public Client { } private: - class boolean { // exists only to avoid data-race on vector<bool> + class boolean { // exists only to avoid data-race on vector<bool> public: - boolean(): val_(false) {} - boolean(bool b): val_(b) {} - operator bool() const {return val_;} - boolean& operator=(bool b) {val_=b; return *this;} + boolean() : val_(false) {} + boolean(bool b) : val_(b) {} + operator bool() const { return val_; } + boolean& operator=(bool b) { + val_ = b; + return *this; + } + private: bool val_; }; std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<deadline_list> rpc_deadlines_; // per thread deadlines - std::vector<int> next_channel_; // per thread round-robin channel ctr - std::vector<boolean> issue_allowed_; // may this thread attempt to issue - std::vector<grpc_time> next_issue_; // when should it issue? + std::vector<int> next_channel_; // per thread round-robin channel ctr + std::vector<boolean> issue_allowed_; // may this thread attempt to issue + std::vector<grpc_time> next_issue_; // when should it issue? - std::vector<std::mutex> channel_lock_; + std::mutex* + channel_lock_; // a vector, but avoid std::vector for old compilers std::vector<context_list> contexts_; // per-channel list of idle contexts int max_outstanding_per_channel_; int channel_count_; @@ -348,15 +354,17 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } private: + static void CheckDone(grpc::Status s, SimpleResponse* response) {} + static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> + StartReq(TestService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request, CompletionQueue* cq) { + return stub->AsyncUnaryCall(ctx, request, cq); + }; static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, const SimpleRequest& req) { - auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request, CompletionQueue* cq) { - return stub->AsyncUnaryCall(ctx, request, cq); - }; return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, start_req, check_done); + channel_id, stub, req, AsyncUnaryClient::StartReq, + AsyncUnaryClient::CheckDone); } }; @@ -442,16 +450,19 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } private: + static void CheckDone(grpc::Status s, SimpleResponse* response) {} + static std::unique_ptr< + grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>> + StartReq(TestService::Stub* stub, grpc::ClientContext* ctx, + CompletionQueue* cq, void* tag) { + auto stream = stub->AsyncStreamingCall(ctx, cq, tag); + return stream; + }; static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, const SimpleRequest& req) { - auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, - CompletionQueue* cq, void* tag) { - auto stream = stub->AsyncStreamingCall(ctx, cq, tag); - return stream; - }; return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, start_req, check_done); + channel_id, stub, req, AsyncStreamingClient::StartReq, + AsyncStreamingClient::CheckDone); } }; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 718698bfe1..db5416a707 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -45,8 +45,9 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/histogram.h> -#include <grpc/support/log.h> #include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> #include <gflags/gflags.h> #include <grpc++/client_context.h> #include <grpc++/server.h> @@ -79,7 +80,9 @@ class SynchronousClient : public Client { void WaitToIssue(int thread_idx) { grpc_time next_time; if (NextIssueTime(thread_idx, &next_time)) { - std::this_thread::sleep_until(next_time); + gpr_timespec next_timespec; + TimepointHR2Timespec(next_time, &next_timespec); + gpr_sleep_until(next_timespec); } } @@ -110,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { public: SynchronousStreamingClient(const ClientConfig& config) - : SynchronousClient(config), - context_(num_threads_), - stream_(num_threads_) { + : SynchronousClient(config) { + context_ = new grpc::ClientContext[num_threads_]; + stream_ = new std::unique_ptr< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>[num_threads_]; for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); @@ -121,12 +125,15 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } ~SynchronousStreamingClient() { EndThreads(); - for (auto stream = stream_.begin(); stream != stream_.end(); stream++) { + for (auto stream = &stream_[0]; stream != &stream_[num_threads_]; + stream++) { if (*stream) { (*stream)->WritesDone(); EXPECT_TRUE((*stream)->Finish().ok()); } } + delete[] stream_; + delete[] context_; } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { @@ -141,9 +148,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { } private: - std::vector<grpc::ClientContext> context_; - std::vector<std::unique_ptr< - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_; + // These are both conceptually std::vector but cannot be for old compilers + // that expect contained classes to support copy constructors + grpc::ClientContext* context_; + std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>* + stream_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient( diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index a0360295e0..78e3720938 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -77,16 +77,34 @@ static deque<string> get_hosts(const string& name) { } } +// Namespace for classes and functions used only in RunScenario +// Using this rather than local definitions to workaround gcc-4.4 limitations +// regarding using templates without linkage +namespace runsc { + +// ClientContext allocator +static ClientContext* AllocContext(list<ClientContext>* contexts) { + contexts->emplace_back(); + return &contexts->back(); +} + +struct ServerData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; +}; + +struct ClientData { + unique_ptr<Worker::Stub> stub; + unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream; +}; +} // namespace runsc + std::unique_ptr<ScenarioResult> RunScenario( const ClientConfig& initial_client_config, size_t num_clients, const ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) { - // ClientContext allocator (all are destroyed at scope exit) + // ClientContext allocations (all are destroyed at scope exit) list<ClientContext> contexts; - auto alloc_context = [&contexts]() { - contexts.emplace_back(); - return &contexts.back(); - }; // To be added to the result, containing the final configuration used for // client and config (incluiding host, etc.) @@ -131,23 +149,22 @@ std::unique_ptr<ScenarioResult> RunScenario( workers.resize(num_clients + num_servers); // Start servers - struct ServerData { - unique_ptr<Worker::Stub> stub; - unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; - }; - vector<ServerData> servers; + using runsc::ServerData; + // servers is array rather than std::vector to avoid gcc-4.4 issues + // where class contained in std::vector must have a copy constructor + auto* servers = new ServerData[num_servers]; for (size_t i = 0; i < num_servers; i++) { - ServerData sd; - sd.stub = std::move(Worker::NewStub( + servers[i].stub = std::move(Worker::NewStub( CreateChannel(workers[i], InsecureCredentials(), ChannelArguments()))); ServerArgs args; result_server_config = server_config; result_server_config.set_host(workers[i]); *args.mutable_setup() = server_config; - sd.stream = std::move(sd.stub->RunServer(alloc_context())); - GPR_ASSERT(sd.stream->Write(args)); + servers[i].stream = + std::move(servers[i].stub->RunServer(runsc::AllocContext(&contexts))); + GPR_ASSERT(servers[i].stream->Write(args)); ServerStatus init_status; - GPR_ASSERT(sd.stream->Read(&init_status)); + GPR_ASSERT(servers[i].stream->Read(&init_status)); char* host; char* driver_port; char* cli_target; @@ -157,30 +174,25 @@ std::unique_ptr<ScenarioResult> RunScenario( gpr_free(host); gpr_free(driver_port); gpr_free(cli_target); - - servers.push_back(std::move(sd)); } // Start clients - struct ClientData { - unique_ptr<Worker::Stub> stub; - unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream; - }; - vector<ClientData> clients; + using runsc::ClientData; + // clients is array rather than std::vector to avoid gcc-4.4 issues + // where class contained in std::vector must have a copy constructor + auto* clients = new ClientData[num_clients]; for (size_t i = 0; i < num_clients; i++) { - ClientData cd; - cd.stub = std::move(Worker::NewStub(CreateChannel( + clients[i].stub = std::move(Worker::NewStub(CreateChannel( workers[i + num_servers], InsecureCredentials(), ChannelArguments()))); ClientArgs args; result_client_config = client_config; result_client_config.set_host(workers[i + num_servers]); *args.mutable_setup() = client_config; - cd.stream = std::move(cd.stub->RunTest(alloc_context())); - GPR_ASSERT(cd.stream->Write(args)); + clients[i].stream = + std::move(clients[i].stub->RunTest(runsc::AllocContext(&contexts))); + GPR_ASSERT(clients[i].stream->Write(args)); ClientStatus init_status; - GPR_ASSERT(cd.stream->Read(&init_status)); - - clients.push_back(std::move(cd)); + GPR_ASSERT(clients[i].stream->Read(&init_status)); } // Let everything warmup @@ -195,23 +207,25 @@ std::unique_ptr<ScenarioResult> RunScenario( server_mark.mutable_mark(); ClientArgs client_mark; client_mark.mutable_mark(); - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Write(server_mark)); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Write(client_mark)); } ServerStatus server_status; ClientStatus client_status; - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Read(&server_status)); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Read(&client_status)); } // Wait some time gpr_log(GPR_INFO, "Running"); + // Use gpr_sleep_until rather than this_thread::sleep_until to support + // compilers that don't work with this_thread gpr_sleep_until(gpr_time_add( start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN))); @@ -220,34 +234,36 @@ std::unique_ptr<ScenarioResult> RunScenario( result->client_config = result_client_config; result->server_config = result_server_config; gpr_log(GPR_INFO, "Finishing"); - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Write(server_mark)); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Write(client_mark)); } - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Read(&server_status)); const auto& stats = server_status.stats(); - result->server_resources.push_back(ResourceUsage{ - stats.time_elapsed(), stats.time_user(), stats.time_system()}); + result->server_resources.emplace_back( + stats.time_elapsed(), stats.time_user(), stats.time_system()); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Read(&client_status)); const auto& stats = client_status.stats(); result->latencies.MergeProto(stats.latencies()); - result->client_resources.push_back(ResourceUsage{ - stats.time_elapsed(), stats.time_user(), stats.time_system()}); + result->client_resources.emplace_back( + stats.time_elapsed(), stats.time_user(), stats.time_system()); } - for (auto client = clients.begin(); client != clients.end(); client++) { + for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->WritesDone()); GPR_ASSERT(client->stream->Finish().ok()); } - for (auto server = servers.begin(); server != servers.end(); server++) { + for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->WritesDone()); GPR_ASSERT(server->stream->Finish().ok()); } + delete[] clients; + delete[] servers; return result; } } // namespace testing diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 5e9d4b3cb9..9a29df8d49 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -41,10 +41,18 @@ namespace grpc { namespace testing { -struct ResourceUsage { - double wall_time; - double user_time; - double system_time; +class ResourceUsage { + public: + ResourceUsage(double w, double u, double s) + : wall_time_(w), user_time_(u), system_time_(s) {} + double wall_time() const { return wall_time_; } + double user_time() const { return user_time_; } + double system_time() const { return system_time_; } + + private: + double wall_time_; + double user_time_; + double system_time_; }; struct ScenarioResult { diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h index f90a17a894..04d14f689f 100644 --- a/test/cpp/qps/interarrival.h +++ b/test/cpp/qps/interarrival.h @@ -36,7 +36,8 @@ #include <chrono> #include <cmath> -#include <random> +#include <cstdlib> +#include <vector> #include <grpc++/config.h> @@ -141,17 +142,16 @@ class ParetoDist GRPC_FINAL : public RandomDist { // in an efficient re-entrant way. The random table is built at construction // time, and each call must include the thread id of the invoker -typedef std::default_random_engine qps_random_engine; - class InterarrivalTimer { public: InterarrivalTimer() {} void init(const RandomDist& r, int threads, int entries = 1000000) { - qps_random_engine gen; - std::uniform_real_distribution<double> uniform(0.0, 1.0); for (int i = 0; i < entries; i++) { - random_table_.push_back(std::chrono::nanoseconds( - static_cast<int64_t>(1e9 * r(uniform(gen))))); + // rand is the only choice that is portable across POSIX and Windows + // and that supports new and old compilers + const double uniform_0_1 = rand() / RAND_MAX; + random_table_.push_back( + std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1)))); } // Now set up the thread positions for (int i = 0; i < threads; i++) { diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index d534846365..b1463be8f6 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -33,6 +33,7 @@ #include <memory> #include <set> +#include <signal.h> #include <gflags/gflags.h> #include <grpc/support/log.h> diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index ff01ec1501..e03e8e1fb0 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -34,11 +34,16 @@ #include "test/cpp/qps/report.h" #include <grpc/support/log.h> +#include "test/cpp/qps/driver.h" #include "test/cpp/qps/stats.h" namespace grpc { namespace testing { +static double WallTime(ResourceUsage u) { return u.wall_time(); } +static double UserTime(ResourceUsage u) { return u.user_time(); } +static double SystemTime(ResourceUsage u) { return u.system_time(); } + void CompositeReporter::add(std::unique_ptr<Reporter> reporter) { reporters_.emplace_back(std::move(reporter)); } @@ -68,16 +73,14 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) { } void GprLogReporter::ReportQPS(const ScenarioResult& result) { - gpr_log(GPR_INFO, "QPS: %.1f", - result.latencies.Count() / - average(result.client_resources, - [](ResourceUsage u) { return u.wall_time; })); + gpr_log( + GPR_INFO, "QPS: %.1f", + result.latencies.Count() / average(result.client_resources, WallTime)); } void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { - auto qps = result.latencies.Count() / - average(result.client_resources, - [](ResourceUsage u) { return u.wall_time; }); + auto qps = + result.latencies.Count() / average(result.client_resources, WallTime); gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, qps / result.server_config.threads()); @@ -95,40 +98,30 @@ void GprLogReporter::ReportLatency(const ScenarioResult& result) { void GprLogReporter::ReportTimes(const ScenarioResult& result) { gpr_log(GPR_INFO, "Server system time: %.2f%%", - 100.0 * sum(result.server_resources, - [](ResourceUsage u) { return u.system_time; }) / - sum(result.server_resources, - [](ResourceUsage u) { return u.wall_time; })); + 100.0 * sum(result.server_resources, SystemTime) / + sum(result.server_resources, WallTime)); gpr_log(GPR_INFO, "Server user time: %.2f%%", - 100.0 * sum(result.server_resources, - [](ResourceUsage u) { return u.user_time; }) / - sum(result.server_resources, - [](ResourceUsage u) { return u.wall_time; })); + 100.0 * sum(result.server_resources, UserTime) / + sum(result.server_resources, WallTime)); gpr_log(GPR_INFO, "Client system time: %.2f%%", - 100.0 * sum(result.client_resources, - [](ResourceUsage u) { return u.system_time; }) / - sum(result.client_resources, - [](ResourceUsage u) { return u.wall_time; })); + 100.0 * sum(result.client_resources, SystemTime) / + sum(result.client_resources, WallTime)); gpr_log(GPR_INFO, "Client user time: %.2f%%", - 100.0 * sum(result.client_resources, - [](ResourceUsage u) { return u.user_time; }) / - sum(result.client_resources, - [](ResourceUsage u) { return u.wall_time; })); + 100.0 * sum(result.client_resources, UserTime) / + sum(result.client_resources, WallTime)); } void PerfDbReporter::ReportQPS(const ScenarioResult& result) { - auto qps = result.latencies.Count() / - average(result.client_resources, - [](ResourceUsage u) { return u.wall_time; }); + auto qps = + result.latencies.Count() / average(result.client_resources, WallTime); perf_db_client_.setQps(qps); perf_db_client_.setConfigs(result.client_config, result.server_config); } void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) { - auto qps = result.latencies.Count() / - average(result.client_resources, - [](ResourceUsage u) { return u.wall_time; }); + auto qps = + result.latencies.Count() / average(result.client_resources, WallTime); auto qpsPerCore = qps / result.server_config.threads(); @@ -139,33 +132,29 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) { void PerfDbReporter::ReportLatency(const ScenarioResult& result) { perf_db_client_.setLatencies(result.latencies.Percentile(50) / 1000, - result.latencies.Percentile(90) / 1000, - result.latencies.Percentile(95) / 1000, - result.latencies.Percentile(99) / 1000, - result.latencies.Percentile(99.9) / 1000); + result.latencies.Percentile(90) / 1000, + result.latencies.Percentile(95) / 1000, + result.latencies.Percentile(99) / 1000, + result.latencies.Percentile(99.9) / 1000); perf_db_client_.setConfigs(result.client_config, result.server_config); } void PerfDbReporter::ReportTimes(const ScenarioResult& result) { - double server_system_time = - 100.0 * sum(result.server_resources, - [](ResourceUsage u) { return u.system_time; }) / - sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }); - double server_user_time = - 100.0 * sum(result.server_resources, - [](ResourceUsage u) { return u.user_time; }) / - sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }); - double client_system_time = - 100.0 * sum(result.client_resources, - [](ResourceUsage u) { return u.system_time; }) / - sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }); - double client_user_time = - 100.0 * sum(result.client_resources, - [](ResourceUsage u) { return u.user_time; }) / - sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }); - - perf_db_client_.setTimes(server_system_time, server_user_time, client_system_time, - client_user_time); + const double server_system_time = 100.0 * + sum(result.server_resources, SystemTime) / + sum(result.server_resources, WallTime); + const double server_user_time = 100.0 * + sum(result.server_resources, UserTime) / + sum(result.server_resources, WallTime); + const double client_system_time = 100.0 * + sum(result.client_resources, SystemTime) / + sum(result.client_resources, WallTime); + const double client_user_time = 100.0 * + sum(result.client_resources, UserTime) / + sum(result.client_resources, WallTime); + + perf_db_client_.setTimes(server_system_time, server_user_time, + client_system_time, client_user_time); perf_db_client_.setConfigs(result.client_config, result.server_config); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 33b6fa55c3..b4fc49c31c 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -99,25 +99,7 @@ class AsyncQpsServerTest : public Server { shutdown_state_.emplace_back(new PerThreadShutdownState()); } for (int i = 0; i < config.threads(); i++) { - threads_.push_back(std::thread([=]() { - // Wait until work is available or we are shutting down - bool ok; - void *got_tag; - while (srv_cqs_[i]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); - // The tag is a pointer to an RPC context to invoke - bool still_going = ctx->RunNextState(ok); - if (!shutdown_state_[i]->shutdown()) { - // this RPC context is done, so refresh it - if (!still_going) { - ctx->Reset(); - } - } else { - return; - } - } - return; - })); + threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } ~AsyncQpsServerTest() { @@ -142,6 +124,26 @@ class AsyncQpsServerTest : public Server { } private: + void ThreadFunc(int rank) { + // Wait until work is available or we are shutting down + bool ok; + void *got_tag; + while (srv_cqs_[rank]->Next(&got_tag, &ok)) { + ServerRpcContext *ctx = detag(got_tag); + // The tag is a pointer to an RPC context to invoke + const bool still_going = ctx->RunNextState(ok); + if (!shutdown_state_[rank]->shutdown()) { + // this RPC context is done, so refresh it + if (!still_going) { + ctx->Reset(); + } + } else { + return; + } + } + return; + } + class ServerRpcContext { public: ServerRpcContext() {} |