From ed96983e22a32abd533733f3cc04f6f3b1e2ef47 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Mon, 3 Aug 2015 14:54:44 -0700 Subject: Move Python protoc plugin tests to grpcio_test --- .../grpcio_test/grpc_protoc_plugin/__init__.py | 30 ++ .../grpc_protoc_plugin/python_plugin_test.py | 541 +++++++++++++++++++++ .../grpcio_test/grpc_protoc_plugin/test.proto | 139 ++++++ src/python/grpcio_test/setup.py | 11 +- 4 files changed, 718 insertions(+), 3 deletions(-) create mode 100644 src/python/grpcio_test/grpc_protoc_plugin/__init__.py create mode 100644 src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py create mode 100644 src/python/grpcio_test/grpc_protoc_plugin/test.proto (limited to 'src/python') diff --git a/src/python/grpcio_test/grpc_protoc_plugin/__init__.py b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py new file mode 100644 index 0000000000..b200d129a9 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py @@ -0,0 +1,541 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse +import contextlib +import distutils.spawn +import errno +import itertools +import os +import pkg_resources +import shutil +import subprocess +import sys +import tempfile +import threading +import time +import unittest + +from grpc.framework.alpha import exceptions +from grpc.framework.foundation import future + +# Identifiers of entities we expect to find in the generated module. +SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer' +SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer' +STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' +SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' +STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' + +# The timeout used in tests of RPCs that are supposed to expire. +SHORT_TIMEOUT = 2 +# The timeout used in tests of RPCs that are not supposed to expire. The +# absurdly large value doesn't matter since no passing execution of this test +# module will ever wait the duration. +LONG_TIMEOUT = 600 +NO_DELAY = 0 + + +class _ServicerMethods(object): + + def __init__(self, test_pb2, delay): + self._condition = threading.Condition() + self._delay = delay + self._paused = False + self._fail = False + self._test_pb2 = test_pb2 + + @contextlib.contextmanager + def pause(self): # pylint: disable=invalid-name + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): # pylint: disable=invalid-name + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False + + def _control(self): # pylint: disable=invalid-name + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() + time.sleep(self._delay) + + def UnaryCall(self, request, unused_rpc_context): + response = self._test_pb2.SimpleResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * request.response_size + self._control() + return response + + def StreamingOutputCall(self, request, unused_rpc_context): + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def StreamingInputCall(self, request_iter, unused_rpc_context): + response = self._test_pb2.StreamingInputCallResponse() + aggregated_payload_size = 0 + for request in request_iter: + aggregated_payload_size += len(request.payload.payload_compressable) + response.aggregated_payload_size = aggregated_payload_size + self._control() + return response + + def FullDuplexCall(self, request_iter, unused_rpc_context): + for request in request_iter: + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def HalfDuplexCall(self, request_iter, unused_rpc_context): + responses = [] + for request in request_iter: + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + responses.append(response) + for response in responses: + yield response + + +@contextlib.contextmanager +def _CreateService(test_pb2, delay): + """Provides a servicer backend and a stub. + + The servicer is just the implementation + of the actual servicer passed to the face player of the python RPC + implementation; the two are detached. + + Non-zero delay puts a delay on each call to the servicer, representative of + communication latency. Timeout is the default timeout for the stub while + waiting for the service. + + Args: + test_pb2: The test_pb2 module generated by this test. + delay: Delay in seconds per response from the servicer. + + Yields: + A (servicer_methods, servicer, stub) three-tuple where servicer_methods is + the back-end of the service bound to the stub and the server and stub + are both activated and ready for use. + """ + servicer_methods = _ServicerMethods(test_pb2, delay) + + class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)): + + def UnaryCall(self, request, context): + return servicer_methods.UnaryCall(request, context) + + def StreamingOutputCall(self, request, context): + return servicer_methods.StreamingOutputCall(request, context) + + def StreamingInputCall(self, request_iter, context): + return servicer_methods.StreamingInputCall(request_iter, context) + + def FullDuplexCall(self, request_iter, context): + return servicer_methods.FullDuplexCall(request_iter, context) + + def HalfDuplexCall(self, request_iter, context): + return servicer_methods.HalfDuplexCall(request_iter, context) + + servicer = Servicer() + server = getattr( + test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0) + with server: + port = server.port() + stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port) + with stub: + yield servicer_methods, stub, server + + +def _streaming_input_request_iterator(test_pb2): + for _ in range(3): + request = test_pb2.StreamingInputCallRequest() + request.payload.payload_type = test_pb2.COMPRESSABLE + request.payload.payload_compressable = 'a' + yield request + + +def _streaming_output_request(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + sizes = [1, 2, 3] + request.response_parameters.add(size=sizes[0], interval_us=0) + request.response_parameters.add(size=sizes[1], interval_us=0) + request.response_parameters.add(size=sizes[2], interval_us=0) + return request + + +def _full_duplex_request_iterator(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + + +class PythonPluginTest(unittest.TestCase): + """Test case for the gRPC Python protoc-plugin. + + While reading these tests, remember that the futures API + (`stub.method.async()`) only gives futures for the *non-streaming* responses, + else it behaves like its blocking cousin. + """ + + def setUp(self): + # Assume that the appropriate protoc and grpc_python_plugins are on the + # path. + protoc_command = 'protoc' + protoc_plugin_filename = distutils.spawn.find_executable( + 'grpc_python_plugin') + test_proto_filename = pkg_resources.resource_filename( + 'grpc_protoc_plugin', 'test.proto') + if not os.path.isfile(protoc_command): + # Assume that if we haven't built protoc that it's on the system. + protoc_command = 'protoc' + + # Ensure that the output directory exists. + self.outdir = tempfile.mkdtemp() + + # Invoke protoc with the plugin. + cmd = [ + protoc_command, + '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename, + '-I .', + '--python_out=%s' % self.outdir, + '--python-grpc_out=%s' % self.outdir, + os.path.basename(test_proto_filename), + ] + subprocess.check_call(' '.join(cmd), shell=True, env=os.environ, + cwd=os.path.dirname(test_proto_filename)) + sys.path.append(self.outdir) + + def tearDown(self): + try: + shutil.rmtree(self.outdir) + except OSError as exc: + if exc.errno != errno.ENOENT: + raise + + # TODO(atash): Figure out which of these tests is hanging flakily with small + # probability. + + def testImportAttributes(self): + # check that we can access the generated module and its members. + import test_pb2 # pylint: disable=g-import-not-at-top + self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None)) + + def testUpDown(self): + import test_pb2 + with _CreateService( + test_pb2, NO_DELAY) as (servicer, stub, unused_server): + request = test_pb2.SimpleRequest(response_size=13) + + def testUnaryCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. + request = test_pb2.SimpleRequest(response_size=13) + response = stub.UnaryCall(request, timeout) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testUnaryCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + # Check that the call does not block waiting for the server to respond. + with methods.pause(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) + response = response_future.result() + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testUnaryCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + request = test_pb2.SimpleRequest(response_size=13) + with methods.pause(): + response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testUnaryCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.UnaryCall.async(request, 1) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + + def testUnaryCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) + self.assertIsNotNone(response_future.exception()) + + def testStreamingOutputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) + expected_responses = methods.StreamingOutputCall( + request, 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingOutputCallExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingOutputCallCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + unused_methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this times out ' + 'instead of raising the proper error.') + def testStreamingOutputCallFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.StreamingOutputCall(request, 1) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingInputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + response = stub.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + response = response_future.result() + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + + def testStreamingInputCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), timeout) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + with self.assertRaises(future.CancelledError): + response_future.result() + + def testStreamingInputCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) + self.assertIsNotNone(response_future.exception()) + + def testFullDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) + expected_responses = methods.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testFullDuplexCallExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testFullDuplexCallCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + request_iterator = _full_duplex_request_iterator(test_pb2) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever ' + 'and fix.') + def testFullDuplexCallFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testHalfDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + def half_duplex_request_iterator(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), LONG_TIMEOUT) + expected_responses = methods.HalfDuplexCall( + half_duplex_request_iterator(), 'not a real RpcContext!') + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testHalfDuplexCallWedged(self): + import test_pb2 # pylint: disable=g-import-not-at-top + condition = threading.Condition() + wait_cell = [False] + @contextlib.contextmanager + def wait(): # pylint: disable=invalid-name + # Where's Python 3's 'nonlocal' statement when you need it? + with condition: + wait_cell[0] = True + yield + with condition: + wait_cell[0] = False + condition.notify_all() + def half_duplex_request_iterator(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + with condition: + while wait_cell[0]: + condition.wait() + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + with wait(): + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), SHORT_TIMEOUT) + # half-duplex waits for the client to send all info + with self.assertRaises(exceptions.ExpirationError): + next(responses) + + +if __name__ == '__main__': + os.chdir(os.path.dirname(sys.argv[0])) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_protoc_plugin/test.proto b/src/python/grpcio_test/grpc_protoc_plugin/test.proto new file mode 100644 index 0000000000..ed7c6a7b79 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/test.proto @@ -0,0 +1,139 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +// This file is duplicated around the code base. See GitHub issue #526. +syntax = "proto2"; + +package grpc.testing; + +enum PayloadType { + // Compressable text format. + COMPRESSABLE= 1; + + // Uncompressable binary format. + UNCOMPRESSABLE = 2; + + // Randomly chosen from all other formats defined in this enum. + RANDOM = 3; +} + +message Payload { + required PayloadType payload_type = 1; + oneof payload_body { + string payload_compressable = 2; + bytes payload_uncompressable = 3; + } +} + +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + // Desired payload size in the response from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + optional int32 response_size = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message SimpleResponse { + optional Payload payload = 1; +} + +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + optional Payload payload = 1; + + // Not expecting any payload from the response. +} + +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + optional int32 aggregated_payload_size = 1; +} + +message ResponseParameters { + // Desired payload sizes in responses from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + required int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + required int32 interval_us = 2; +} + +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message StreamingOutputCallResponse { + optional Payload payload = 1; +} + +service TestService { + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); +} diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py index 925c32720f..a6203cae2d 100644 --- a/src/python/grpcio_test/setup.py +++ b/src/python/grpcio_test/setup.py @@ -48,8 +48,13 @@ _PACKAGE_DIRECTORIES = { _PACKAGE_DATA = { 'grpc_interop': [ - 'credentials/ca.pem', 'credentials/server1.key', - 'credentials/server1.pem',] + 'credentials/ca.pem', + 'credentials/server1.key', + 'credentials/server1.pem', + ], + 'grpc_protoc_plugin': [ + 'test.proto', + ], } _SETUP_REQUIRES = ( @@ -75,5 +80,5 @@ setuptools.setup( package_data=_PACKAGE_DATA, install_requires=_INSTALL_REQUIRES + _SETUP_REQUIRES, setup_requires=_SETUP_REQUIRES, - cmdclass=_COMMAND_CLASS + cmdclass=_COMMAND_CLASS, ) -- cgit v1.2.3 From 93c3b3f7eaaabf50493df1e26b05a6de2f7e7f21 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Wed, 5 Aug 2015 13:33:15 -0700 Subject: Use common timeouts in Python face-layer test-cases --- ...blocking_invocation_inline_service_test_case.py | 34 ++++++------ ...vocation_synchronous_event_service_test_case.py | 64 +++++++++++++--------- ...ocation_asynchronous_event_service_test_case.py | 45 ++++++++------- 3 files changed, 80 insertions(+), 63 deletions(-) (limited to 'src/python') diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py index 7e1158f96b..251e1eb68e 100644 --- a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py +++ b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -34,15 +34,13 @@ import abc import unittest # pylint: disable=unused-import from grpc.framework.face import exceptions +from grpc_test.framework.common import test_constants from grpc_test.framework.face.testing import control from grpc_test.framework.face.testing import coverage from grpc_test.framework.face.testing import digest from grpc_test.framework.face.testing import stock_service from grpc_test.framework.face.testing import test_case -_TIMEOUT = 3 -_LONG_TIMEOUT = 45 - class BlockingInvocationInlineServiceTestCase( test_case.FaceTestCase, coverage.BlockingCoverage): @@ -79,7 +77,7 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() response = self.stub.blocking_value_in_value_out( - name, request, _LONG_TIMEOUT) + name, request, test_constants.LONG_TIMEOUT) test_messages.verify(request, response, self) @@ -90,7 +88,7 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() response_iterator = self.stub.inline_value_in_stream_out( - name, request, _LONG_TIMEOUT) + name, request, test_constants.LONG_TIMEOUT) responses = list(response_iterator) test_messages.verify(request, responses, self) @@ -102,7 +100,7 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() response = self.stub.blocking_stream_in_value_out( - name, iter(requests), _LONG_TIMEOUT) + name, iter(requests), test_constants.LONG_TIMEOUT) test_messages.verify(requests, response, self) @@ -113,7 +111,7 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _LONG_TIMEOUT) + name, iter(requests), test_constants.LONG_TIMEOUT) responses = list(response_iterator) test_messages.verify(requests, responses, self) @@ -126,12 +124,12 @@ class BlockingInvocationInlineServiceTestCase( second_request = test_messages.request() first_response = self.stub.blocking_value_in_value_out( - name, first_request, _TIMEOUT) + name, first_request, test_constants.SHORT_TIMEOUT) test_messages.verify(first_request, first_response, self) second_response = self.stub.blocking_value_in_value_out( - name, second_request, _TIMEOUT) + name, second_request, test_constants.SHORT_TIMEOUT) test_messages.verify(second_request, second_response, self) @@ -144,7 +142,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): multi_callable = self.stub.unary_unary_multi_callable(name) - multi_callable(request, _TIMEOUT) + multi_callable(request, test_constants.SHORT_TIMEOUT) def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -155,7 +153,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) list(response_iterator) def testExpiredStreamRequestUnaryResponse(self): @@ -167,7 +165,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): multi_callable = self.stub.stream_unary_multi_callable(name) - multi_callable(iter(requests), _TIMEOUT) + multi_callable(iter(requests), test_constants.SHORT_TIMEOUT) def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -178,7 +176,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.pause(), self.assertRaises( exceptions.ExpirationError): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) list(response_iterator) def testFailedUnaryRequestUnaryResponse(self): @@ -188,7 +186,8 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() with self.control.fail(), self.assertRaises(exceptions.ServicerError): - self.stub.blocking_value_in_value_out(name, request, _TIMEOUT) + self.stub.blocking_value_in_value_out(name, request, + test_constants.SHORT_TIMEOUT) def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -198,7 +197,7 @@ class BlockingInvocationInlineServiceTestCase( with self.control.fail(), self.assertRaises(exceptions.ServicerError): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) list(response_iterator) def testFailedStreamRequestUnaryResponse(self): @@ -208,7 +207,8 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() with self.control.fail(), self.assertRaises(exceptions.ServicerError): - self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT) + self.stub.blocking_stream_in_value_out(name, iter(requests), + test_constants.SHORT_TIMEOUT) def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -218,5 +218,5 @@ class BlockingInvocationInlineServiceTestCase( with self.control.fail(), self.assertRaises(exceptions.ServicerError): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) list(response_iterator) diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py index 18eed53d6e..9df77678eb 100644 --- a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py +++ b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -33,6 +33,7 @@ import abc import unittest from grpc.framework.face import interfaces +from grpc_test.framework.common import test_constants from grpc_test.framework.face.testing import callback as testing_callback from grpc_test.framework.face.testing import control from grpc_test.framework.face.testing import coverage @@ -40,8 +41,6 @@ from grpc_test.framework.face.testing import digest from grpc_test.framework.face.testing import stock_service from grpc_test.framework.face.testing import test_case -_TIMEOUT = 3 - class EventInvocationSynchronousEventServiceTestCase( test_case.FaceTestCase, coverage.FullCoverage): @@ -79,7 +78,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() response = callback.response() @@ -93,7 +93,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() responses = callback.responses() @@ -107,7 +108,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() unused_call, request_consumer = self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() @@ -124,7 +126,7 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() unused_call, request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() @@ -147,11 +149,11 @@ class EventInvocationSynchronousEventServiceTestCase( first_callback.complete(first_response) self.stub.event_value_in_value_out( name, second_request, second_callback.complete, - second_callback.abort, _TIMEOUT) + second_callback.abort, test_constants.SHORT_TIMEOUT) self.stub.event_value_in_value_out( name, first_request, make_second_invocation, first_callback.abort, - _TIMEOUT) + test_constants.SHORT_TIMEOUT) second_callback.block_until_terminated() first_response = first_callback.response() @@ -168,7 +170,8 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.pause(): self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) @@ -182,7 +185,8 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.pause(): self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) @@ -194,7 +198,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) @@ -207,7 +212,7 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() unused_call, request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) callback.block_until_terminated() @@ -223,10 +228,12 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -237,10 +244,12 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) def testFailedStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -251,13 +260,15 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): unused_call, request_consumer = self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() callback.block_until_terminated() - self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -268,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.fail(): unused_call, request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) request_consumer.terminate() @@ -287,10 +298,10 @@ class EventInvocationSynchronousEventServiceTestCase( self.stub.event_value_in_value_out( name, first_request, first_callback.complete, first_callback.abort, - _TIMEOUT) + test_constants.SHORT_TIMEOUT) self.stub.event_value_in_value_out( name, second_request, second_callback.complete, - second_callback.abort, _TIMEOUT) + second_callback.abort, test_constants.SHORT_TIMEOUT) first_callback.block_until_terminated() second_callback.block_until_terminated() @@ -312,7 +323,8 @@ class EventInvocationSynchronousEventServiceTestCase( with self.control.pause(): call = self.stub.event_value_in_value_out( - name, request, callback.complete, callback.abort, _TIMEOUT) + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) call.cancel() callback.block_until_terminated() @@ -326,7 +338,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() call = self.stub.event_value_in_stream_out( - name, request, callback, callback.abort, _TIMEOUT) + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) call.cancel() callback.block_until_terminated() @@ -340,7 +353,8 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() call, request_consumer = self.stub.event_stream_in_value_out( - name, callback.complete, callback.abort, _TIMEOUT) + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) for request in requests: request_consumer.consume(request) call.cancel() @@ -355,7 +369,7 @@ class EventInvocationSynchronousEventServiceTestCase( callback = testing_callback.Callback() call, unused_request_consumer = self.stub.event_stream_in_stream_out( - name, callback, callback.abort, _TIMEOUT) + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) call.cancel() callback.block_until_terminated() diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index 3b42914342..70d86a0422 100644 --- a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -37,13 +37,13 @@ import unittest from grpc.framework.face import exceptions from grpc.framework.foundation import future from grpc.framework.foundation import logging_pool +from grpc_test.framework.common import test_constants from grpc_test.framework.face.testing import control from grpc_test.framework.face.testing import coverage from grpc_test.framework.face.testing import digest from grpc_test.framework.face.testing import stock_service from grpc_test.framework.face.testing import test_case -_TIMEOUT = 3 _MAXIMUM_POOL_SIZE = 10 @@ -110,7 +110,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( request = test_messages.request() response_future = self.stub.future_value_in_value_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) response = response_future.result() test_messages.verify(request, response, self) @@ -122,7 +122,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( request = test_messages.request() response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) responses = list(response_iterator) test_messages.verify(request, responses, self) @@ -138,7 +138,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # returned to calling code before the iterator yields any requests. with request_iterator.pause(): response_future = self.stub.future_stream_in_value_out( - name, request_iterator, _TIMEOUT) + name, request_iterator, test_constants.SHORT_TIMEOUT) response = response_future.result() test_messages.verify(requests, response, self) @@ -154,7 +154,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # returned to calling code before the iterator yields any requests. with request_iterator.pause(): response_iterator = self.stub.inline_stream_in_stream_out( - name, request_iterator, _TIMEOUT) + name, request_iterator, test_constants.SHORT_TIMEOUT) responses = list(response_iterator) test_messages.verify(requests, responses, self) @@ -167,13 +167,13 @@ class FutureInvocationAsynchronousEventServiceTestCase( second_request = test_messages.request() first_response_future = self.stub.future_value_in_value_out( - name, first_request, _TIMEOUT) + name, first_request, test_constants.SHORT_TIMEOUT) first_response = first_response_future.result() test_messages.verify(first_request, first_response, self) second_response_future = self.stub.future_value_in_value_out( - name, second_request, _TIMEOUT) + name, second_request, test_constants.SHORT_TIMEOUT) second_response = second_response_future.result() test_messages.verify(second_request, second_response, self) @@ -186,7 +186,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): multi_callable = self.stub.unary_unary_multi_callable(name) - response_future = multi_callable.future(request, _TIMEOUT) + response_future = multi_callable.future(request, + test_constants.SHORT_TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError): @@ -200,7 +201,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(response_iterator) @@ -212,7 +213,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): multi_callable = self.stub.stream_unary_multi_callable(name) - response_future = multi_callable.future(iter(requests), _TIMEOUT) + response_future = multi_callable.future(iter(requests), + test_constants.SHORT_TIMEOUT) self.assertIsInstance( response_future.exception(), exceptions.ExpirationError) with self.assertRaises(exceptions.ExpirationError): @@ -226,7 +228,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) with self.assertRaises(exceptions.ExpirationError): list(response_iterator) @@ -238,7 +240,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_value_in_value_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) # Because the servicer fails outside of the thread from which the # servicer-side runtime called into it its failure is @@ -261,7 +263,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # expiration of the RPC. with self.control.fail(), self.assertRaises(exceptions.ExpirationError): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) list(response_iterator) def testFailedStreamRequestUnaryResponse(self): @@ -272,7 +274,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_stream_in_value_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) # Because the servicer fails outside of the thread from which the # servicer-side runtime called into it its failure is @@ -295,7 +297,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( # expiration of the RPC. with self.control.fail(), self.assertRaises(exceptions.ExpirationError): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) list(response_iterator) def testParallelInvocations(self): @@ -305,10 +307,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( first_request = test_messages.request() second_request = test_messages.request() + # TODO(bug 2039): use LONG_TIMEOUT instead first_response_future = self.stub.future_value_in_value_out( - name, first_request, _TIMEOUT) + name, first_request, test_constants.SHORT_TIMEOUT) second_response_future = self.stub.future_value_in_value_out( - name, second_request, _TIMEOUT) + name, second_request, test_constants.SHORT_TIMEOUT) first_response = first_response_future.result() second_response = second_response_future.result() @@ -327,7 +330,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) cancel_method_return_value = response_future.cancel() self.assertFalse(cancel_method_return_value) @@ -341,7 +344,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, test_constants.SHORT_TIMEOUT) response_iterator.cancel() with self.assertRaises(future.CancelledError): @@ -355,7 +358,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) cancel_method_return_value = response_future.cancel() self.assertFalse(cancel_method_return_value) @@ -369,7 +372,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), test_constants.SHORT_TIMEOUT) response_iterator.cancel() with self.assertRaises(future.CancelledError): -- cgit v1.2.3 From 1116b0b4fb538e68044cf42ed8307ce1db96a6b8 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Thu, 6 Aug 2015 12:37:05 -0700 Subject: Fix debug annotation typo in Python --- src/python/grpcio/grpc/_adapter/_c/types/server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/python') diff --git a/src/python/grpcio/grpc/_adapter/_c/types/server.c b/src/python/grpcio/grpc/_adapter/_c/types/server.c index c2190ea672..2a11d09d21 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/server.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/server.c @@ -96,7 +96,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) PyObject *py_args; grpc_channel_args c_args; char *keywords[] = {"cq", "args", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Channel", keywords, + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Server", keywords, &pygrpc_CompletionQueue_type, &cq, &py_args)) { return NULL; } -- cgit v1.2.3 From 6107bf457a62d8c5035565acc2e1476240806fd1 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Thu, 6 Aug 2015 11:01:11 -0700 Subject: Set Python user-agent string --- src/python/grpcio/grpc/_adapter/_c/module.c | 6 ++++++ src/python/grpcio/grpc/_adapter/_low.py | 4 ++++ src/python/grpcio_test/grpc_test/_adapter/_low_test.py | 17 ++++++++++++----- 3 files changed, 22 insertions(+), 5 deletions(-) (limited to 'src/python') diff --git a/src/python/grpcio/grpc/_adapter/_c/module.c b/src/python/grpcio/grpc/_adapter/_c/module.c index 1f3aedd9d8..9b93b051f6 100644 --- a/src/python/grpcio/grpc/_adapter/_c/module.c +++ b/src/python/grpcio/grpc/_adapter/_c/module.c @@ -53,6 +53,12 @@ PyMODINIT_FUNC init_c(void) { return; } + if (PyModule_AddStringConstant( + module, "PRIMARY_USER_AGENT_KEY", + GRPC_ARG_PRIMARY_USER_AGENT_STRING) < 0) { + return; + } + /* GRPC maintains an internal counter of how many times it has been initialized and handles multiple pairs of grpc_init()/grpc_shutdown() invocations accordingly. */ diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py index dcf67dbc11..239aac81b2 100644 --- a/src/python/grpcio/grpc/_adapter/_low.py +++ b/src/python/grpcio/grpc/_adapter/_low.py @@ -27,9 +27,12 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from grpc import _grpcio_metadata from grpc._adapter import _c from grpc._adapter import _types +_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) + ClientCredentials = _c.ClientCredentials ServerCredentials = _c.ServerCredentials @@ -76,6 +79,7 @@ class Call(_types.Call): class Channel(_types.Channel): def __init__(self, target, args, creds=None): + args = list(args) + [(_c.PRIMARY_USER_AGENT_KEY, _USER_AGENT)] if creds is None: self.channel = _c.Channel(target, args) else: diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py index 9a8edfad0c..b6583662f3 100644 --- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py +++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py @@ -31,11 +31,12 @@ import threading import time import unittest +from grpc import _grpcio_metadata from grpc._adapter import _types from grpc._adapter import _low -def WaitForEvents(completion_queues, deadline): +def wait_for_events(completion_queues, deadline): """ Args: completion_queues: list of completion queues to wait for events on @@ -62,6 +63,7 @@ def WaitForEvents(completion_queues, deadline): thread.join() return results + class InsecureServerInsecureClient(unittest.TestCase): def setUp(self): @@ -123,16 +125,21 @@ class InsecureServerInsecureClient(unittest.TestCase): ], client_call_tag) self.assertEquals(_types.CallError.OK, client_start_batch_result) - client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2) + client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2) self.assertEquals(client_no_event, None) self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type) self.assertIsInstance(request_event.call, _low.Call) self.assertIs(server_request_tag, request_event.tag) self.assertEquals(1, len(request_event.results)) - got_initial_metadata = dict(request_event.results[0].initial_metadata) + received_initial_metadata = dict(request_event.results[0].initial_metadata) + # Check that our metadata were transmitted self.assertEquals( dict(client_initial_metadata), - dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0])) + dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0])) + # Check that Python's user agent string is a part of the full user agent + # string + self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__), + received_initial_metadata['user-agent']) self.assertEquals(METHOD, request_event.call_details.method) self.assertEquals(HOST, request_event.call_details.host) self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) @@ -150,7 +157,7 @@ class InsecureServerInsecureClient(unittest.TestCase): ], server_call_tag) self.assertEquals(_types.CallError.OK, server_start_batch_result) - client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1) + client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1) self.assertEquals(6, len(client_event.results)) found_client_op_types = set() -- cgit v1.2.3 From 9adf796d0687303f8e39b04f1235d6ef46849f73 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Thu, 6 Aug 2015 11:30:07 -0700 Subject: Implement timeout interop test for Python --- .../grpcio_test/grpc_interop/_interop_test_case.py | 3 +++ src/python/grpcio_test/grpc_interop/methods.py | 23 ++++++++++++++++++++++ 2 files changed, 26 insertions(+) (limited to 'src/python') diff --git a/src/python/grpcio_test/grpc_interop/_interop_test_case.py b/src/python/grpcio_test/grpc_interop/_interop_test_case.py index ed8f7ef009..b6d06b300d 100644 --- a/src/python/grpcio_test/grpc_interop/_interop_test_case.py +++ b/src/python/grpcio_test/grpc_interop/_interop_test_case.py @@ -59,3 +59,6 @@ class InteropTestCase(object): def testCancelAfterFirstResponse(self): methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None) + + def testTimeoutOnSleepingServer(self): + methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None) diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py index f4c94685ee..7a831f3cbd 100644 --- a/src/python/grpcio_test/grpc_interop/methods.py +++ b/src/python/grpcio_test/grpc_interop/methods.py @@ -33,10 +33,12 @@ import enum import json import os import threading +import time from oauth2client import client as oauth2client_client from grpc.framework.alpha import utilities +from grpc.framework.alpha import exceptions from grpc_interop import empty_pb2 from grpc_interop import messages_pb2 @@ -318,6 +320,24 @@ def _cancel_after_first_response(stub): raise ValueError('expected call to be cancelled') +def _timeout_on_sleeping_server(stub): + request_payload_size = 27182 + with stub, _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, 0.001) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) + pipe.add(request) + time.sleep(0.1) + try: + next(response_iterator) + except exceptions.ExpirationError: + pass + else: + raise ValueError('expected call to exceed deadline') + + def _compute_engine_creds(stub, args): response = _large_unary_common_behavior(stub, True, True) if args.default_service_account != response.username: @@ -351,6 +371,7 @@ class TestCase(enum.Enum): CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' SERVICE_ACCOUNT_CREDS = 'service_account_creds' + TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' def test_interoperability(self, stub, args): if self is TestCase.EMPTY_UNARY: @@ -367,6 +388,8 @@ class TestCase(enum.Enum): _cancel_after_begin(stub) elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: _cancel_after_first_response(stub) + elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: + _timeout_on_sleeping_server(stub) elif self is TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds(stub, args) elif self is TestCase.SERVICE_ACCOUNT_CREDS: -- cgit v1.2.3 From be55ca770d99da1ef59170e744f87ada0645d2fa Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Mon, 10 Aug 2015 15:49:21 -0700 Subject: Expose new core functionality to Python --- src/python/grpcio/grpc/_adapter/_c/types.h | 9 ++ src/python/grpcio/grpc/_adapter/_c/types/call.c | 8 ++ src/python/grpcio/grpc/_adapter/_c/types/channel.c | 54 +++++++++++- src/python/grpcio/grpc/_adapter/_c/utility.c | 21 ++++- .../grpcio/grpc/_adapter/_intermediary_low.py | 2 +- src/python/grpcio/grpc/_adapter/_low.py | 14 ++++ src/python/grpcio/grpc/_adapter/_types.py | 95 ++++++++++++++++++---- .../grpcio_test/grpc_test/_adapter/_low_test.py | 13 ++- 8 files changed, 196 insertions(+), 20 deletions(-) (limited to 'src/python') diff --git a/src/python/grpcio/grpc/_adapter/_c/types.h b/src/python/grpcio/grpc/_adapter/_c/types.h index 4e0da4a28a..f646465c63 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types.h +++ b/src/python/grpcio/grpc/_adapter/_c/types.h @@ -113,6 +113,7 @@ Call *pygrpc_Call_new_empty(CompletionQueue *cq); void pygrpc_Call_dealloc(Call *self); PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs); PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs); +PyObject *pygrpc_Call_peer(Call *self); extern PyTypeObject pygrpc_Call_type; @@ -129,6 +130,11 @@ Channel *pygrpc_Channel_new( void pygrpc_Channel_dealloc(Channel *self); Call *pygrpc_Channel_create_call( Channel *self, PyObject *args, PyObject *kwargs); +PyObject *pygrpc_Channel_check_connectivity_state(Channel *self, PyObject *args, + PyObject *kwargs); +PyObject *pygrpc_Channel_watch_connectivity_state(Channel *self, PyObject *args, + PyObject *kwargs); +PyObject *pygrpc_Channel_target(Channel *self); extern PyTypeObject pygrpc_Channel_type; @@ -181,6 +187,9 @@ pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call); /* Construct a tag associated with a server shutdown. */ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag); +/* Construct a tag associated with a channel state change. */ +pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag); + /* Frees all resources owned by the tag and the tag itself. */ void pygrpc_discard_tag(pygrpc_tag *tag); diff --git a/src/python/grpcio/grpc/_adapter/_c/types/call.c b/src/python/grpcio/grpc/_adapter/_c/types/call.c index 0739070044..5e46605c45 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/call.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/call.c @@ -42,6 +42,7 @@ PyMethodDef pygrpc_Call_methods[] = { {"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""}, {"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""}, + {"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""}, {NULL} }; const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call."; @@ -161,3 +162,10 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) { } return PyInt_FromLong(errcode); } + +PyObject *pygrpc_Call_peer(Call *self) { + char *peer = grpc_call_get_peer(self->c_call); + PyObject *py_peer = PyString_FromString(peer); + gpr_free(peer); + return py_peer; +} diff --git a/src/python/grpcio/grpc/_adapter/_c/types/channel.c b/src/python/grpcio/grpc/_adapter/_c/types/channel.c index 963104742f..eb9d43d154 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/channel.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/channel.c @@ -36,10 +36,14 @@ #define PY_SSIZE_T_CLEAN #include #include +#include PyMethodDef pygrpc_Channel_methods[] = { {"create_call", (PyCFunction)pygrpc_Channel_create_call, METH_KEYWORDS, ""}, + {"check_connectivity_state", (PyCFunction)pygrpc_Channel_check_connectivity_state, METH_KEYWORDS, ""}, + {"watch_connectivity_state", (PyCFunction)pygrpc_Channel_watch_connectivity_state, METH_KEYWORDS, ""}, + {"target", (PyCFunction)pygrpc_Channel_target, METH_NOARGS, ""}, {NULL} }; const char pygrpc_Channel_doc[] = "See grpc._adapter._types.Channel."; @@ -122,7 +126,7 @@ Call *pygrpc_Channel_create_call( const char *host; double deadline; char *keywords[] = {"cq", "method", "host", "deadline", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!ssd:create_call", keywords, + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!szd:create_call", keywords, &pygrpc_CompletionQueue_type, &cq, &method, &host, &deadline)) { return NULL; } @@ -132,3 +136,51 @@ Call *pygrpc_Channel_create_call( pygrpc_cast_double_to_gpr_timespec(deadline)); return call; } + +PyObject *pygrpc_Channel_check_connectivity_state( + Channel *self, PyObject *args, PyObject *kwargs) { + PyObject *py_try_to_connect; + int try_to_connect; + char *keywords[] = {"try_to_connect", NULL}; + grpc_connectivity_state state; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:connectivity_state", keywords, + &py_try_to_connect)) { + return NULL; + } + if (!PyBool_Check(py_try_to_connect)) { + Py_XDECREF(py_try_to_connect); + return NULL; + } + try_to_connect = Py_True == py_try_to_connect; + Py_DECREF(py_try_to_connect); + state = grpc_channel_check_connectivity_state(self->c_chan, try_to_connect); + return PyInt_FromLong(state); +} + +PyObject *pygrpc_Channel_watch_connectivity_state( + Channel *self, PyObject *args, PyObject *kwargs) { + PyObject *tag; + double deadline; + int last_observed_state; + CompletionQueue *completion_queue; + char *keywords[] = {"last_observed_state", "deadline", + "completion_queue", "tag"}; + if (!PyArg_ParseTupleAndKeywords( + args, kwargs, "idO!O:watch_connectivity_state", keywords, + &last_observed_state, &deadline, &pygrpc_CompletionQueue_type, + &completion_queue, &tag)) { + return NULL; + } + grpc_channel_watch_connectivity_state( + self->c_chan, (grpc_connectivity_state)last_observed_state, + pygrpc_cast_double_to_gpr_timespec(deadline), completion_queue->c_cq, + pygrpc_produce_channel_state_change_tag(tag)); + Py_RETURN_NONE; +} + +PyObject *pygrpc_Channel_target(Channel *self) { + char *target = grpc_channel_get_target(self->c_chan); + PyObject *py_target = PyString_FromString(target); + gpr_free(target); + return py_target; +} diff --git a/src/python/grpcio/grpc/_adapter/_c/utility.c b/src/python/grpcio/grpc/_adapter/_c/utility.c index 51f3c9be01..2eea0e18ef 100644 --- a/src/python/grpcio/grpc/_adapter/_c/utility.c +++ b/src/python/grpcio/grpc/_adapter/_c/utility.c @@ -88,6 +88,19 @@ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag) { return tag; } +pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag) { + pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag)); + tag->user_tag = user_tag; + Py_XINCREF(tag->user_tag); + tag->call = NULL; + tag->ops = NULL; + tag->nops = 0; + grpc_call_details_init(&tag->request_call_details); + grpc_metadata_array_init(&tag->request_metadata); + tag->is_new_call = 0; + return tag; +} + void pygrpc_discard_tag(pygrpc_tag *tag) { if (!tag) { return; @@ -139,7 +152,7 @@ PyObject *pygrpc_consume_event(grpc_event event) { } int pygrpc_produce_op(PyObject *op, grpc_op *result) { - static const int OP_TUPLE_SIZE = 5; + static const int OP_TUPLE_SIZE = 6; static const int STATUS_TUPLE_SIZE = 2; static const int TYPE_INDEX = 0; static const int INITIAL_METADATA_INDEX = 1; @@ -148,6 +161,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) { static const int STATUS_INDEX = 4; static const int STATUS_CODE_INDEX = 0; static const int STATUS_DETAILS_INDEX = 1; + static const int WRITE_FLAGS_INDEX = 5; int type; Py_ssize_t message_size; char *message; @@ -170,7 +184,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) { return 0; } c_op.op = type; - c_op.flags = 0; + c_op.flags = PyInt_AsLong(PyTuple_GET_ITEM(op, WRITE_FLAGS_INDEX)); + if (PyErr_Occurred()) { + return 0; + } switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: if (!pygrpc_cast_pyseq_to_send_metadata( diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py index 3c7f0a2619..e7bf9dc462 100644 --- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py +++ b/src/python/grpcio/grpc/_adapter/_intermediary_low.py @@ -127,7 +127,7 @@ class Call(object): def write(self, message, tag): return self._internal.start_batch([ - _types.OpArgs.send_message(message) + _types.OpArgs.send_message(message, 0) ], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED)) def complete(self, tag): diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py index dcf67dbc11..06d30c1b32 100644 --- a/src/python/grpcio/grpc/_adapter/_low.py +++ b/src/python/grpcio/grpc/_adapter/_low.py @@ -72,6 +72,9 @@ class Call(_types.Call): else: return self.call.cancel(code, details) + def peer(self): + return self.call.peer() + class Channel(_types.Channel): @@ -84,6 +87,17 @@ class Channel(_types.Channel): def create_call(self, completion_queue, method, host, deadline=None): return Call(self.channel.create_call(completion_queue.completion_queue, method, host, deadline)) + def check_connectivity_state(self, try_to_connect): + return self.channel.check_connectivity_state(try_to_connect) + + def watch_connectivity_state(self, last_observed_state, deadline, + completion_queue, tag): + self.channel.watch_connectivity_state( + last_observed_state, deadline, completion_queue.completion_queue, tag) + + def target(self): + return self.channel.target() + _NO_TAG = object() diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py index 5ddb1774ea..5470d2de4a 100644 --- a/src/python/grpcio/grpc/_adapter/_types.py +++ b/src/python/grpcio/grpc/_adapter/_types.py @@ -31,13 +31,12 @@ import abc import collections import enum -# TODO(atash): decide whether or not to move these enums to the _c module to -# force build errors with upstream changes. class GrpcChannelArgumentKeys(enum.Enum): """Mirrors keys used in grpc_channel_args for GRPC-specific arguments.""" SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override' + @enum.unique class CallError(enum.IntEnum): """Mirrors grpc_call_error in the C core.""" @@ -53,6 +52,7 @@ class CallError(enum.IntEnum): ERROR_INVALID_FLAGS = 9 ERROR_INVALID_METADATA = 10 + @enum.unique class StatusCode(enum.IntEnum): """Mirrors grpc_status_code in the C core.""" @@ -74,6 +74,14 @@ class StatusCode(enum.IntEnum): DATA_LOSS = 15 UNAUTHENTICATED = 16 + +@enum.unique +class OpWriteFlags(enum.IntEnum): + """Mirrors defined write-flag constants in the C core.""" + WRITE_BUFFER_HINT = 1 + WRITE_NO_COMPRESS = 2 + + @enum.unique class OpType(enum.IntEnum): """Mirrors grpc_op_type in the C core.""" @@ -86,12 +94,24 @@ class OpType(enum.IntEnum): RECV_STATUS_ON_CLIENT = 6 RECV_CLOSE_ON_SERVER = 7 + @enum.unique class EventType(enum.IntEnum): """Mirrors grpc_completion_type in the C core.""" - QUEUE_SHUTDOWN = 0 - QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong - OP_COMPLETE = 2 + QUEUE_SHUTDOWN = 0 + QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong + OP_COMPLETE = 2 + + +@enum.unique +class ConnectivityState(enum.IntEnum): + """Mirrors grpc_connectivity_state in the C core.""" + IDLE = 0 + CONNECTING = 1 + READY = 2 + TRANSIENT_FAILURE = 3 + FATAL_FAILURE = 4 + class Status(collections.namedtuple( 'Status', [ @@ -105,6 +125,7 @@ class Status(collections.namedtuple( details (str): ... """ + class CallDetails(collections.namedtuple( 'CallDetails', [ 'method', @@ -119,6 +140,7 @@ class CallDetails(collections.namedtuple( deadline (float): ... """ + class OpArgs(collections.namedtuple( 'OpArgs', [ 'type', @@ -126,6 +148,7 @@ class OpArgs(collections.namedtuple( 'trailing_metadata', 'message', 'status', + 'write_flags', ])): """Arguments passed into a GRPC operation. @@ -138,39 +161,40 @@ class OpArgs(collections.namedtuple( message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None. status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else is None. + write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values. """ @staticmethod def send_initial_metadata(initial_metadata): - return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None) + return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None, 0) @staticmethod - def send_message(message): - return OpArgs(OpType.SEND_MESSAGE, None, None, message, None) + def send_message(message, flags): + return OpArgs(OpType.SEND_MESSAGE, None, None, message, None, flags) @staticmethod def send_close_from_client(): - return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None) + return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None, 0) @staticmethod def send_status_from_server(trailing_metadata, status_code, status_details): - return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details)) + return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details), 0) @staticmethod def recv_initial_metadata(): - return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None); + return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None, 0); @staticmethod def recv_message(): - return OpArgs(OpType.RECV_MESSAGE, None, None, None, None) + return OpArgs(OpType.RECV_MESSAGE, None, None, None, None, 0) @staticmethod def recv_status_on_client(): - return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None) + return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None, 0) @staticmethod def recv_close_on_server(): - return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None) + return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None, 0) class OpResult(collections.namedtuple( @@ -290,6 +314,15 @@ class Call: """ return CallError.ERROR + @abc.abstractmethod + def peer(self): + """Get the peer of this call. + + Returns: + str: the peer of this call. + """ + return None + class Channel: __metaclass__ = abc.ABCMeta @@ -321,6 +354,40 @@ class Channel: """ return None + @abc.abstractmethod + def check_connectivity_state(self, try_to_connect): + """Check and optionally repair the connectivity state of the channel. + + Args: + try_to_connect (bool): whether or not to try to connect the channel if + disconnected. + + Returns: + ConnectivityState: state of the channel at the time of this invocation. + """ + return None + + @abc.abstractmethod + def watch_connectivity_state(self, last_observed_state, deadline, + completion_queue, tag): + """Watch for connectivity state changes from the last_observed_state. + + Args: + last_observed_state (ConnectivityState): ... + deadline (float): ... + completion_queue (CompletionQueue): ... + tag (object) ... + """ + + @abc.abstractmethod + def target(self): + """Get the target of this channel. + + Returns: + str: the target of this channel. + """ + return None + class Server: __metaclass__ = abc.ABCMeta diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py index 9a8edfad0c..1e3aeb1787 100644 --- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py +++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py @@ -115,7 +115,7 @@ class InsecureServerInsecureClient(unittest.TestCase): client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)] client_start_batch_result = client_call.start_batch([ _types.OpArgs.send_initial_metadata(client_initial_metadata), - _types.OpArgs.send_message(REQUEST), + _types.OpArgs.send_message(REQUEST, 0), _types.OpArgs.send_close_from_client(), _types.OpArgs.recv_initial_metadata(), _types.OpArgs.recv_message(), @@ -137,6 +137,15 @@ class InsecureServerInsecureClient(unittest.TestCase): self.assertEquals(HOST, request_event.call_details.host) self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) + # Check that the channel is connected, and that both it and the call have + # the proper target and peer; do this after the first flurry of messages to + # avoid the possibility that connection was delayed by the core until the + # first message was sent. + self.assertEqual(_types.ConnectivityState.READY, + self.client_channel.check_connectivity_state(False)) + self.assertIsNotNone(self.client_channel.target()) + self.assertIsNotNone(client_call.peer()) + server_call_tag = object() server_call = request_event.call server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)] @@ -144,7 +153,7 @@ class InsecureServerInsecureClient(unittest.TestCase): server_start_batch_result = server_call.start_batch([ _types.OpArgs.send_initial_metadata(server_initial_metadata), _types.OpArgs.recv_message(), - _types.OpArgs.send_message(RESPONSE), + _types.OpArgs.send_message(RESPONSE, 0), _types.OpArgs.recv_close_on_server(), _types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS) ], server_call_tag) -- cgit v1.2.3