diff options
Diffstat (limited to 'test')
59 files changed, 1663 insertions, 397 deletions
diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py index 367effdb1a..0e58d912b9 100644 --- a/test/compiler/python_plugin_test.py +++ b/test/compiler/python_plugin_test.py @@ -36,6 +36,7 @@ import shutil import subprocess import sys import tempfile +import threading import time import unittest @@ -49,13 +50,13 @@ STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' -# Timeouts and delays. -SHORT_TIMEOUT = 0.1 -NORMAL_TIMEOUT = 1 -LONG_TIMEOUT = 2 -DOES_NOT_MATTER_DELAY = 0 +# The timeout used in tests of RPCs that are supposed to expire. +SHORT_TIMEOUT = 2 +# The timeout used in tests of RPCs that are not supposed to expire. The +# absurdly large value doesn't matter since no passing execution of this test +# module will ever wait the duration. +LONG_TIMEOUT = 600 NO_DELAY = 0 -LONG_DELAY = 1 # Build mode environment variable set by tools/run_tests/run_tests.py. _build_mode = os.environ['CONFIG'] @@ -64,47 +65,54 @@ _build_mode = os.environ['CONFIG'] class _ServicerMethods(object): def __init__(self, test_pb2, delay): + self._condition = threading.Condition() + self._delay = delay self._paused = False - self._failed = False - self.test_pb2 = test_pb2 - self.delay = delay + self._fail = False + self._test_pb2 = test_pb2 @contextlib.contextmanager def pause(self): # pylint: disable=invalid-name - self._paused = True + with self._condition: + self._paused = True yield - self._paused = False + with self._condition: + self._paused = False + self._condition.notify_all() @contextlib.contextmanager def fail(self): # pylint: disable=invalid-name - self._failed = True + with self._condition: + self._fail = True yield - self._failed = False + with self._condition: + self._fail = False def _control(self): # pylint: disable=invalid-name - if self._failed: - raise ValueError() - time.sleep(self.delay) - while self._paused: - time.sleep(0) - - def UnaryCall(self, request, unused_context): - response = self.test_pb2.SimpleResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() + time.sleep(self._delay) + + def UnaryCall(self, request, unused_rpc_context): + response = self._test_pb2.SimpleResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * request.response_size self._control() return response - def StreamingOutputCall(self, request, unused_context): + def StreamingOutputCall(self, request, unused_rpc_context): for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response - def StreamingInputCall(self, request_iter, unused_context): - response = self.test_pb2.StreamingInputCallResponse() + def StreamingInputCall(self, request_iter, unused_rpc_context): + response = self._test_pb2.StreamingInputCallResponse() aggregated_payload_size = 0 for request in request_iter: aggregated_payload_size += len(request.payload.payload_compressable) @@ -112,21 +120,21 @@ class _ServicerMethods(object): self._control() return response - def FullDuplexCall(self, request_iter, unused_context): + def FullDuplexCall(self, request_iter, unused_rpc_context): for request in request_iter: for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response - def HalfDuplexCall(self, request_iter, unused_context): + def HalfDuplexCall(self, request_iter, unused_rpc_context): responses = [] for request in request_iter: for parameter in request.response_parameters: - response = self.test_pb2.StreamingOutputCallResponse() - response.payload.payload_type = self.test_pb2.COMPRESSABLE + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() responses.append(response) @@ -147,12 +155,11 @@ def _CreateService(test_pb2, delay): waiting for the service. Args: - test_pb2: the test_pb2 module generated by this test - delay: delay in seconds per response from the servicer - timeout: how long the stub will wait for the servicer by default. + test_pb2: The test_pb2 module generated by this test. + delay: Delay in seconds per response from the servicer. Yields: - A three-tuple (servicer_methods, servicer, stub), where the servicer is + A (servicer_methods, servicer, stub) three-tuple where servicer_methods is the back-end of the service bound to the stub and the server and stub are both activated and ready for use. """ @@ -185,7 +192,7 @@ def _CreateService(test_pb2, delay): yield servicer_methods, stub, server -def StreamingInputRequest(test_pb2): +def _streaming_input_request_iterator(test_pb2): for _ in range(3): request = test_pb2.StreamingInputCallRequest() request.payload.payload_type = test_pb2.COMPRESSABLE @@ -193,7 +200,7 @@ def StreamingInputRequest(test_pb2): yield request -def StreamingOutputRequest(test_pb2): +def _streaming_output_request(test_pb2): request = test_pb2.StreamingOutputCallRequest() sizes = [1, 2, 3] request.response_parameters.add(size=sizes[0], interval_us=0) @@ -202,7 +209,7 @@ def StreamingOutputRequest(test_pb2): return request -def FullDuplexRequest(test_pb2): +def _full_duplex_request_iterator(test_pb2): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request @@ -250,7 +257,7 @@ class PythonPluginTest(unittest.TestCase): if exc.errno != errno.ENOENT: raise - # TODO(atash): Figure out which of theses tests is hanging flakily with small + # TODO(atash): Figure out which of these tests is hanging flakily with small # probability. def testImportAttributes(self): @@ -265,37 +272,36 @@ class PythonPluginTest(unittest.TestCase): def testUpDown(self): import test_pb2 with _CreateService( - test_pb2, DOES_NOT_MATTER_DELAY) as (servicer, stub, unused_server): + test_pb2, NO_DELAY) as (servicer, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) def testUnaryCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. request = test_pb2.SimpleRequest(response_size=13) - response = stub.UnaryCall(request, NORMAL_TIMEOUT) - expected_response = servicer.UnaryCall(request, None) + response = stub.UnaryCall(request, timeout) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, LONG_DELAY) as ( - servicer, stub, unused_server): - start_time = time.clock() - response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) - # Check that we didn't block on the asynchronous call. - self.assertGreater(LONG_DELAY, time.clock() - start_time) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + # Check that the call does not block waiting for the server to respond. + with methods.pause(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) response = response_future.result() - expected_response = servicer.UnaryCall(request, None) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) - with servicer.pause(): + with methods.pause(): response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): response_future.result() @@ -305,9 +311,9 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): response_future = stub.UnaryCall.async(request, 1) response_future.cancel() self.assertTrue(response_future.cancelled()) @@ -315,30 +321,31 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): - response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testStreamingOutputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT) - expected_responses = servicer.StreamingOutputCall(request, None) - for check in itertools.izip_longest(expected_responses, responses): - expected_response, response = check + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) + expected_responses = methods.StreamingOutputCall( + request, 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' 'forever and fix.') def testStreamingOutputCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(responses) @@ -347,9 +354,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testStreamingOutputCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - unused_servicer, stub, unused_server): + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + unused_methods, stub, unused_server): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) next(responses) responses.cancel() @@ -360,10 +367,10 @@ class PythonPluginTest(unittest.TestCase): 'instead of raising the proper error.') def testStreamingOutputCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = StreamingOutputRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): responses = stub.StreamingOutputCall(request, 1) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): @@ -373,34 +380,32 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testStreamingInputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - response = stub.StreamingInputCall(StreamingInputRequest(test_pb2), - NORMAL_TIMEOUT) - expected_response = servicer.StreamingInputCall( - StreamingInputRequest(test_pb2), None) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + response = stub.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, LONG_DELAY) as ( - servicer, stub, unused_server): - start_time = time.clock() - response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), LONG_TIMEOUT) - self.assertGreater(LONG_DELAY, time.clock() - start_time) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) response = response_future.result() - expected_response = servicer.StreamingInputCall( - StreamingInputRequest(test_pb2), None) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), SHORT_TIMEOUT) + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): response_future.result() self.assertIsInstance( @@ -408,11 +413,12 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), NORMAL_TIMEOUT) + _streaming_input_request_iterator(test_pb2), timeout) response_future.cancel() self.assertTrue(response_future.cancelled()) with self.assertRaises(future.CancelledError): @@ -420,33 +426,33 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): response_future = stub.StreamingInputCall.async( - StreamingInputRequest(test_pb2), SHORT_TIMEOUT) + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testFullDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2), - NORMAL_TIMEOUT) - expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2), - None) - for check in itertools.izip_longest(expected_responses, responses): - expected_response, response = check + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) + expected_responses = methods.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' 'forever and fix.') def testFullDuplexCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = FullDuplexRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.pause(): - responses = stub.FullDuplexCall(request, SHORT_TIMEOUT) + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(responses) @@ -454,9 +460,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testFullDuplexCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): - request = FullDuplexRequest(test_pb2) - responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + request_iterator = _full_duplex_request_iterator(test_pb2) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) next(responses) responses.cancel() with self.assertRaises(future.CancelledError): @@ -466,11 +472,11 @@ class PythonPluginTest(unittest.TestCase): 'and fix.') def testFullDuplexCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - request = FullDuplexRequest(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - with servicer.fail(): - responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT) + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): next(responses) @@ -479,9 +485,9 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testHalfDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( - servicer, stub, unused_server): - def HalfDuplexRequest(): + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request @@ -489,30 +495,38 @@ class PythonPluginTest(unittest.TestCase): request.response_parameters.add(size=2, interval_us=0) request.response_parameters.add(size=3, interval_us=0) yield request - responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT) - expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest(), None) + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), LONG_TIMEOUT) + expected_responses = methods.HalfDuplexCall( + half_duplex_request_iterator(), 'not a real RpcContext!') for check in itertools.izip_longest(expected_responses, responses): expected_response, response = check self.assertEqual(expected_response, response) def testHalfDuplexCallWedged(self): import test_pb2 # pylint: disable=g-import-not-at-top - wait_flag = [False] + condition = threading.Condition() + wait_cell = [False] @contextlib.contextmanager def wait(): # pylint: disable=invalid-name # Where's Python 3's 'nonlocal' statement when you need it? - wait_flag[0] = True + with condition: + wait_cell[0] = True yield - wait_flag[0] = False - def HalfDuplexRequest(): + with condition: + wait_cell[0] = False + condition.notify_all() + def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request - while wait_flag[0]: - time.sleep(0.1) - with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): + with condition: + while wait_cell[0]: + condition.wait() + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): with wait(): - responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT) + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), SHORT_TIMEOUT) # half-duplex waits for the client to send all info with self.assertRaises(exceptions.ExpirationError): next(responses) diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index d37454d9cf..33f7c02b61 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -133,7 +133,8 @@ static int byte_buffer_eq_slice(grpc_byte_buffer *bb, gpr_slice b) { if (!bb) return 0; - a = merge_slices(bb->data.slice_buffer.slices, bb->data.slice_buffer.count); + a = merge_slices(bb->data.raw.slice_buffer.slices, + bb->data.raw.slice_buffer.count); ok = GPR_SLICE_LENGTH(a) == GPR_SLICE_LENGTH(b) && 0 == memcmp(GPR_SLICE_START_PTR(a), GPR_SLICE_START_PTR(b), GPR_SLICE_LENGTH(a)); diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 1142c37ea4..1e7e21c739 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -131,17 +131,21 @@ void test_connect(const char *server_host, const char *client_host, int port, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -157,14 +161,17 @@ void test_connect(const char *server_host, const char *client_host, int port, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -177,7 +184,7 @@ void test_connect(const char *server_host, const char *client_host, int port, GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); grpc_call_destroy(s); } else { diff --git a/test/core/end2end/gen_build_json.py b/test/core/end2end/gen_build_json.py index 7d581f64cc..f47c92bc47 100755 --- a/test/core/end2end/gen_build_json.py +++ b/test/core/end2end/gen_build_json.py @@ -84,6 +84,7 @@ END2END_TESTS = { 'request_response_with_payload_and_call_creds': TestOptions(flaky=False, secure=True), 'request_with_large_metadata': default_test_options, 'request_with_payload': default_test_options, + 'request_with_flags': default_test_options, 'server_finishes_request': default_test_options, 'simple_delayed_request': default_test_options, 'simple_request': default_test_options, @@ -101,7 +102,7 @@ def main(): 'language': 'c', 'secure': 'check' if END2END_FIXTURES[f].secure else 'no', 'src': ['test/core/end2end/fixtures/%s.c' % f], - 'platforms': [ 'posix' ] if f.endswith('_posix') else [ 'windows', 'posix' ], + 'platforms': [ 'posix' ] if f.endswith('_posix') else END2END_FIXTURES[f].platforms, } for f in sorted(END2END_FIXTURES.keys())] + [ { diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c index bba9cd1a56..35b837b7b5 100644 --- a/test/core/end2end/no_server_test.c +++ b/test/core/end2end/no_server_test.c @@ -67,12 +67,14 @@ int main(int argc, char **argv) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, op - ops, tag(1))); diff --git a/test/core/end2end/tests/bad_hostname.c b/test/core/end2end/tests/bad_hostname.c index 09b20f4a89..6dbf16d23f 100644 --- a/test/core/end2end/tests/bad_hostname.c +++ b/test/core/end2end/tests/bad_hostname.c @@ -123,17 +123,21 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 09a1e488de..13593cfbc7 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -118,9 +118,9 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); int was_cancelled = 2; c = grpc_channel_create_call(f.client, f.cq, "/foo", @@ -138,18 +138,23 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -163,15 +168,19 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, op = ops; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(3))); diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c index 604ab8cb8f..07741b601c 100644 --- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c +++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c @@ -118,9 +118,9 @@ static void test_cancel_after_accept_and_writes_closed( gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); int was_cancelled = 2; c = grpc_channel_create_call(f.client, f.cq, "/foo", @@ -138,20 +138,26 @@ static void test_cancel_after_accept_and_writes_closed( op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -165,15 +171,19 @@ static void test_cancel_after_accept_and_writes_closed( op = ops; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(3))); diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index b9438b4a47..99e830d117 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -117,7 +117,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config, grpc_byte_buffer *response_payload_recv = NULL; gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); c = grpc_channel_create_call(f.client, f.cq, "/foo", "foo.test.google.fr", deadline); @@ -134,20 +134,26 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config, op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, test_ops, tag(1))); diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c index 7455c1cb3d..d098c61937 100644 --- a/test/core/end2end/tests/cancel_before_invoke.c +++ b/test/core/end2end/tests/cancel_before_invoke.c @@ -114,7 +114,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config, grpc_byte_buffer *response_payload_recv = NULL; gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); c = grpc_channel_create_call(f.client, f.cq, "/foo", "foo.test.google.fr", deadline); @@ -133,20 +133,26 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config, op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, test_ops, tag(1))); diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c index f3a46e23c9..f0820743f2 100644 --- a/test/core/end2end/tests/census_simple_request.c +++ b/test/core/end2end/tests/census_simple_request.c @@ -121,17 +121,21 @@ static void test_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -145,14 +149,17 @@ static void test_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -164,7 +171,7 @@ static void test_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index d962f870b0..1a394bfce2 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -109,17 +109,21 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -137,14 +141,17 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -157,7 +164,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c index ca07024560..fd6074fe30 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c @@ -117,17 +117,21 @@ static void test_early_server_shutdown_finishes_inflight_calls( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op->data.send_initial_metadata.metadata = NULL; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -141,6 +145,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index c1fc6fb4c5..8d0034264c 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -124,17 +124,21 @@ static void test_early_server_shutdown_finishes_inflight_calls( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op->data.send_initial_metadata.metadata = NULL; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -152,14 +156,17 @@ static void test_early_server_shutdown_finishes_inflight_calls( op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -173,7 +180,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index fb4d60ccb5..cc976639d2 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -107,9 +107,9 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { grpc_call *c; grpc_call *s; grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); gpr_timespec deadline = n_seconds_time(30); cq_verifier *cqv = cq_verifier_create(f.cq); grpc_op ops[6]; @@ -198,7 +198,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index 8cfa15a9bc..6a54c6a756 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -123,17 +123,21 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -147,14 +151,17 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -166,7 +173,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); @@ -251,8 +258,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c1, ops, op - ops, tag(301))); @@ -263,9 +272,11 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { op->data.recv_status_on_client.status = &status1; op->data.recv_status_on_client.status_details = &details1; op->data.recv_status_on_client.status_details_capacity = &details_capacity1; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv1; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c1, ops, op - ops, tag(302))); @@ -273,8 +284,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c2, ops, op - ops, tag(401))); @@ -285,9 +298,11 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { op->data.recv_status_on_client.status = &status2; op->data.recv_status_on_client.status_details = &details2; op->data.recv_status_on_client.status_details_capacity = &details_capacity2; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv1; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c2, ops, op - ops, tag(402))); @@ -318,14 +333,17 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s1, ops, op - ops, tag(102))); @@ -347,14 +365,17 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s2, ops, op - ops, tag(202))); diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index ad72091cfe..06d617eaa9 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -106,7 +106,7 @@ static void test_max_message_length(grpc_end2end_test_config config) { grpc_op *op; gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_metadata_array initial_metadata_recv; grpc_metadata_array trailing_metadata_recv; grpc_metadata_array request_metadata_recv; @@ -138,20 +138,25 @@ static void test_max_message_length(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -165,6 +170,7 @@ static void test_max_message_length(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index c9f161cadf..75b01a5cc4 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -133,15 +133,18 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -155,28 +158,33 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(101))); for (i = 0; i < messages; i++) { - request_payload = grpc_byte_buffer_create(&request_payload_slice, 1); - response_payload = grpc_byte_buffer_create(&response_payload_slice, 1); + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(2))); op = ops; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -186,6 +194,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); @@ -204,6 +213,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, op = ops; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(3))); @@ -212,6 +222,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(104))); diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c index 70c70e9c40..7b4ad6adc5 100644 --- a/test/core/end2end/tests/registered_call.c +++ b/test/core/end2end/tests/registered_call.c @@ -124,17 +124,21 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -148,14 +152,17 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -167,7 +174,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index fbdab81cd0..4f1267fb39 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -103,9 +103,9 @@ static void test_request_response_with_metadata_and_payload( gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); grpc_metadata meta_c[2] = { {"key1-bin", @@ -153,23 +153,29 @@ static void test_request_response_with_metadata_and_payload( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; op->data.send_initial_metadata.metadata = meta_c; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -184,9 +190,11 @@ static void test_request_response_with_metadata_and_payload( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; op->data.send_initial_metadata.metadata = meta_s; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -196,14 +204,17 @@ static void test_request_response_with_metadata_and_payload( op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index f1b57f3457..6d1695c177 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -103,9 +103,9 @@ static void test_request_response_with_metadata_and_payload( gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); grpc_metadata meta_c[2] = {{"key1", "val1", 4, {{NULL, NULL, NULL}}}, {"key2", "val2", 4, {{NULL, NULL, NULL}}}}; @@ -139,23 +139,29 @@ static void test_request_response_with_metadata_and_payload( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; op->data.send_initial_metadata.metadata = meta_c; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -170,9 +176,11 @@ static void test_request_response_with_metadata_and_payload( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; op->data.send_initial_metadata.metadata = meta_s; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -182,14 +190,17 @@ static void test_request_response_with_metadata_and_payload( op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index 543a625919..1f72b2e92f 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -101,9 +101,9 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { grpc_call *c; grpc_call *s; grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); cq_verifier *cqv = cq_verifier_create(f.cq); grpc_op ops[6]; @@ -131,23 +131,29 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -161,9 +167,11 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -173,14 +181,17 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); diff --git a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c index 45057f07c5..f75ecf58f4 100644 --- a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c +++ b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c @@ -154,9 +154,9 @@ static void request_response_with_payload_and_call_creds( gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); grpc_end2end_test_fixture f = begin_test(config, test_name, NULL, NULL); @@ -206,23 +206,29 @@ static void request_response_with_payload_and_call_creds( op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -243,9 +249,11 @@ static void request_response_with_payload_and_call_creds( op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -255,14 +263,17 @@ static void request_response_with_payload_and_call_creds( op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c index 4a6563b275..48de52ae7b 100644 --- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c @@ -103,9 +103,9 @@ static void test_request_response_with_metadata_and_payload( gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_byte_buffer *response_payload = - grpc_byte_buffer_create(&response_payload_slice, 1); + grpc_raw_byte_buffer_create(&response_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); grpc_metadata meta_c[2] = {{"key1", "val1", 4, {{NULL, NULL, NULL}}}, {"key2", "val2", 4, {{NULL, NULL, NULL}}}}; grpc_metadata meta_s[2] = {{"key3", "val3", 4, {{NULL, NULL, NULL}}}, {"key4", "val4", 4, {{NULL, NULL, NULL}}}}; @@ -138,23 +138,29 @@ static void test_request_response_with_metadata_and_payload( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; op->data.send_initial_metadata.metadata = meta_c; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &response_payload_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -170,9 +176,11 @@ static void test_request_response_with_metadata_and_payload( op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; op->data.send_initial_metadata.metadata = meta_s; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -182,15 +190,18 @@ static void test_request_response_with_metadata_and_payload( op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 2; op->data.send_status_from_server.trailing_metadata = meta_t; op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); diff --git a/test/core/end2end/tests/request_with_flags.c b/test/core/end2end/tests/request_with_flags.c new file mode 100644 index 0000000000..79de915dfc --- /dev/null +++ b/test/core/end2end/tests/request_with_flags.c @@ -0,0 +1,202 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "src/core/transport/stream_op.h" +#include "test/core/end2end/cq_verifier.h" + +enum { TIMEOUT = 200000 }; + +static void *tag(gpr_intptr t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "%s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_client(&f, client_args); + config.init_server(&f, server_args); + return f; +} + +static gpr_timespec n_seconds_time(int n) { + return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +} + +static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_time()); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); +} + +static void test_invoke_request_with_flags( + grpc_end2end_test_config config, gpr_uint32 *flags_for_op, + grpc_call_error call_start_batch_expected_result) { + grpc_call *c; + gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); + grpc_byte_buffer *request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + gpr_timespec deadline = five_seconds_time(); + grpc_end2end_test_fixture f = + begin_test(config, "test_invoke_request_with_flags", NULL, NULL); + cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *request_payload_recv = NULL; + grpc_call_details call_details; + grpc_status_code status; + char *details = NULL; + size_t details_capacity = 0; + grpc_call_error expectation; + + c = grpc_channel_create_call(f.client, f.cq, "/foo", + "foo.test.google.fr", deadline); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = flags_for_op[op->op]; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = flags_for_op[op->op]; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = flags_for_op[op->op]; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = flags_for_op[op->op]; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = flags_for_op[op->op]; + op++; + expectation = call_start_batch_expected_result; + GPR_ASSERT(expectation == grpc_call_start_batch(c, ops, op - ops, tag(1))); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_destroy(c); + + cq_verifier_destroy(cqv); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(request_payload_recv); + + end_test(&f); + config.tear_down_data(&f); +} + +void grpc_end2end_tests(grpc_end2end_test_config config) { + size_t i; + gpr_uint32 flags_for_op[GRPC_OP_RECV_CLOSE_ON_SERVER+1]; + + { + /* check that all grpc_op_types fail when their flag value is set to an + * invalid value */ + int indices[] = {GRPC_OP_SEND_INITIAL_METADATA, GRPC_OP_SEND_MESSAGE, + GRPC_OP_SEND_CLOSE_FROM_CLIENT, + GRPC_OP_RECV_INITIAL_METADATA, + GRPC_OP_RECV_STATUS_ON_CLIENT}; + for (i = 0; i < GPR_ARRAY_SIZE(indices); ++i) { + memset(flags_for_op, 0, sizeof(flags_for_op)); + flags_for_op[indices[i]] = 0xDEADBEEF; + test_invoke_request_with_flags(config, flags_for_op, + GRPC_CALL_ERROR_INVALID_FLAGS); + } + } + { + /* check valid operation with allowed flags for GRPC_OP_SEND_BUFFER */ + gpr_uint32 flags[] = {GRPC_WRITE_BUFFER_HINT, GRPC_WRITE_NO_COMPRESS, + GRPC_WRITE_INTERNAL_COMPRESS}; + for (i = 0; i < GPR_ARRAY_SIZE(flags); ++i) { + memset(flags_for_op, 0, sizeof(flags_for_op)); + flags_for_op[GRPC_OP_SEND_MESSAGE] = flags[i]; + test_invoke_request_with_flags(config, flags_for_op, GRPC_CALL_OK); + } + } +} diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index 7e3cc4c084..e27c400b54 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -101,7 +101,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { grpc_call *s; gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); grpc_metadata meta; grpc_end2end_test_fixture f = begin_test(config, "test_request_with_large_metadata", NULL, NULL); @@ -138,20 +138,25 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 1; op->data.send_initial_metadata.metadata = &meta; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -165,9 +170,11 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -177,11 +184,13 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 55bc60e792..708855aa9d 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -101,7 +101,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { grpc_call *s; gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); grpc_byte_buffer *request_payload = - grpc_byte_buffer_create(&request_payload_slice, 1); + grpc_raw_byte_buffer_create(&request_payload_slice, 1); gpr_timespec deadline = five_seconds_time(); grpc_end2end_test_fixture f = begin_test(config, "test_invoke_request_with_payload", NULL, NULL); cq_verifier *cqv = cq_verifier_create(f.cq); @@ -129,20 +129,25 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -156,9 +161,11 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -168,11 +175,13 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); diff --git a/test/core/end2end/tests/server_finishes_request.c b/test/core/end2end/tests/server_finishes_request.c index fe8657a2cc..ca03c1d777 100644 --- a/test/core/end2end/tests/server_finishes_request.c +++ b/test/core/end2end/tests/server_finishes_request.c @@ -125,15 +125,18 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -147,14 +150,17 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -166,7 +172,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index f399d0ea9b..3eb3cd2b54 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -117,17 +117,21 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -143,14 +147,17 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -162,7 +169,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 2246ad540c..ca938f0827 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -125,17 +125,21 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -149,14 +153,17 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -168,7 +175,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c index ff00a5aa6e..b94ad9f234 100644 --- a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c +++ b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c @@ -125,17 +125,21 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); @@ -149,14 +153,17 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; op++; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); @@ -168,7 +175,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 37d787c7c3..ee5e390c39 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -183,7 +183,7 @@ int main(int argc, char **argv) { channel = grpc_channel_create(target, NULL); cq = grpc_completion_queue_create(); - the_buffer = grpc_byte_buffer_create(&slice, payload_size); + the_buffer = grpc_raw_byte_buffer_create(&slice, payload_size); histogram = gpr_histogram_create(0.01, 60e9); sc.init(); diff --git a/test/core/network_benchmarks/low_level_ping_pong.c b/test/core/network_benchmarks/low_level_ping_pong.c index 7d74d0e078..78a0eef1a2 100644 --- a/test/core/network_benchmarks/low_level_ping_pong.c +++ b/test/core/network_benchmarks/low_level_ping_pong.c @@ -238,6 +238,7 @@ static int set_socket_nonblocking(thread_args *args) { static int do_nothing(thread_args *args) { return 0; } +#ifdef __linux__ /* Special case for epoll, where we need to create the fd ahead of time. */ static int epoll_setup(thread_args *args) { int epoll_fd; @@ -258,6 +259,7 @@ static int epoll_setup(thread_args *args) { } return 0; } +#endif static void server_thread(thread_args *args) { char *buf = malloc(args->msg_size); diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c index c2f5fc2eb7..87a2cd7a43 100644 --- a/test/core/surface/byte_buffer_reader_test.c +++ b/test/core/surface/byte_buffer_reader_test.c @@ -42,6 +42,8 @@ #include <grpc/support/time.h> #include "test/core/util/test_config.h" +#include "src/core/compression/message_compress.h" + #include <string.h> #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x) @@ -55,7 +57,7 @@ static void test_read_one_slice(void) { LOG_TEST("test_read_one_slice"); slice = gpr_slice_from_copied_string("test"); - buffer = grpc_byte_buffer_create(&slice, 1); + buffer = grpc_raw_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); grpc_byte_buffer_reader_init(&reader, buffer); first_code = grpc_byte_buffer_reader_next(&reader, &first_slice); @@ -77,7 +79,28 @@ static void test_read_one_slice_malloc(void) { LOG_TEST("test_read_one_slice_malloc"); slice = gpr_slice_malloc(4); memcpy(GPR_SLICE_START_PTR(slice), "test", 4); - buffer = grpc_byte_buffer_create(&slice, 1); + buffer = grpc_raw_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + grpc_byte_buffer_reader_init(&reader, buffer); + first_code = grpc_byte_buffer_reader_next(&reader, &first_slice); + GPR_ASSERT(first_code != 0); + GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(first_slice), "test", 4) == 0); + gpr_slice_unref(first_slice); + second_code = grpc_byte_buffer_reader_next(&reader, &second_slice); + GPR_ASSERT(second_code == 0); + grpc_byte_buffer_destroy(buffer); +} + +static void test_read_none_compressed_slice(void) { + gpr_slice slice; + grpc_byte_buffer *buffer; + grpc_byte_buffer_reader reader; + gpr_slice first_slice, second_slice; + int first_code, second_code; + + LOG_TEST("test_read_none_compressed_slice"); + slice = gpr_slice_from_copied_string("test"); + buffer = grpc_raw_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); grpc_byte_buffer_reader_init(&reader, buffer); first_code = grpc_byte_buffer_reader_next(&reader, &first_slice); @@ -89,9 +112,61 @@ static void test_read_one_slice_malloc(void) { grpc_byte_buffer_destroy(buffer); } +static void read_compressed_slice(grpc_compression_algorithm algorithm, + int input_size) { + gpr_slice input_slice; + gpr_slice_buffer sliceb_in; + gpr_slice_buffer sliceb_out; + grpc_byte_buffer *buffer; + grpc_byte_buffer_reader reader; + gpr_slice read_slice; + int read_count = 0; + + gpr_slice_buffer_init(&sliceb_in); + gpr_slice_buffer_init(&sliceb_out); + + input_slice = gpr_slice_malloc(input_size); + memset(GPR_SLICE_START_PTR(input_slice), 'a', input_size); + gpr_slice_buffer_add(&sliceb_in, input_slice); /* takes ownership */ + GPR_ASSERT(grpc_msg_compress(algorithm, &sliceb_in, &sliceb_out)); + + buffer = grpc_raw_compressed_byte_buffer_create( + sliceb_out.slices, sliceb_out.count, algorithm); + grpc_byte_buffer_reader_init(&reader, buffer); + + while (grpc_byte_buffer_reader_next(&reader, &read_slice)) { + GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(read_slice), + GPR_SLICE_START_PTR(input_slice) + read_count, + GPR_SLICE_LENGTH(read_slice)) == 0); + read_count += GPR_SLICE_LENGTH(read_slice); + gpr_slice_unref(read_slice); + } + GPR_ASSERT(read_count == input_size); + grpc_byte_buffer_reader_destroy(&reader); + grpc_byte_buffer_destroy(buffer); + gpr_slice_buffer_destroy(&sliceb_out); + gpr_slice_buffer_destroy(&sliceb_in); +} + +static void test_read_gzip_compressed_slice(void) { + const int INPUT_SIZE = 2048; + LOG_TEST("test_read_gzip_compressed_slice"); + read_compressed_slice(GRPC_COMPRESS_GZIP, INPUT_SIZE); +} + +static void test_read_deflate_compressed_slice(void) { + const int INPUT_SIZE = 2048; + LOG_TEST("test_read_deflate_compressed_slice"); + read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); test_read_one_slice(); test_read_one_slice_malloc(); + test_read_none_compressed_slice(); + test_read_gzip_compressed_slice(); + test_read_deflate_compressed_slice(); + return 0; } diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index 34d37f0f3c..b2facd33b1 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -68,12 +68,14 @@ int main(int argc, char **argv) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + op->flags = 0; op++; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, op - ops, tag(1))); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 507fc7a710..117d8bb9fa 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -167,7 +167,7 @@ class AsyncEnd2endTest : public ::testing::Test { Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } } @@ -227,7 +227,7 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) { Verifier().Expect(4, true).Verify(cq_.get(), std::chrono::system_clock::time_point::max()); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } // Two pings and a final pong. @@ -280,7 +280,7 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { Verifier().Expect(10, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } // One ping, two pongs. @@ -330,7 +330,7 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { cli_stream->Finish(&recv_status, tag(9)); Verifier().Expect(9, true).Verify(cq_.get()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } // One ping, one pong. @@ -382,7 +382,7 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { cli_stream->Finish(&recv_status, tag(10)); Verifier().Expect(10, true).Verify(cq_.get()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } // Metadata tests @@ -426,7 +426,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { Verifier().Expect(4, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { @@ -473,7 +473,7 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { @@ -513,7 +513,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { response_reader->Finish(&recv_response, &recv_status, tag(5)); Verifier().Expect(5, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); EXPECT_EQ(meta1.second, server_trailing_metadata.find(meta1.first)->second); EXPECT_EQ(meta2.second, server_trailing_metadata.find(meta2.first)->second); @@ -586,7 +586,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { response_reader->Finish(&recv_response, &recv_status, tag(6)); Verifier().Expect(6, true).Verify(cq_.get()); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second); EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second); diff --git a/test/cpp/end2end/client_crash_test.cc b/test/cpp/end2end/client_crash_test.cc index 7b90fcaec1..7876e8dee3 100644 --- a/test/cpp/end2end/client_crash_test.cc +++ b/test/cpp/end2end/client_crash_test.cc @@ -118,7 +118,7 @@ TEST_F(CrashTest, KillBeforeWrite) { // But the read will definitely fail EXPECT_FALSE(stream->Read(&response)); - EXPECT_FALSE(stream->Finish().IsOk()); + EXPECT_FALSE(stream->Finish().ok()); } TEST_F(CrashTest, KillAfterWrite) { @@ -142,7 +142,7 @@ TEST_F(CrashTest, KillAfterWrite) { EXPECT_FALSE(stream->Read(&response)); - EXPECT_FALSE(stream->Finish().IsOk()); + EXPECT_FALSE(stream->Finish().ok()); } } // namespace diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index f48cf953a4..45ba8b0878 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -101,13 +101,13 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { gpr_now(), gpr_time_from_micros(request->param().client_cancel_after_us()))); } - return Status::Cancelled; + return Status::CANCELLED; } else if (request->has_param() && request->param().server_cancel_after_us()) { gpr_sleep_until(gpr_time_add( gpr_now(), gpr_time_from_micros(request->param().server_cancel_after_us()))); - return Status::Cancelled; + return Status::CANCELLED; } else { EXPECT_FALSE(context->IsCancelled()); } @@ -232,7 +232,7 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, ClientContext context; Status s = stub->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } } @@ -265,7 +265,7 @@ TEST_F(End2endTest, RpcDeadlineExpires) { std::chrono::system_clock::now() + std::chrono::microseconds(10); context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.code()); + EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code()); } // Set a long but finite deadline. @@ -281,7 +281,7 @@ TEST_F(End2endTest, RpcLongDeadline) { context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } // Ask server to echo back the deadline it sees. @@ -298,7 +298,7 @@ TEST_F(End2endTest, EchoDeadline) { context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); gpr_timespec sent_deadline; Timepoint2Timespec(deadline, &sent_deadline); // Allow 1 second error. @@ -317,7 +317,7 @@ TEST_F(End2endTest, EchoDeadlineForNoDeadlineRpc) { ClientContext context; Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); EXPECT_EQ(response.param().request_deadline(), gpr_inf_future.tv_sec); } @@ -329,9 +329,9 @@ TEST_F(End2endTest, UnimplementedRpc) { ClientContext context; Status s = stub_->Unimplemented(&context, request, &response); - EXPECT_FALSE(s.IsOk()); - EXPECT_EQ(s.code(), grpc::StatusCode::UNIMPLEMENTED); - EXPECT_EQ(s.details(), ""); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED); + EXPECT_EQ(s.error_message(), ""); EXPECT_EQ(response.message(), ""); } @@ -347,7 +347,7 @@ TEST_F(End2endTest, RequestStreamOneRequest) { stream->WritesDone(); Status s = stream->Finish(); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } TEST_F(End2endTest, RequestStreamTwoRequests) { @@ -363,7 +363,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) { stream->WritesDone(); Status s = stream->Finish(); EXPECT_EQ(response.message(), "hellohello"); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } TEST_F(End2endTest, ResponseStream) { @@ -383,7 +383,7 @@ TEST_F(End2endTest, ResponseStream) { EXPECT_FALSE(stream->Read(&response)); Status s = stream->Finish(); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } TEST_F(End2endTest, BidiStream) { @@ -414,7 +414,7 @@ TEST_F(End2endTest, BidiStream) { EXPECT_FALSE(stream->Read(&response)); Status s = stream->Finish(); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } // Talk to the two services with the same name but different package names. @@ -433,7 +433,7 @@ TEST_F(End2endTest, DiffPackageServices) { ClientContext context; Status s = stub->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); std::unique_ptr<grpc::cpp::test::util::duplicate::TestService::Stub> dup_pkg_stub( @@ -441,7 +441,7 @@ TEST_F(End2endTest, DiffPackageServices) { ClientContext context2; s = dup_pkg_stub->Echo(&context2, request, &response); EXPECT_EQ("no package", response.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } // rpc and stream should fail on bad credentials. @@ -459,16 +459,16 @@ TEST_F(End2endTest, BadCredentials) { Status s = stub->Echo(&context, request, &response); EXPECT_EQ("", response.message()); - EXPECT_FALSE(s.IsOk()); - EXPECT_EQ(StatusCode::UNKNOWN, s.code()); - EXPECT_EQ("Rpc sent on a lame channel.", s.details()); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(StatusCode::UNKNOWN, s.error_code()); + EXPECT_EQ("Rpc sent on a lame channel.", s.error_message()); ClientContext context2; auto stream = stub->BidiStream(&context2); s = stream->Finish(); - EXPECT_FALSE(s.IsOk()); - EXPECT_EQ(StatusCode::UNKNOWN, s.code()); - EXPECT_EQ("Rpc sent on a lame channel.", s.details()); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(StatusCode::UNKNOWN, s.error_code()); + EXPECT_EQ("Rpc sent on a lame channel.", s.error_message()); } void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { @@ -491,8 +491,8 @@ TEST_F(End2endTest, ClientCancelsRpc) { std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_); Status s = stub_->Echo(&context, request, &response); cancel_thread.join(); - EXPECT_EQ(StatusCode::CANCELLED, s.code()); - EXPECT_EQ(s.details(), "Cancelled"); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ(s.error_message(), "Cancelled"); } // Server cancels rpc after 1ms @@ -505,8 +505,8 @@ TEST_F(End2endTest, ServerCancelsRpc) { ClientContext context; Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::CANCELLED, s.code()); - EXPECT_TRUE(s.details().empty()); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_TRUE(s.error_message().empty()); } // Client cancels request stream after sending two messages @@ -524,7 +524,7 @@ TEST_F(End2endTest, ClientCancelsRequestStream) { context.TryCancel(); Status s = stream->Finish(); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); EXPECT_EQ(response.message(), ""); } @@ -558,7 +558,7 @@ TEST_F(End2endTest, ClientCancelsResponseStream) { Status s = stream->Finish(); // The final status could be either of CANCELLED or OK depending on // who won the race. - EXPECT_GE(grpc::StatusCode::CANCELLED, s.code()); + EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code()); } // Client cancels bidi stream after sending some messages @@ -591,7 +591,7 @@ TEST_F(End2endTest, ClientCancelsBidi) { } Status s = stream->Finish(); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); } TEST_F(End2endTest, RpcMaxMessageSize) { @@ -602,7 +602,7 @@ TEST_F(End2endTest, RpcMaxMessageSize) { ClientContext context; Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.IsOk()); + EXPECT_FALSE(s.ok()); } bool MetadataContains(const std::multimap<grpc::string, grpc::string>& metadata, @@ -632,7 +632,7 @@ TEST_F(End2endTest, SetPerCallCredentials) { Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, "fake_token")); @@ -652,8 +652,8 @@ TEST_F(End2endTest, InsecurePerCallCredentials) { request.mutable_param()->set_echo_metadata(true); Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::CANCELLED, s.code()); - EXPECT_EQ("Failed to set credentials to rpc.", s.details()); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ("Failed to set credentials to rpc.", s.error_message()); } TEST_F(End2endTest, OverridePerCallCredentials) { @@ -684,7 +684,7 @@ TEST_F(End2endTest, OverridePerCallCredentials) { GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, "fake_selector1")); EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } } // namespace testing diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 5a26834df9..b9d47b32de 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -190,7 +190,7 @@ class GenericEnd2endTest : public ::testing::Test { client_ok(9); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } } @@ -273,7 +273,7 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { client_ok(10); EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.IsOk()); + EXPECT_TRUE(recv_status.ok()); } } // namespace diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 0226da672c..2809ab8d3c 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -168,7 +168,7 @@ class FakeClient { request.set_message("hello world"); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } void DoBidiStream() { @@ -199,7 +199,7 @@ class FakeClient { EXPECT_FALSE(stream->Read(&response)); Status s = stream->Finish(); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } void ResetStub(TestService::StubInterface* stub) { stub_ = stub; } diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 5ee29e40f4..0b43dfd106 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -99,13 +99,13 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { gpr_now(), gpr_time_from_micros(request->param().client_cancel_after_us()))); } - return Status::Cancelled; + return Status::CANCELLED; } else if (request->has_param() && request->param().server_cancel_after_us()) { gpr_sleep_until(gpr_time_add( gpr_now(), gpr_time_from_micros(request->param().server_cancel_after_us()))); - return Status::Cancelled; + return Status::CANCELLED; } else { EXPECT_FALSE(context->IsCancelled()); } @@ -219,7 +219,7 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, ClientContext context; Status s = stub->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); } } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index e351059246..f08f90b139 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -65,11 +65,11 @@ InteropClient::InteropClient(std::shared_ptr<ChannelInterface> channel) : channel_(channel) {} void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) { - if (s.IsOk()) { + if (s.ok()) { return; } - gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.code(), - s.details().c_str()); + gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(), + s.error_message().c_str()); GPR_ASSERT(0); } @@ -321,7 +321,7 @@ void InteropClient::DoCancelAfterBegin() { gpr_log(GPR_INFO, "Trying to cancel..."); context.TryCancel(); Status s = stream->Finish(); - GPR_ASSERT(s.code() == StatusCode::CANCELLED); + GPR_ASSERT(s.error_code() == StatusCode::CANCELLED); gpr_log(GPR_INFO, "Canceling streaming done."); } diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index dc3a9f2ac5..28cd32a197 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -35,6 +35,7 @@ #define TEST_QPS_CLIENT_H #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.grpc.pb.h" @@ -42,11 +43,31 @@ #include <mutex> namespace grpc { + +#if defined(__APPLE__) +// Specialize Timepoint for high res clock as we need that +template <> +class TimePoint<std::chrono::high_resolution_clock::time_point> { + public: + TimePoint(const std::chrono::high_resolution_clock::time_point& time) { + TimepointHR2Timespec(time, &time_); + } + gpr_timespec raw_time() const { return time_; } + + private: + gpr_timespec time_; +}; +#endif + namespace testing { +typedef std::chrono::high_resolution_clock grpc_time_source; +typedef std::chrono::time_point<grpc_time_source> grpc_time; + class Client { public: - explicit Client(const ClientConfig& config) : timer_(new Timer) { + explicit Client(const ClientConfig& config) + : timer_(new Timer), interarrival_timer_() { for (int i = 0; i < config.client_channels(); i++) { channels_.push_back(ClientChannelInfo( config.server_targets(i % config.server_targets_size()), config)); @@ -81,6 +102,7 @@ class Client { protected: SimpleRequest request_; + bool closed_loop_; class ClientChannelInfo { public: @@ -106,6 +128,61 @@ class Client { virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + void SetupLoadTest(const ClientConfig& config, size_t num_threads) { + // Set up the load distribution based on the number of threads + if (config.load_type() == CLOSED_LOOP) { + closed_loop_ = true; + } else { + closed_loop_ = false; + + std::unique_ptr<RandomDist> random_dist; + const auto& load = config.load_params(); + switch (config.load_type()) { + case POISSON: + random_dist.reset( + new ExpDist(load.poisson().offered_load() / num_threads)); + break; + case UNIFORM: + random_dist.reset( + new UniformDist(load.uniform().interarrival_lo() * num_threads, + load.uniform().interarrival_hi() * num_threads)); + break; + case DETERMINISTIC: + random_dist.reset( + new DetDist(num_threads / load.determ().offered_load())); + break; + case PARETO: + random_dist.reset( + new ParetoDist(load.pareto().interarrival_base() * num_threads, + load.pareto().alpha())); + break; + default: + GPR_ASSERT(false); + break; + } + + interarrival_timer_.init(*random_dist, num_threads); + for (size_t i = 0; i < num_threads; i++) { + next_time_.push_back( + grpc_time_source::now() + + std::chrono::duration_cast<grpc_time_source::duration>( + interarrival_timer_(i))); + } + } + } + + bool NextIssueTime(int thread_idx, grpc_time* time_delay) { + if (closed_loop_) { + return false; + } else { + *time_delay = next_time_[thread_idx]; + next_time_[thread_idx] += + std::chrono::duration_cast<grpc_time_source::duration>( + interarrival_timer_(thread_idx)); + return true; + } + } + private: class Thread { public: @@ -168,6 +245,9 @@ class Client { std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<Timer> timer_; + + InterarrivalTimer interarrival_timer_; + std::vector<grpc_time> next_time_; }; std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 00bbd8a8a0..1b7a8d26b2 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -32,8 +32,11 @@ */ #include <cassert> +#include <forward_list> #include <functional> +#include <list> #include <memory> +#include <mutex> #include <string> #include <thread> #include <vector> @@ -55,38 +58,55 @@ namespace grpc { namespace testing { +typedef std::list<grpc_time> deadline_list; + class ClientRpcContext { public: - ClientRpcContext() {} + explicit ClientRpcContext(int ch) : channel_id_(ch) {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; - virtual void StartNewClone() = 0; + virtual ClientRpcContext* StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } static ClientRpcContext* detag(void* t) { return reinterpret_cast<ClientRpcContext*>(t); } + + deadline_list::iterator deadline_posn() const { return deadline_posn_; } + void set_deadline_posn(const deadline_list::iterator& it) { + deadline_posn_ = it; + } + virtual void Start(CompletionQueue* cq) = 0; + int channel_id() const { return channel_id_; } + + protected: + int channel_id_; + + private: + deadline_list::iterator deadline_posn_; }; template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - TestService::Stub* stub, const RequestType& req, + int channel_id, TestService::Stub* stub, const RequestType& req, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&)> - start_req, + TestService::Stub*, grpc::ClientContext*, const RequestType&, + CompletionQueue*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : context_(), + : ClientRpcContext(channel_id), + context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextUnaryImpl::RespDone), callback_(on_done), - start_req_(start_req), - start_(Timer::Now()), - response_reader_(start_req(stub_, &context_, req_)) { + start_req_(start_req) {} + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + start_ = Timer::Now(); + response_reader_ = start_req_(stub_, &context_, req_, cq); response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); } ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} @@ -98,8 +118,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return ret; } - void StartNewClone() GRPC_OVERRIDE { - new ClientRpcContextUnaryImpl(stub_, req_, start_req_, callback_); + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_, + callback_); } private: @@ -109,7 +130,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { } bool DoCallBack(bool) { callback_(status_, &response_); - return false; + return true; // we're done, this'll be ignored } grpc::ClientContext context_; TestService::Stub* stub_; @@ -118,29 +139,54 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { bool (ClientRpcContextUnaryImpl::*next_state_)(bool); std::function<void(grpc::Status, ResponseType*)> callback_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_; + TestService::Stub*, grpc::ClientContext*, const RequestType&, + CompletionQueue*)> start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> response_reader_; }; +typedef std::forward_list<ClientRpcContext*> context_list; + class AsyncClient : public Client { public: - explicit AsyncClient(const ClientConfig& config, - std::function<void(CompletionQueue*, TestService::Stub*, - const SimpleRequest&)> setup_ctx) - : Client(config) { + explicit AsyncClient( + const ClientConfig& config, + std::function<ClientRpcContext*(int, TestService::Stub*, + const SimpleRequest&)> setup_ctx) + : Client(config), + channel_lock_(config.client_channels()), + contexts_(config.client_channels()), + max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), + channel_count_(config.client_channels()), + pref_channel_inc_(config.async_client_threads()) { + SetupLoadTest(config, config.async_client_threads()); + for (int i = 0; i < config.async_client_threads(); i++) { cli_cqs_.emplace_back(new CompletionQueue); + if (!closed_loop_) { + rpc_deadlines_.emplace_back(); + next_channel_.push_back(i % channel_count_); + issue_allowed_.push_back(true); + + grpc_time next_issue; + NextIssueTime(i, &next_issue); + next_issue_.push_back(next_issue); + } } + int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (auto channel = channels_.begin(); channel != channels_.end(); - channel++) { + for (int ch = 0; ch < channel_count_; ch++) { auto* cq = cli_cqs_[t].get(); t = (t + 1) % cli_cqs_.size(); - setup_ctx(cq, channel->get_stub(), request_); + auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_); + if (closed_loop_) { + ctx->Start(cq); + } else { + contexts_[ch].push_front(ctx); + } } } } @@ -159,30 +205,126 @@ class AsyncClient : public Client { size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - switch (cli_cqs_[thread_idx]->AsyncNext( - &got_tag, &ok, - std::chrono::system_clock::now() + std::chrono::seconds(1))) { + grpc_time deadline, short_deadline; + if (closed_loop_) { + deadline = grpc_time_source::now() + std::chrono::seconds(1); + short_deadline = deadline; + } else { + if (rpc_deadlines_[thread_idx].empty()) { + deadline = grpc_time_source::now() + std::chrono::seconds(1); + } else { + deadline = *(rpc_deadlines_[thread_idx].begin()); + } + short_deadline = + issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; + } + + bool got_event; + + switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { case CompletionQueue::SHUTDOWN: return false; case CompletionQueue::TIMEOUT: - return true; + got_event = false; + break; case CompletionQueue::GOT_EVENT: + got_event = true; + break; + default: + GPR_ASSERT(false); break; } - - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then delete it - ctx->RunNextState(ok, histogram); - ctx->StartNewClone(); - delete ctx; + if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) && + grpc_time_source::now() > deadline) { + // we have missed some 1-second deadline, which is worth noting + gpr_log(GPR_INFO, "Missed an RPC deadline"); + // Don't give up, as there might be some truly heavy tails + } + if (got_event) { + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (ctx->RunNextState(ok, histogram) == false) { + // call the callback and then clone the ctx + ctx->RunNextState(ok, histogram); + ClientRpcContext* clone_ctx = ctx->StartNewClone(); + if (closed_loop_) { + clone_ctx->Start(cli_cqs_[thread_idx].get()); + } else { + // Remove the entry from the rpc deadlines list + rpc_deadlines_[thread_idx].erase(ctx->deadline_posn()); + // Put the clone_ctx in the list of idle contexts for this channel + // Under lock + int ch = clone_ctx->channel_id(); + std::lock_guard<std::mutex> g(channel_lock_[ch]); + contexts_[ch].push_front(clone_ctx); + } + // delete the old version + delete ctx; + } + if (!closed_loop_) + issue_allowed_[thread_idx] = + true; // may be ok now even if it hadn't been + } + if (!closed_loop_ && issue_allowed_[thread_idx] && + grpc_time_source::now() >= next_issue_[thread_idx]) { + // Attempt to issue + bool issued = false; + for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; + num_attempts < channel_count_ && !issued; num_attempts++) { + bool can_issue = false; + ClientRpcContext* ctx = nullptr; + { + std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]); + if (!contexts_[channel_attempt].empty()) { + // Get an idle context from the front of the list + ctx = *(contexts_[channel_attempt].begin()); + contexts_[channel_attempt].pop_front(); + can_issue = true; + } + } + if (can_issue) { + // do the work to issue + rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() + + std::chrono::seconds(1)); + auto it = rpc_deadlines_[thread_idx].end(); + --it; + ctx->set_deadline_posn(it); + ctx->Start(cli_cqs_[thread_idx].get()); + issued = true; + // If we did issue, then next time, try our thread's next + // preferred channel + next_channel_[thread_idx] += pref_channel_inc_; + if (next_channel_[thread_idx] >= channel_count_) + next_channel_[thread_idx] = (thread_idx % channel_count_); + } else { + // Do a modular increment of channel attempt if we couldn't issue + channel_attempt = (channel_attempt + 1) % channel_count_; + } + } + if (issued) { + // We issued one; see when we can issue the next + grpc_time next_issue; + NextIssueTime(thread_idx, &next_issue); + next_issue_[thread_idx] = next_issue; + } else { + issue_allowed_[thread_idx] = false; + } } - return true; } private: std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; + + std::vector<deadline_list> rpc_deadlines_; // per thread deadlines + std::vector<int> next_channel_; // per thread round-robin channel ctr + std::vector<bool> issue_allowed_; // may this thread attempt to issue + std::vector<grpc_time> next_issue_; // when should it issue? + + std::vector<std::mutex> channel_lock_; + std::vector<context_list> contexts_; // per-channel list of idle contexts + int max_outstanding_per_channel_; + int channel_count_; + int pref_channel_inc_; }; class AsyncUnaryClient GRPC_FINAL : public AsyncClient { @@ -194,15 +336,15 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, - const SimpleRequest& req) { + static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, - const SimpleRequest& request) { + auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, + const SimpleRequest& request, CompletionQueue* cq) { return stub->AsyncUnaryCall(ctx, request, cq); }; - new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - stub, req, start_req, check_done); + return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + channel_id, stub, req, start_req, check_done); } }; @@ -210,26 +352,30 @@ template <class RequestType, class ResponseType> class ClientRpcContextStreamingImpl : public ClientRpcContext { public: ClientRpcContextStreamingImpl( - TestService::Stub* stub, const RequestType& req, - std::function<std::unique_ptr< - grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - TestService::Stub*, grpc::ClientContext*, void*)> start_req, + int channel_id, TestService::Stub* stub, const RequestType& req, + std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter< + RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*, + CompletionQueue*, void*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : context_(), + : ClientRpcContext(channel_id), + context_(), stub_(stub), req_(req), response_(), next_state_(&ClientRpcContextStreamingImpl::ReqSent), callback_(on_done), start_req_(start_req), - start_(Timer::Now()), - stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {} + start_(Timer::Now()) {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { return (this->*next_state_)(ok, hist); } - void StartNewClone() GRPC_OVERRIDE { - new ClientRpcContextStreamingImpl(stub_, req_, start_req_, callback_); + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_, + start_req_, callback_); + } + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); } private: @@ -263,7 +409,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function<void(grpc::Status, ResponseType*)> callback_; std::function< std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - TestService::Stub*, grpc::ClientContext*, void*)> start_req_; + TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> @@ -274,22 +421,25 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx) { + // async streaming currently only supported closed loop + GPR_ASSERT(config.load_type() == CLOSED_LOOP); + StartThreads(config.async_client_threads()); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } private: - static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub, - const SimpleRequest& req) { + static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + const SimpleRequest& req) { auto check_done = [](grpc::Status s, SimpleResponse* response) {}; - auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, - void* tag) { + auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx, + CompletionQueue* cq, void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - stub, req, start_req, check_done); + return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + channel_id, stub, req, start_req, check_done); } }; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index c28dc91321..718698bfe1 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -32,6 +32,7 @@ */ #include <cassert> +#include <chrono> #include <memory> #include <mutex> #include <string> @@ -57,6 +58,7 @@ #include "test/cpp/qps/client.h" #include "test/cpp/qps/qpstest.grpc.pb.h" #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" namespace grpc { @@ -68,11 +70,19 @@ class SynchronousClient : public Client { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); + SetupLoadTest(config, num_threads_); } virtual ~SynchronousClient(){}; protected: + void WaitToIssue(int thread_idx) { + grpc_time next_time; + if (NextIssueTime(thread_idx, &next_time)) { + std::this_thread::sleep_until(next_time); + } + } + size_t num_threads_; std::vector<SimpleResponse> responses_; }; @@ -86,13 +96,14 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { ~SynchronousUnaryClient() { EndThreads(); } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); histogram->Add((Timer::Now() - start) * 1e9); - return s.IsOk(); + return s.ok(); } }; @@ -113,12 +124,13 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { for (auto stream = stream_.begin(); stream != stream_.end(); stream++) { if (*stream) { (*stream)->WritesDone(); - EXPECT_TRUE((*stream)->Finish().IsOk()); + EXPECT_TRUE((*stream)->Finish().ok()); } } } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + WaitToIssue(thread_idx); double start = Timer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index bf12730f97..c8cc11e6ab 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -241,11 +241,11 @@ std::unique_ptr<ScenarioResult> RunScenario( for (auto client = clients.begin(); client != clients.end(); client++) { GPR_ASSERT(client->stream->WritesDone()); - GPR_ASSERT(client->stream->Finish().IsOk()); + GPR_ASSERT(client->stream->Finish().ok()); } for (auto server = servers.begin(); server != servers.end(); server++) { GPR_ASSERT(server->stream->WritesDone()); - GPR_ASSERT(server->stream->Finish().IsOk()); + GPR_ASSERT(server->stream->Finish().ok()); } return result; } diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h new file mode 100644 index 0000000000..f90a17a894 --- /dev/null +++ b/test/cpp/qps/interarrival.h @@ -0,0 +1,178 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_INTERARRIVAL_H +#define TEST_QPS_INTERARRIVAL_H + +#include <chrono> +#include <cmath> +#include <random> + +#include <grpc++/config.h> + +namespace grpc { +namespace testing { + +// First create classes that define a random distribution +// Note that this code does not include C++-specific random distribution +// features supported in std::random. Although this would make this code easier, +// this code is required to serve as the template code for other language +// stacks. Thus, this code only uses a uniform distribution of doubles [0,1) +// and then provides the distribution functions itself. + +class RandomDist { + public: + RandomDist() {} + virtual ~RandomDist() = 0; + // Argument to operator() is a uniform double in the range [0,1) + virtual double operator()(double uni) const = 0; +}; + +inline RandomDist::~RandomDist() {} + +// ExpDist implements an exponential distribution, which is the +// interarrival distribution for a Poisson process. The parameter +// lambda is the mean rate of arrivals. This is the +// most useful distribution since it is actually additive and +// memoryless. It is a good representation of activity coming in from +// independent identical stationary sources. For more information, +// see http://en.wikipedia.org/wiki/Exponential_distribution + +class ExpDist GRPC_FINAL : public RandomDist { + public: + explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {} + ~ExpDist() GRPC_OVERRIDE {} + double operator()(double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid NaN if uni is 0 + return lambda_recip_ * (-log(1.0 - uni)); + } + + private: + double lambda_recip_; +}; + +// UniformDist implements a random distribution that has +// interarrival time uniformly spread between [lo,hi). The +// mean interarrival time is (lo+hi)/2. For more information, +// see http://en.wikipedia.org/wiki/Uniform_distribution_%28continuous%29 + +class UniformDist GRPC_FINAL : public RandomDist { + public: + UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {} + ~UniformDist() GRPC_OVERRIDE {} + double operator()(double uni) const GRPC_OVERRIDE { + return uni * range_ + lo_; + } + + private: + double lo_; + double range_; +}; + +// DetDist provides a random distribution with interarrival time +// of val. Note that this is not additive, so using this on multiple +// flows of control (threads within the same client or separate +// clients) will not preserve any deterministic interarrival gap across +// requests. + +class DetDist GRPC_FINAL : public RandomDist { + public: + explicit DetDist(double val) : val_(val) {} + ~DetDist() GRPC_OVERRIDE {} + double operator()(double uni) const GRPC_OVERRIDE { return val_; } + + private: + double val_; +}; + +// ParetoDist provides a random distribution with interarrival time +// spread according to a Pareto (heavy-tailed) distribution. In this +// model, many interarrival times are close to the base, but a sufficient +// number will be high (up to infinity) as to disturb the mean. It is a +// good representation of the response times of data center jobs. See +// http://en.wikipedia.org/wiki/Pareto_distribution + +class ParetoDist GRPC_FINAL : public RandomDist { + public: + ParetoDist(double base, double alpha) + : base_(base), alpha_recip_(1.0 / alpha) {} + ~ParetoDist() GRPC_OVERRIDE {} + double operator()(double uni) const GRPC_OVERRIDE { + // Note: Use 1.0-uni above to avoid div by zero if uni is 0 + return base_ / pow(1.0 - uni, alpha_recip_); + } + + private: + double base_; + double alpha_recip_; +}; + +// A class library for generating pseudo-random interarrival times +// in an efficient re-entrant way. The random table is built at construction +// time, and each call must include the thread id of the invoker + +typedef std::default_random_engine qps_random_engine; + +class InterarrivalTimer { + public: + InterarrivalTimer() {} + void init(const RandomDist& r, int threads, int entries = 1000000) { + qps_random_engine gen; + std::uniform_real_distribution<double> uniform(0.0, 1.0); + for (int i = 0; i < entries; i++) { + random_table_.push_back(std::chrono::nanoseconds( + static_cast<int64_t>(1e9 * r(uniform(gen))))); + } + // Now set up the thread positions + for (int i = 0; i < threads; i++) { + thread_posns_.push_back(random_table_.begin() + (entries * i) / threads); + } + } + virtual ~InterarrivalTimer(){}; + + std::chrono::nanoseconds operator()(int thread_num) { + auto ret = *(thread_posns_[thread_num]++); + if (thread_posns_[thread_num] == random_table_.end()) + thread_posns_[thread_num] = random_table_.begin(); + return ret; + } + + private: + typedef std::vector<std::chrono::nanoseconds> time_table; + std::vector<time_table::const_iterator> thread_posns_; + time_table random_table_; +}; +} +} + +#endif diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 281e2e8119..d534846365 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -63,11 +63,15 @@ DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(payload_size, 1, "Payload size"); DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); +DEFINE_string(load_type, "CLOSED_LOOP", "Load type"); +DEFINE_double(load_param_1, 0.0, "Load parameter 1"); +DEFINE_double(load_param_2, 0.0, "Load parameter 2"); using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; using grpc::testing::ClientType; using grpc::testing::ServerType; +using grpc::testing::LoadType; using grpc::testing::RpcType; using grpc::testing::ResourceUsage; @@ -80,11 +84,14 @@ static void QpsDriver() { ClientType client_type; ServerType server_type; + LoadType load_type; GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type)); GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type)); + GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type)); ClientConfig client_config; client_config.set_client_type(client_type); + client_config.set_load_type(load_type); client_config.set_enable_ssl(FLAGS_enable_ssl); client_config.set_outstanding_rpcs_per_channel( FLAGS_outstanding_rpcs_per_channel); @@ -93,6 +100,43 @@ static void QpsDriver() { client_config.set_async_client_threads(FLAGS_async_client_threads); client_config.set_rpc_type(rpc_type); + // set up the load parameters + switch (load_type) { + case grpc::testing::CLOSED_LOOP: + break; + case grpc::testing::POISSON: { + auto poisson = client_config.mutable_load_params()->mutable_poisson(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + poisson->set_offered_load(FLAGS_load_param_1); + break; + } + case grpc::testing::UNIFORM: { + auto uniform = client_config.mutable_load_params()->mutable_uniform(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + GPR_ASSERT(FLAGS_load_param_2 != 0.0); + uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6); + uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6); + break; + } + case grpc::testing::DETERMINISTIC: { + auto determ = client_config.mutable_load_params()->mutable_determ(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + determ->set_offered_load(FLAGS_load_param_1); + break; + } + case grpc::testing::PARETO: { + auto pareto = client_config.mutable_load_params()->mutable_pareto(); + GPR_ASSERT(FLAGS_load_param_1 != 0.0); + GPR_ASSERT(FLAGS_load_param_2 != 0.0); + pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6); + pareto->set_alpha(FLAGS_load_param_2); + break; + } + default: + GPR_ASSERT(false); + break; + } + ServerConfig server_config; server_config.set_server_type(server_type); server_config.set_threads(FLAGS_server_threads); @@ -112,7 +156,7 @@ static void QpsDriver() { FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers); GetReporter()->ReportQPS(*result); - GetReporter()->ReportQPSPerCore(*result, server_config); + GetReporter()->ReportQPSPerCore(*result); GetReporter()->ReportLatency(*result); GetReporter()->ReportTimes(*result); } diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc new file mode 100644 index 0000000000..cecd1be03f --- /dev/null +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/interarrival.h" +#include <chrono> +#include <iostream> + +// Use the C histogram rather than C++ to avoid depending on proto +#include <grpc/support/histogram.h> +#include <grpc++/config.h> + +using grpc::testing::RandomDist; +using grpc::testing::InterarrivalTimer; + +void RunTest(RandomDist&& r, int threads, std::string title) { + InterarrivalTimer timer; + timer.init(r, threads); + gpr_histogram *h(gpr_histogram_create(0.01, 60e9)); + + for (int i = 0; i < 10000000; i++) { + for (int j = 0; j < threads; j++) { + gpr_histogram_add(h, timer(j).count()); + } + } + + std::cout << title << " Distribution" << std::endl; + std::cout << "Value, Percentile" << std::endl; + for (double pct = 0.0; pct < 100.0; pct += 1.0) { + std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl; + } + + gpr_histogram_destroy(h); +} + +using grpc::testing::ExpDist; +using grpc::testing::DetDist; +using grpc::testing::UniformDist; +using grpc::testing::ParetoDist; + +int main(int argc, char **argv) { + RunTest(ExpDist(10.0), 5, std::string("Exponential(10)")); + RunTest(DetDist(5.0), 5, std::string("Det(5)")); + RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(1,10)")); + RunTest(ParetoDist(1.0, 1.0), 5, std::string("Pareto(1,1)")); + return 0; +} diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc index 63a37ae07e..07b4834cc0 100644 --- a/test/cpp/qps/qps_test.cc +++ b/test/cpp/qps/qps_test.cc @@ -67,7 +67,7 @@ static void RunQPS() { const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); - GetReporter()->ReportQPSPerCore(*result, server_config); + GetReporter()->ReportQPSPerCore(*result); GetReporter()->ReportLatency(*result); } diff --git a/test/cpp/qps/qps_test_openloop.cc b/test/cpp/qps/qps_test_openloop.cc new file mode 100644 index 0000000000..52873b2987 --- /dev/null +++ b/test/cpp/qps/qps_test_openloop.cc @@ -0,0 +1,87 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include <signal.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 10; + +static void RunQPS() { + gpr_log(GPR_INFO, "Running QPS test, open-loop"); + + ClientConfig client_config; + client_config.set_client_type(ASYNC_CLIENT); + client_config.set_enable_ssl(false); + client_config.set_outstanding_rpcs_per_channel(1000); + client_config.set_client_channels(8); + client_config.set_payload_size(1); + client_config.set_async_client_threads(8); + client_config.set_rpc_type(UNARY); + client_config.set_load_type(POISSON); + client_config.mutable_load_params()-> + mutable_poisson()->set_offered_load(10000.0); + + ServerConfig server_config; + server_config.set_server_type(ASYNC_SERVER); + server_config.set_enable_ssl(false); + server_config.set_threads(4); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPSPerCore(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + signal(SIGPIPE, SIG_IGN); + grpc::testing::RunQPS(); + + return 0; +} diff --git a/test/cpp/qps/qps_test_with_poll.cc b/test/cpp/qps/qps_test_with_poll.cc new file mode 100644 index 0000000000..90a8da8d11 --- /dev/null +++ b/test/cpp/qps/qps_test_with_poll.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include <signal.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +extern "C" { +#include "src/core/iomgr/pollset_posix.h" +} + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 5; + +static void RunQPS() { + gpr_log(GPR_INFO, "Running QPS test"); + + ClientConfig client_config; + client_config.set_client_type(ASYNC_CLIENT); + client_config.set_enable_ssl(false); + client_config.set_outstanding_rpcs_per_channel(1000); + client_config.set_client_channels(8); + client_config.set_payload_size(1); + client_config.set_async_client_threads(8); + client_config.set_rpc_type(UNARY); + + ServerConfig server_config; + server_config.set_server_type(ASYNC_SERVER); + server_config.set_enable_ssl(false); + server_config.set_threads(4); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPSPerCore(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + grpc_platform_become_multipoller = grpc_poll_become_multipoller; + + signal(SIGPIPE, SIG_IGN); + grpc::testing::RunQPS(); + + return 0; +} diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index fb49271991..423275ee85 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -71,6 +71,8 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) { return (config.rpc_type() == RpcType::UNARY) ? CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config); + default: + abort(); } abort(); } @@ -82,6 +84,8 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config, return CreateSynchronousServer(config, server_port); case ServerType::ASYNC_SERVER: return CreateAsyncServer(config, server_port); + default: + abort(); } abort(); } @@ -96,7 +100,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); + return Status(StatusCode::RESOURCE_EXHAUSTED, ""); } grpc_profiler_start("qps_client.prof"); @@ -110,7 +114,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); + return Status(StatusCode::RESOURCE_EXHAUSTED, ""); } grpc_profiler_start("qps_server.prof"); @@ -155,22 +159,22 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { ServerReaderWriter<ClientStatus, ClientArgs>* stream) { ClientArgs args; if (!stream->Read(&args)) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } if (!args.has_setup()) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } auto client = CreateClient(args.setup()); if (!client) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } ClientStatus status; if (!stream->Write(status)) { - return Status(UNKNOWN); + return Status(StatusCode::UNKNOWN, ""); } while (stream->Read(&args)) { if (!args.has_mark()) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = client->Mark(); stream->Write(status); @@ -183,23 +187,23 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { ServerReaderWriter<ServerStatus, ServerArgs>* stream) { ServerArgs args; if (!stream->Read(&args)) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } if (!args.has_setup()) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } auto server = CreateServer(args.setup(), server_port_); if (!server) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } ServerStatus status; status.set_port(server_port_); if (!stream->Write(status)) { - return Status(UNKNOWN); + return Status(StatusCode::UNKNOWN, ""); } while (stream->Read(&args)) { if (!args.has_mark()) { - return Status(INVALID_ARGUMENT); + return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = server->Mark(); stream->Write(status); diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto index 122a7df1ac..ef1f9451e9 100644 --- a/test/cpp/qps/qpstest.proto +++ b/test/cpp/qps/qpstest.proto @@ -30,83 +30,121 @@ // An integration test service that covers all the method signature permutations // of unary/streaming requests/responses. -syntax = "proto2"; +syntax = "proto3"; package grpc.testing; enum PayloadType { // Compressable text format. - COMPRESSABLE= 1; + COMPRESSABLE = 0; // Uncompressable binary format. - UNCOMPRESSABLE = 2; + UNCOMPRESSABLE = 1; // Randomly chosen from all other formats defined in this enum. - RANDOM = 3; + RANDOM = 2; } message StatsRequest { // run number - optional int32 test_num = 1; + int32 test_num = 1; } message ServerStats { // wall clock time - required double time_elapsed = 1; + double time_elapsed = 1; // user time used by the server process and threads - required double time_user = 2; + double time_user = 2; // server time used by the server process and all threads - required double time_system = 3; + double time_system = 3; } message Payload { // The type of data in body. - optional PayloadType type = 1; + PayloadType type = 1; // Primary contents of payload. - optional bytes body = 2; + bytes body = 2; } message HistogramData { repeated uint32 bucket = 1; - required double min_seen = 2; - required double max_seen = 3; - required double sum = 4; - required double sum_of_squares = 5; - required double count = 6; + double min_seen = 2; + double max_seen = 3; + double sum = 4; + double sum_of_squares = 5; + double count = 6; } enum ClientType { - SYNCHRONOUS_CLIENT = 1; - ASYNC_CLIENT = 2; + SYNCHRONOUS_CLIENT = 0; + ASYNC_CLIENT = 1; } enum ServerType { - SYNCHRONOUS_SERVER = 1; - ASYNC_SERVER = 2; + SYNCHRONOUS_SERVER = 0; + ASYNC_SERVER = 1; } enum RpcType { - UNARY = 1; - STREAMING = 2; + UNARY = 0; + STREAMING = 1; +} + +enum LoadType { + CLOSED_LOOP = 0; + POISSON = 1; + UNIFORM = 2; + DETERMINISTIC = 3; + PARETO = 4; +} + +message PoissonParams { + double offered_load = 1; +} + +message UniformParams { + double interarrival_lo = 1; + double interarrival_hi = 2; +} + +message DeterministicParams { + double offered_load = 1; +} + +message ParetoParams { + double interarrival_base = 1; + double alpha = 2; +} + +message LoadParams { + oneof load { + PoissonParams poisson = 1; + UniformParams uniform = 2; + DeterministicParams determ = 3; + ParetoParams pareto = 4; + }; } message ClientConfig { repeated string server_targets = 1; - required ClientType client_type = 2; - optional bool enable_ssl = 3 [default=false]; - required int32 outstanding_rpcs_per_channel = 4; - required int32 client_channels = 5; - required int32 payload_size = 6; + ClientType client_type = 2; + bool enable_ssl = 3; + int32 outstanding_rpcs_per_channel = 4; + int32 client_channels = 5; + int32 payload_size = 6; // only for async client: - optional int32 async_client_threads = 7; - optional RpcType rpc_type = 8 [default=UNARY]; - optional string host = 9; + int32 async_client_threads = 7; + RpcType rpc_type = 8; + string host = 9; + LoadType load_type = 10; + LoadParams load_params = 11; } // Request current stats -message Mark {} +message Mark { +} message ClientArgs { oneof argtype { @@ -116,21 +154,21 @@ message ClientArgs { } message ClientStats { - required HistogramData latencies = 1; - required double time_elapsed = 3; - required double time_user = 4; - required double time_system = 5; + HistogramData latencies = 1; + double time_elapsed = 2; + double time_user = 3; + double time_system = 4; } message ClientStatus { - optional ClientStats stats = 1; + ClientStats stats = 1; } message ServerConfig { - required ServerType server_type = 1; - optional int32 threads = 2 [default=1]; - optional bool enable_ssl = 3 [default=false]; - optional string host = 4; + ServerType server_type = 1; + int32 threads = 2; + bool enable_ssl = 3; + string host = 4; } message ServerArgs { @@ -141,25 +179,25 @@ message ServerArgs { } message ServerStatus { - optional ServerStats stats = 1; - required int32 port = 2; + ServerStats stats = 1; + int32 port = 2; } message SimpleRequest { // Desired payload type in the response from the server. // If response_type is RANDOM, server randomly chooses one from other formats. - optional PayloadType response_type = 1 [default=COMPRESSABLE]; + PayloadType response_type = 1; // Desired payload size in the response from the server. // If response_type is COMPRESSABLE, this denotes the size before compression. - optional int32 response_size = 2 [default=0]; + int32 response_size = 2; // Optional input payload sent along with the request. - optional Payload payload = 3; + Payload payload = 3; } message SimpleResponse { - optional Payload payload = 1; + Payload payload = 1; } service TestService { diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index e116175e3b..678ea080d1 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -49,10 +49,9 @@ void CompositeReporter::ReportQPS(const ScenarioResult& result) const { } } -void CompositeReporter::ReportQPSPerCore(const ScenarioResult& result, - const ServerConfig& config) const { +void CompositeReporter::ReportQPSPerCore(const ScenarioResult& result) const { for (size_t i = 0; i < reporters_.size(); ++i) { - reporters_[i]->ReportQPSPerCore(result, config); + reporters_[i]->ReportQPSPerCore(result); } } @@ -76,15 +75,14 @@ void GprLogReporter::ReportQPS(const ScenarioResult& result) const { [](ResourceUsage u) { return u.wall_time; })); } -void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result, - const ServerConfig& server_config) const { +void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) const { auto qps = result.latencies.Count() / average(result.client_resources, [](ResourceUsage u) { return u.wall_time; }); gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, - qps / server_config.threads()); + qps / result.server_config.threads()); } void GprLogReporter::ReportLatency(const ScenarioResult& result) const { diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h index 630275ecda..0cce08816a 100644 --- a/test/cpp/qps/report.h +++ b/test/cpp/qps/report.h @@ -62,8 +62,7 @@ class Reporter { virtual void ReportQPS(const ScenarioResult& result) const = 0; /** Reports QPS per core as (YYY/server core). */ - virtual void ReportQPSPerCore(const ScenarioResult& result, - const ServerConfig& config) const = 0; + virtual void ReportQPSPerCore(const ScenarioResult& result) const = 0; /** Reports latencies for the 50, 90, 95, 99 and 99.9 percentiles, in ms. */ virtual void ReportLatency(const ScenarioResult& result) const = 0; @@ -84,8 +83,7 @@ class CompositeReporter : public Reporter { void add(std::unique_ptr<Reporter> reporter); void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportQPSPerCore(const ScenarioResult& result, - const ServerConfig& config) const GRPC_OVERRIDE; + void ReportQPSPerCore(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE; @@ -100,8 +98,7 @@ class GprLogReporter : public Reporter { private: void ReportQPS(const ScenarioResult& result) const GRPC_OVERRIDE; - void ReportQPSPerCore(const ScenarioResult& result, - const ServerConfig& config) const GRPC_OVERRIDE; + void ReportQPSPerCore(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportLatency(const ScenarioResult& result) const GRPC_OVERRIDE; void ReportTimes(const ScenarioResult& result) const GRPC_OVERRIDE; }; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 977dfc2372..210aef4fd6 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -73,38 +73,43 @@ class AsyncQpsServerTest : public Server { gpr_free(server_address); builder.RegisterAsyncService(&async_service_); - srv_cq_ = builder.AddCompletionQueue(); + for (int i = 0; i < config.threads(); i++) { + srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue())); + } server_ = builder.BuildAndStart(); using namespace std::placeholders; - request_unary_ = - std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, - _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4); - request_streaming_ = - std::bind(&TestService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3); - for (int i = 0; i < 100; i++) { - contexts_.push_front( - new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - request_unary_, ProcessRPC)); - contexts_.push_front( - new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - request_streaming_, ProcessRPC)); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < config.threads(); j++) { + auto request_unary = std::bind( + &TestService::AsyncService::RequestUnaryCall, &async_service_, _1, + _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + auto request_streaming = std::bind( + &TestService::AsyncService::RequestStreamingCall, &async_service_, + _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front( + new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( + request_unary, ProcessRPC)); + contexts_.push_front( + new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( + request_streaming, ProcessRPC)); + } } for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cq_->Next(&got_tag, &ok)) { + while (srv_cqs_[i]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke bool still_going = ctx->RunNextState(ok); - std::lock_guard<std::mutex> g(shutdown_mutex_); + std::unique_lock<std::mutex> g(shutdown_mutex_); if (!shutdown_) { // this RPC context is done, so refresh it if (!still_going) { + g.unlock(); ctx->Reset(); } } else { @@ -124,11 +129,13 @@ class AsyncQpsServerTest : public Server { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } - srv_cq_->Shutdown(); - bool ok; - void *got_tag; - while (srv_cq_->Next(&got_tag, &ok)) - ; + for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { + (*cq)->Shutdown(); + bool ok; + void *got_tag; + while ((*cq)->Next(&got_tag, &ok)) + ; + } while (!contexts_.empty()) { delete contexts_.front(); contexts_.pop_front(); @@ -305,15 +312,8 @@ class AsyncQpsServerTest : public Server { } std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; - std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_; + std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; TestService::AsyncService async_service_; - std::function<void(ServerContext *, SimpleRequest *, - grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> - request_unary_; - std::function<void( - ServerContext *, - grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)> - request_streaming_; std::forward_list<ServerRpcContext *> contexts_; std::mutex shutdown_mutex_; diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc index eb67b8d314..fa952e8349 100644 --- a/test/cpp/util/cli_call.cc +++ b/test/cpp/util/cli_call.cc @@ -86,7 +86,7 @@ void CliCall::Call(std::shared_ptr<grpc::ChannelInterface> channel, cq.Next(&got_tag, &ok); GPR_ASSERT(ok); - if (status.IsOk()) { + if (status.ok()) { std::cout << "RPC finished with OK status." << std::endl; std::vector<grpc::Slice> slices; recv_buffer.Dump(&slices); @@ -97,8 +97,8 @@ void CliCall::Call(std::shared_ptr<grpc::ChannelInterface> channel, slices[i].size()); } } else { - std::cout << "RPC finished with status code " << status.code() - << " details: " << status.details() << std::endl; + std::cout << "RPC finished with status code " << status.error_code() + << " details: " << status.error_message() << std::endl; } } diff --git a/test/cpp/util/cli_call_test.cc b/test/cpp/util/cli_call_test.cc index 457a5e77de..12539c7479 100644 --- a/test/cpp/util/cli_call_test.cc +++ b/test/cpp/util/cli_call_test.cc @@ -108,7 +108,7 @@ TEST_F(CliCallTest, SimpleRpc) { ClientContext context; Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.IsOk()); + EXPECT_TRUE(s.ok()); const grpc::string kMethod("/grpc.cpp.test.util.TestService/Echo"); grpc::string request_bin, response_bin, expected_response_bin; |