diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/fork/methods.py')
-rw-r--r-- | src/python/grpcio_tests/tests/fork/methods.py | 445 |
1 files changed, 445 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/fork/methods.py b/src/python/grpcio_tests/tests/fork/methods.py new file mode 100644 index 0000000000..889ef13cb2 --- /dev/null +++ b/src/python/grpcio_tests/tests/fork/methods.py @@ -0,0 +1,445 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementations of fork support test methods.""" + +import enum +import json +import logging +import multiprocessing +import os +import threading +import time + +import grpc + +from six.moves import queue + +from src.proto.grpc.testing import empty_pb2 +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import test_pb2_grpc + +_LOGGER = logging.getLogger(__name__) + + +def _channel(args): + target = '{}:{}'.format(args.server_host, args.server_port) + if args.use_tls: + channel_credentials = grpc.ssl_channel_credentials() + channel = grpc.secure_channel(target, channel_credentials) + else: + channel = grpc.insecure_channel(target) + return channel + + +def _validate_payload_type_and_length(response, expected_type, expected_length): + if response.payload.type is not expected_type: + raise ValueError('expected payload type %s, got %s' % + (expected_type, type(response.payload.type))) + elif len(response.payload.body) != expected_length: + raise ValueError('expected payload body size %d, got %d' % + (expected_length, len(response.payload.body))) + + +def _async_unary(stub): + size = 314159 + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=size, + payload=messages_pb2.Payload(body=b'\x00' * 271828)) + response_future = stub.UnaryCall.future(request) + response = response_future.result() + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) + + +def _blocking_unary(stub): + size = 314159 + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=size, + payload=messages_pb2.Payload(body=b'\x00' * 271828)) + response = stub.UnaryCall(request) + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) + + +class _Pipe(object): + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._open = True + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def next(self): + with self._condition: + while not self._values and self._open: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + +class _ChildProcess(object): + + def __init__(self, task, args=None): + if args is None: + args = () + self._exceptions = multiprocessing.Queue() + + def record_exceptions(): + try: + task(*args) + except Exception as e: # pylint: disable=broad-except + self._exceptions.put(e) + + self._process = multiprocessing.Process(target=record_exceptions) + + def start(self): + self._process.start() + + def finish(self): + self._process.join() + if self._process.exitcode != 0: + raise ValueError('Child process failed with exitcode %d' % + self._process.exitcode) + try: + exception = self._exceptions.get(block=False) + raise ValueError('Child process failed: %s' % exception) + except queue.Empty: + pass + + +def _async_unary_same_channel(channel): + + def child_target(): + try: + _async_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + stub = test_pb2_grpc.TestServiceStub(channel) + _async_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + _async_unary(stub) + child_process.finish() + + +def _async_unary_new_channel(channel, args): + + def child_target(): + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _async_unary(child_stub) + child_channel.close() + + stub = test_pb2_grpc.TestServiceStub(channel) + _async_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + _async_unary(stub) + child_process.finish() + + +def _blocking_unary_same_channel(channel): + + def child_target(): + try: + _blocking_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + child_process.finish() + + +def _blocking_unary_new_channel(channel, args): + + def child_target(): + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _blocking_unary(child_stub) + child_channel.close() + + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + _blocking_unary(stub) + child_process.finish() + + +# Verify that the fork channel registry can handle already closed channels +def _close_channel_before_fork(channel, args): + + def child_target(): + new_channel.close() + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _blocking_unary(child_stub) + child_channel.close() + + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + channel.close() + + new_channel = _channel(args) + new_stub = test_pb2_grpc.TestServiceStub(new_channel) + child_process = _ChildProcess(child_target) + child_process.start() + _blocking_unary(new_stub) + child_process.finish() + + +def _connectivity_watch(channel, args): + + def child_target(): + + def child_connectivity_callback(state): + child_states.append(state) + + child_states = [] + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + child_channel.subscribe(child_connectivity_callback) + _async_unary(child_stub) + if len(child_states + ) < 2 or child_states[-1] != grpc.ChannelConnectivity.READY: + raise ValueError('Channel did not move to READY') + if len(parent_states) > 1: + raise ValueError('Received connectivity updates on parent callback') + child_channel.unsubscribe(child_connectivity_callback) + child_channel.close() + + def parent_connectivity_callback(state): + parent_states.append(state) + + parent_states = [] + channel.subscribe(parent_connectivity_callback) + stub = test_pb2_grpc.TestServiceStub(channel) + child_process = _ChildProcess(child_target) + child_process.start() + _async_unary(stub) + if len(parent_states + ) < 2 or parent_states[-1] != grpc.ChannelConnectivity.READY: + raise ValueError('Channel did not move to READY') + channel.unsubscribe(parent_connectivity_callback) + child_process.finish() + + # Need to unsubscribe or _channel.py in _poll_connectivity triggers a + # "Cannot invoke RPC on closed channel!" error. + # TODO(ericgribkoff) Fix issue with channel.close() and connectivity polling + channel.unsubscribe(parent_connectivity_callback) + + +def _ping_pong_with_child_processes_after_first_response( + channel, args, child_target, run_after_close=True): + request_response_sizes = ( + 31415, + 9, + 2653, + 58979, + ) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904, + ) + stub = test_pb2_grpc.TestServiceStub(channel) + pipe = _Pipe() + parent_bidi_call = stub.FullDuplexCall(pipe) + child_processes = [] + first_message_received = False + for response_size, payload_size in zip(request_response_sizes, + request_payload_sizes): + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + pipe.add(request) + if first_message_received: + child_process = _ChildProcess(child_target, + (parent_bidi_call, channel, args)) + child_process.start() + child_processes.append(child_process) + response = next(parent_bidi_call) + first_message_received = True + child_process = _ChildProcess(child_target, + (parent_bidi_call, channel, args)) + child_process.start() + child_processes.append(child_process) + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, + response_size) + pipe.close() + if run_after_close: + child_process = _ChildProcess(child_target, + (parent_bidi_call, channel, args)) + child_process.start() + child_processes.append(child_process) + for child_process in child_processes: + child_process.finish() + + +def _in_progress_bidi_continue_call(channel): + + def child_target(parent_bidi_call, parent_channel, args): + stub = test_pb2_grpc.TestServiceStub(parent_channel) + try: + _async_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + inherited_code = parent_bidi_call.code() + inherited_details = parent_bidi_call.details() + if inherited_code != grpc.StatusCode.CANCELLED: + raise ValueError( + 'Expected inherited code CANCELLED, got %s' % inherited_code) + if inherited_details != 'Channel closed due to fork': + raise ValueError( + 'Expected inherited details Channel closed due to fork, got %s' + % inherited_details) + + # Don't run child_target after closing the parent call, as the call may have + # received a status from the server before fork occurs. + _ping_pong_with_child_processes_after_first_response( + channel, None, child_target, run_after_close=False) + + +def _in_progress_bidi_same_channel_async_call(channel): + + def child_target(parent_bidi_call, parent_channel, args): + stub = test_pb2_grpc.TestServiceStub(parent_channel) + try: + _async_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + _ping_pong_with_child_processes_after_first_response( + channel, None, child_target) + + +def _in_progress_bidi_same_channel_blocking_call(channel): + + def child_target(parent_bidi_call, parent_channel, args): + stub = test_pb2_grpc.TestServiceStub(parent_channel) + try: + _blocking_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + _ping_pong_with_child_processes_after_first_response( + channel, None, child_target) + + +def _in_progress_bidi_new_channel_async_call(channel, args): + + def child_target(parent_bidi_call, parent_channel, args): + channel = _channel(args) + stub = test_pb2_grpc.TestServiceStub(channel) + _async_unary(stub) + + _ping_pong_with_child_processes_after_first_response( + channel, args, child_target) + + +def _in_progress_bidi_new_channel_blocking_call(channel, args): + + def child_target(parent_bidi_call, parent_channel, args): + channel = _channel(args) + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + + _ping_pong_with_child_processes_after_first_response( + channel, args, child_target) + + +@enum.unique +class TestCase(enum.Enum): + + CONNECTIVITY_WATCH = 'connectivity_watch' + CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork' + ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel' + ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel' + BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel' + BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel' + IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call' + IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call' + IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call' + IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call' + IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call' + + def run_test(self, args): + _LOGGER.info("Running %s", self) + channel = _channel(args) + if self is TestCase.ASYNC_UNARY_SAME_CHANNEL: + _async_unary_same_channel(channel) + elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL: + _async_unary_new_channel(channel, args) + elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL: + _blocking_unary_same_channel(channel) + elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL: + _blocking_unary_new_channel(channel, args) + elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK: + _close_channel_before_fork(channel, args) + elif self is TestCase.CONNECTIVITY_WATCH: + _connectivity_watch(channel, args) + elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL: + _in_progress_bidi_continue_call(channel) + elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL: + _in_progress_bidi_same_channel_async_call(channel) + elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL: + _in_progress_bidi_same_channel_blocking_call(channel) + elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL: + _in_progress_bidi_new_channel_async_call(channel, args) + elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL: + _in_progress_bidi_new_channel_blocking_call(channel, args) + else: + raise NotImplementedError( + 'Test case "%s" not implemented!' % self.name) + channel.close() |