aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/tests/unit')
-rw-r--r--src/python/grpcio/tests/unit/_compression_test.py133
-rw-r--r--src/python/grpcio/tests/unit/_exit_scenarios.py249
-rw-r--r--src/python/grpcio/tests/unit/_exit_test.py185
3 files changed, 567 insertions, 0 deletions
diff --git a/src/python/grpcio/tests/unit/_compression_test.py b/src/python/grpcio/tests/unit/_compression_test.py
new file mode 100644
index 0000000000..6eddb6f83f
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_compression_test.py
@@ -0,0 +1,133 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""Tests server and client side compression."""
+
+import unittest
+
+import grpc
+from grpc import _grpcio_metadata
+from grpc.framework.foundation import logging_pool
+
+from tests.unit import test_common
+from tests.unit.framework.common import test_constants
+
+_UNARY_UNARY = b'/test/UnaryUnary'
+_STREAM_STREAM = b'/test/StreamStream'
+
+
+def handle_unary(request, servicer_context):
+ servicer_context.send_initial_metadata([
+ ('grpc-internal-encoding-request', 'gzip')])
+ return request
+
+
+def handle_stream(request_iterator, servicer_context):
+ # TODO(issue:#6891) We should be able to remove this loop,
+ # and replace with return; yield
+ servicer_context.send_initial_metadata([
+ ('grpc-internal-encoding-request', 'gzip')])
+ for request in request_iterator:
+ yield request
+
+
+class _MethodHandler(grpc.RpcMethodHandler):
+
+ def __init__(self, request_streaming, response_streaming):
+ self.request_streaming = request_streaming
+ self.response_streaming = response_streaming
+ self.request_deserializer = None
+ self.response_serializer = None
+ self.unary_unary = None
+ self.unary_stream = None
+ self.stream_unary = None
+ self.stream_stream = None
+ if self.request_streaming and self.response_streaming:
+ self.stream_stream = lambda x, y: handle_stream(x, y)
+ elif not self.request_streaming and not self.response_streaming:
+ self.unary_unary = lambda x, y: handle_unary(x, y)
+
+
+class _GenericHandler(grpc.GenericRpcHandler):
+
+ def service(self, handler_call_details):
+ if handler_call_details.method == _UNARY_UNARY:
+ return _MethodHandler(False, False)
+ elif handler_call_details.method == _STREAM_STREAM:
+ return _MethodHandler(True, True)
+ else:
+ return None
+
+
+class CompressionTest(unittest.TestCase):
+
+ def setUp(self):
+ self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ self._server = grpc.server((_GenericHandler(),), self._server_pool)
+ self._port = self._server.add_insecure_port('[::]:0')
+ self._server.start()
+
+ def testUnary(self):
+ request = b'\x00' * 100
+
+ # Client -> server compressed through default client channel compression
+ # settings. Server -> client compressed via server-side metadata setting.
+ # TODO(https://github.com/grpc/grpc/issues/4078): replace the "1" integer
+ # literal with proper use of the public API.
+ compressed_channel = grpc.insecure_channel('localhost:%d' % self._port,
+ options=[('grpc.default_compression_algorithm', 1)])
+ multi_callable = compressed_channel.unary_unary(_UNARY_UNARY)
+ response = multi_callable(request)
+ self.assertEqual(request, response)
+
+ # Client -> server compressed through client metadata setting. Server ->
+ # client compressed via server-side metadata setting.
+ # TODO(https://github.com/grpc/grpc/issues/4078): replace the "0" integer
+ # literal with proper use of the public API.
+ uncompressed_channel = grpc.insecure_channel('localhost:%d' % self._port,
+ options=[('grpc.default_compression_algorithm', 0)])
+ multi_callable = compressed_channel.unary_unary(_UNARY_UNARY)
+ response = multi_callable(request, metadata=[
+ ('grpc-internal-encoding-request', 'gzip')])
+ self.assertEqual(request, response)
+
+ def testStreaming(self):
+ request = b'\x00' * 100
+
+ # TODO(https://github.com/grpc/grpc/issues/4078): replace the "1" integer
+ # literal with proper use of the public API.
+ compressed_channel = grpc.insecure_channel('localhost:%d' % self._port,
+ options=[('grpc.default_compression_algorithm', 1)])
+ multi_callable = compressed_channel.stream_stream(_STREAM_STREAM)
+ call = multi_callable([request] * test_constants.STREAM_LENGTH)
+ for response in call:
+ self.assertEqual(request, response)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_exit_scenarios.py b/src/python/grpcio/tests/unit/_exit_scenarios.py
new file mode 100644
index 0000000000..24a2faef85
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_exit_scenarios.py
@@ -0,0 +1,249 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Defines a number of module-scope gRPC scenarios to test clean exit."""
+
+import argparse
+import threading
+import time
+
+import grpc
+
+from tests.unit.framework.common import test_constants
+
+WAIT_TIME = 1000
+
+REQUEST = b'request'
+
+UNSTARTED_SERVER = 'unstarted_server'
+RUNNING_SERVER = 'running_server'
+POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server'
+POLL_CONNECTIVITY = 'poll_connectivity'
+IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call'
+IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call'
+IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call'
+IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call'
+IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call'
+IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call'
+IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call'
+
+UNARY_UNARY = b'/test/UnaryUnary'
+UNARY_STREAM = b'/test/UnaryStream'
+STREAM_UNARY = b'/test/StreamUnary'
+STREAM_STREAM = b'/test/StreamStream'
+PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream'
+PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary'
+PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream'
+
+TEST_TO_METHOD = {
+ IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY,
+ IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM,
+ IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY,
+ IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM,
+ IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM,
+ IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY,
+ IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM,
+}
+
+
+def hang_unary_unary(request, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_unary_stream(request, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_partial_unary_stream(request, servicer_context):
+ for _ in range(test_constants.STREAM_LENGTH // 2):
+ yield request
+ time.sleep(WAIT_TIME)
+
+
+def hang_stream_unary(request_iterator, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_partial_stream_unary(request_iterator, servicer_context):
+ for _ in range(test_constants.STREAM_LENGTH // 2):
+ next(request_iterator)
+ time.sleep(WAIT_TIME)
+
+
+def hang_stream_stream(request_iterator, servicer_context):
+ time.sleep(WAIT_TIME)
+
+
+def hang_partial_stream_stream(request_iterator, servicer_context):
+ for _ in range(test_constants.STREAM_LENGTH // 2):
+ yield next(request_iterator)
+ time.sleep(WAIT_TIME)
+
+
+class MethodHandler(grpc.RpcMethodHandler):
+
+ def __init__(self, request_streaming, response_streaming, partial_hang):
+ self.request_streaming = request_streaming
+ self.response_streaming = response_streaming
+ self.request_deserializer = None
+ self.response_serializer = None
+ self.unary_unary = None
+ self.unary_stream = None
+ self.stream_unary = None
+ self.stream_stream = None
+ if self.request_streaming and self.response_streaming:
+ if partial_hang:
+ self.stream_stream = hang_partial_stream_stream
+ else:
+ self.stream_stream = hang_stream_stream
+ elif self.request_streaming:
+ if partial_hang:
+ self.stream_unary = hang_partial_stream_unary
+ else:
+ self.stream_unary = hang_stream_unary
+ elif self.response_streaming:
+ if partial_hang:
+ self.unary_stream = hang_partial_unary_stream
+ else:
+ self.unary_stream = hang_unary_stream
+ else:
+ self.unary_unary = hang_unary_unary
+
+
+class GenericHandler(grpc.GenericRpcHandler):
+
+ def service(self, handler_call_details):
+ if handler_call_details.method == UNARY_UNARY:
+ return MethodHandler(False, False, False)
+ elif handler_call_details.method == UNARY_STREAM:
+ return MethodHandler(False, True, False)
+ elif handler_call_details.method == STREAM_UNARY:
+ return MethodHandler(True, False, False)
+ elif handler_call_details.method == STREAM_STREAM:
+ return MethodHandler(True, True, False)
+ elif handler_call_details.method == PARTIAL_UNARY_STREAM:
+ return MethodHandler(False, True, True)
+ elif handler_call_details.method == PARTIAL_STREAM_UNARY:
+ return MethodHandler(True, False, True)
+ elif handler_call_details.method == PARTIAL_STREAM_STREAM:
+ return MethodHandler(True, True, True)
+ else:
+ return None
+
+
+# Traditional executors will not exit until all their
+# current jobs complete. Because we submit jobs that will
+# never finish, we don't want to block exit on these jobs.
+class DaemonPool(object):
+
+ def submit(self, fn, *args, **kwargs):
+ thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
+ thread.daemon = True
+ thread.start()
+
+ def shutdown(self, wait=True):
+ pass
+
+
+def infinite_request_iterator():
+ while True:
+ yield REQUEST
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('scenario', type=str)
+ parser.add_argument(
+ '--wait_for_interrupt', dest='wait_for_interrupt', action='store_true')
+ args = parser.parse_args()
+
+ if args.scenario == UNSTARTED_SERVER:
+ server = grpc.server((), DaemonPool())
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+ elif args.scenario == RUNNING_SERVER:
+ server = grpc.server((), DaemonPool())
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+ elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
+ channel = grpc.insecure_channel('localhost:12345')
+
+ def connectivity_callback(connectivity):
+ pass
+
+ channel.subscribe(connectivity_callback, try_to_connect=True)
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+ elif args.scenario == POLL_CONNECTIVITY:
+ server = grpc.server((), DaemonPool())
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ channel = grpc.insecure_channel('localhost:%d' % port)
+
+ def connectivity_callback(connectivity):
+ pass
+
+ channel.subscribe(connectivity_callback, try_to_connect=True)
+ if args.wait_for_interrupt:
+ time.sleep(WAIT_TIME)
+
+ else:
+ handler = GenericHandler()
+ server = grpc.server((), DaemonPool())
+ port = server.add_insecure_port('[::]:0')
+ server.add_generic_rpc_handlers((handler,))
+ server.start()
+ channel = grpc.insecure_channel('localhost:%d' % port)
+
+ method = TEST_TO_METHOD[args.scenario]
+
+ if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
+ multi_callable = channel.unary_unary(method)
+ future = multi_callable.future(REQUEST)
+ result, call = multi_callable.with_call(REQUEST)
+ elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or
+ args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
+ multi_callable = channel.unary_stream(method)
+ response_iterator = multi_callable(REQUEST)
+ for response in response_iterator:
+ pass
+ elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or
+ args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
+ multi_callable = channel.stream_unary(method)
+ future = multi_callable.future(infinite_request_iterator())
+ result, call = multi_callable.with_call(
+ [REQUEST] * test_constants.STREAM_LENGTH)
+ elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or
+ args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
+ multi_callable = channel.stream_stream(method)
+ response_iterator = multi_callable(infinite_request_iterator())
+ for response in response_iterator:
+ pass
diff --git a/src/python/grpcio/tests/unit/_exit_test.py b/src/python/grpcio/tests/unit/_exit_test.py
new file mode 100644
index 0000000000..b0d6af73e5
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_exit_test.py
@@ -0,0 +1,185 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests clean exit of server/client on Python Interpreter exit/sigint.
+
+The tests in this module spawn a subprocess for each test case, the
+test is considered successful if it doesn't hang/timeout.
+"""
+
+import atexit
+import os
+import signal
+import six
+import subprocess
+import sys
+import threading
+import time
+import unittest
+
+from tests.unit import _exit_scenarios
+
+SCENARIO_FILE = os.path.abspath(os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), '_exit_scenarios.py'))
+INTERPRETER = sys.executable
+BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]
+BASE_SIGTERM_COMMAND = BASE_COMMAND + ['--wait_for_interrupt']
+
+INIT_TIME = 1.0
+
+
+processes = []
+process_lock = threading.Lock()
+
+
+# Make sure we attempt to clean up any
+# processes we may have left running
+def cleanup_processes():
+ with process_lock:
+ for process in processes:
+ try:
+ process.kill()
+ except Exception:
+ pass
+atexit.register(cleanup_processes)
+
+
+def interrupt_and_wait(process):
+ with process_lock:
+ processes.append(process)
+ time.sleep(INIT_TIME)
+ os.kill(process.pid, signal.SIGINT)
+ process.wait()
+
+
+def wait(process):
+ with process_lock:
+ processes.append(process)
+ process.wait()
+
+
+class ExitTest(unittest.TestCase):
+
+ def test_unstarted_server(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.UNSTARTED_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_unstarted_server_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.UNSTARTED_SERVER],
+ stdout=sys.stdout)
+ interrupt_and_wait(process)
+
+ def test_running_server(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.RUNNING_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_running_server_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.RUNNING_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_poll_connectivity_no_server(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_poll_connectivity_no_server_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_poll_connectivity(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY],
+ stdout=sys.stdout, stderr=sys.stderr)
+ wait(process)
+
+ def test_poll_connectivity_terminate(self):
+ process = subprocess.Popen(
+ BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_in_flight_unary_unary_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_UNARY_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_unary_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_in_flight_stream_unary_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_UNARY_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_stream_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_partial_unary_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ def test_in_flight_partial_stream_unary_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+ @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999')
+ def test_in_flight_partial_stream_stream_call(self):
+ process = subprocess.Popen(
+ BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL],
+ stdout=sys.stdout, stderr=sys.stderr)
+ interrupt_and_wait(process)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)