# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementations of fork support test methods."""

import enum
import json
import logging
import multiprocessing
import os
import threading
import time

import grpc
from six.moves import queue

from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

_LOGGER = logging.getLogger(__name__)


def _channel(args):
    """Create a channel to the server described by ``args``.

    Args:
      args: Object exposing ``server_host``, ``server_port`` and ``use_tls``.

    Returns:
      A secure channel built with default SSL channel credentials when
      ``args.use_tls`` is truthy, otherwise an insecure channel.
    """
    target = '{}:{}'.format(args.server_host, args.server_port)
    if args.use_tls:
        channel_credentials = grpc.ssl_channel_credentials()
        return grpc.secure_channel(target, channel_credentials)
    return grpc.insecure_channel(target)


def _validate_payload_type_and_length(response, expected_type, expected_length):
    """Check the payload type and body length of ``response``.

    Args:
      response: Message exposing ``payload.type`` and ``payload.body``.
      expected_type: Payload type the response must carry.
      expected_length: Required length of ``response.payload.body``.

    Raises:
      ValueError: If the payload type or the body length does not match.
    """
    # Compare by value: identity comparison of integers only works by accident
    # of the interpreter's small-int caching.
    if response.payload.type != expected_type:
        raise ValueError('expected payload type %s, got %s' %
                         (expected_type, response.payload.type))
    if len(response.payload.body) != expected_length:
        raise ValueError('expected payload body size %d, got %d' %
                         (expected_length, len(response.payload.body)))


def _async_unary(stub):
    """Issue ``UnaryCall`` through its future API and validate the response.

    Args:
      stub: A ``TestServiceStub``.

    Raises:
      ValueError: If the response payload fails validation.
    """
    size = 314159
    request = messages_pb2.SimpleRequest(
        response_type=messages_pb2.COMPRESSABLE,
        response_size=size,
        payload=messages_pb2.Payload(body=b'\x00' * 271828))
    response_future = stub.UnaryCall.future(request)
    response = response_future.result()
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)


def _blocking_unary(stub):
    """Issue a blocking ``UnaryCall`` and validate the response.

    Args:
      stub: A ``TestServiceStub``.

    Raises:
      ValueError: If the response payload fails validation.
    """
    size = 314159
    request = messages_pb2.SimpleRequest(
        response_type=messages_pb2.COMPRESSABLE,
        response_size=size,
        payload=messages_pb2.Payload(body=b'\x00' * 271828))
    response = stub.UnaryCall(request)
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)


def _expect_channel_unusable(rpc, stub):
    """Invoke ``rpc(stub)`` and require that it raises ``ValueError``.

    Args:
      rpc: Callable taking a stub, e.g. ``_async_unary``.
      stub: Stub passed to ``rpc``.

    Raises:
      Exception: If ``rpc(stub)`` returns without raising ``ValueError``.
    """
    try:
        rpc(stub)
    except ValueError:
        return
    raise Exception('Child should not be able to re-use channel after fork')


class _Pipe(object):
    """Blocking iterator fed through ``add`` and terminated by ``close``.

    Iteration blocks while the pipe is open and empty; once the pipe has been
    closed and all queued values were consumed, iteration stops. Can be used
    as a context manager, in which case leaving the block closes the pipe.
    """

    def __init__(self):
        self._condition = threading.Condition()
        self._values = []
        self._open = True

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Return the oldest queued value, waiting for one if necessary.

        Raises:
          StopIteration: If the pipe is closed and no values remain.
        """
        with self._condition:
            while not self._values and self._open:
                self._condition.wait()
            if self._values:
                return self._values.pop(0)
            raise StopIteration()

    def add(self, value):
        """Queue ``value`` and wake a waiting consumer."""
        with self._condition:
            self._values.append(value)
            self._condition.notify()

    def close(self):
        """Mark the pipe closed and wake a waiting consumer."""
        with self._condition:
            self._open = False
            self._condition.notify()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()


class _ChildProcess(object):
    """Runs ``task`` in a ``multiprocessing.Process`` and reports its failures.

    An exception raised by ``task`` is caught in the child and put on a
    ``multiprocessing.Queue`` so that ``finish`` can surface it in the parent.
    """

    def __init__(self, task, args=None):
        """Prepare, but do not start, the child process.

        Args:
          task: Callable executed in the child process.
          args: Optional sequence of positional arguments for ``task``.
        """
        if args is None:
            args = ()
        self._exceptions = multiprocessing.Queue()

        def record_exceptions():
            try:
                task(*args)
            except Exception as e:  # pylint: disable=broad-except
                self._exceptions.put(e)

        self._process = multiprocessing.Process(target=record_exceptions)

    def start(self):
        """Start the child process."""
        self._process.start()

    def finish(self):
        """Wait for the child process and raise if it failed.

        Raises:
          ValueError: If the child exited with a non-zero exit code or
            recorded an exception raised by its task.
        """
        self._process.join()
        if self._process.exitcode != 0:
            raise ValueError('Child process failed with exitcode %d' %
                             self._process.exitcode)
        # Only the queue read is guarded, so the ValueError below can never be
        # confused with the "nothing recorded" case.
        try:
            exception = self._exceptions.get(block=False)
        except queue.Empty:
            return
        raise ValueError('Child process failed: %s' % exception)


def _async_unary_same_channel(channel):
    """Check a child process cannot issue a future-based RPC on ``channel``.

    The parent performs RPCs on ``channel`` before and after starting the
    child; the child requires its own RPC on the inherited stub to raise
    ``ValueError``.
    """

    def child_target():
        _expect_channel_unusable(_async_unary, stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _async_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    child_process.finish()


def _async_unary_new_channel(channel, args):
    """Check a child process can issue a future-based RPC on a new channel.

    The parent performs RPCs on ``channel`` before and after starting the
    child; the child creates its own channel from ``args``.
    """

    def child_target():
        child_channel = _channel(args)
        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
        _async_unary(child_stub)
        child_channel.close()

    stub = test_pb2_grpc.TestServiceStub(channel)
    _async_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    child_process.finish()


def _blocking_unary_same_channel(channel):
    """Check a child process cannot issue a blocking RPC on ``channel``.

    The parent performs one RPC on ``channel`` before starting the child; the
    child requires its own RPC on the inherited stub to raise ``ValueError``.
    """

    def child_target():
        _expect_channel_unusable(_blocking_unary, stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    child_process.finish()


def _blocking_unary_new_channel(channel, args):
    """Check a child process can issue a blocking RPC on a new channel.

    The parent performs RPCs on ``channel`` before and after starting the
    child; the child creates its own channel from ``args``.
    """

    def child_target():
        child_channel = _channel(args)
        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
        _blocking_unary(child_stub)
        child_channel.close()

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _blocking_unary(stub)
    child_process.finish()


# Verify that the fork channel registry can handle already closed channels
def _close_channel_before_fork(channel, args):
    """Close ``channel`` and open a new one before starting a child process.

    The child closes the inherited new channel and performs an RPC on a
    channel of its own, while the parent performs an RPC on the new channel.
    """

    def child_target():
        new_channel.close()
        child_channel = _channel(args)
        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
        _blocking_unary(child_stub)
        child_channel.close()

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    channel.close()

    new_channel = _channel(args)
    new_stub = test_pb2_grpc.TestServiceStub(new_channel)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _blocking_unary(new_stub)
    child_process.finish()


def _connectivity_watch(channel, args):
    """Check connectivity subscriptions in the parent and in a child process.

    Both processes subscribe a callback to their own channel, perform an RPC
    and require that at least two states were observed, the last of which is
    READY. The child additionally requires that its copy of the parent's state
    list holds at most one entry.

    Raises:
      ValueError: If the parent channel did not reach READY or the child
        process failed.
    """

    def child_target():

        def child_connectivity_callback(state):
            child_states.append(state)

        child_states = []
        child_channel = _channel(args)
        child_stub = test_pb2_grpc.TestServiceStub(child_channel)
        child_channel.subscribe(child_connectivity_callback)
        _async_unary(child_stub)
        if (len(child_states) < 2 or
                child_states[-1] != grpc.ChannelConnectivity.READY):
            raise ValueError('Channel did not move to READY')
        if len(parent_states) > 1:
            raise ValueError('Received connectivity updates on parent callback')
        child_channel.unsubscribe(child_connectivity_callback)
        child_channel.close()

    def parent_connectivity_callback(state):
        parent_states.append(state)

    parent_states = []
    channel.subscribe(parent_connectivity_callback)
    try:
        stub = test_pb2_grpc.TestServiceStub(channel)
        child_process = _ChildProcess(child_target)
        child_process.start()
        _async_unary(stub)
        if (len(parent_states) < 2 or
                parent_states[-1] != grpc.ChannelConnectivity.READY):
            raise ValueError('Channel did not move to READY')
    finally:
        # Need to unsubscribe or _channel.py in _poll_connectivity triggers a
        # "Cannot invoke RPC on closed channel!" error.
        # TODO(ericgribkoff) Fix issue with channel.close() and connectivity polling
        channel.unsubscribe(parent_connectivity_callback)
    child_process.finish()


def _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target, run_after_close=True):
    """Run a bidi ping-pong call, starting child processes while it is active.

    A child process is started after every response is received and, from the
    second request on, also after the request was queued but before its
    response is read. All child processes are joined at the end.

    Args:
      channel: Channel on which the ``FullDuplexCall`` is made.
      args: Forwarded unchanged to ``child_target``.
      child_target: Callable run in each child process as
        ``child_target(parent_bidi_call, channel, args)``.
      run_after_close: Whether to start one more child process after the
        request pipe has been closed.

    Raises:
      ValueError: If a response fails validation or a child process failed.
    """
    request_response_sizes = (
        31415,
        9,
        2653,
        58979,
    )
    request_payload_sizes = (
        27182,
        8,
        1828,
        45904,
    )
    stub = test_pb2_grpc.TestServiceStub(channel)
    pipe = _Pipe()
    parent_bidi_call = stub.FullDuplexCall(pipe)
    child_processes = []

    def start_child_process():
        child_process = _ChildProcess(child_target,
                                      (parent_bidi_call, channel, args))
        child_process.start()
        child_processes.append(child_process)

    first_message_received = False
    for response_size, payload_size in zip(request_response_sizes,
                                           request_payload_sizes):
        request = messages_pb2.StreamingOutputCallRequest(
            response_type=messages_pb2.COMPRESSABLE,
            response_parameters=(
                messages_pb2.ResponseParameters(size=response_size),),
            payload=messages_pb2.Payload(body=b'\x00' * payload_size))
        pipe.add(request)
        if first_message_received:
            start_child_process()
        response = next(parent_bidi_call)
        first_message_received = True
        start_child_process()
        _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
                                          response_size)
    pipe.close()
    if run_after_close:
        start_child_process()
    for child_process in child_processes:
        child_process.finish()


def _in_progress_bidi_continue_call(channel):
    """Check the state of an in-progress bidi call inherited by a child.

    Each child requires that an RPC on the inherited channel raises
    ``ValueError`` and that the inherited call reports code CANCELLED with
    details 'Channel closed due to fork'.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_channel_unusable(_async_unary, stub)
        inherited_code = parent_bidi_call.code()
        inherited_details = parent_bidi_call.details()
        if inherited_code != grpc.StatusCode.CANCELLED:
            raise ValueError(
                'Expected inherited code CANCELLED, got %s' % inherited_code)
        if inherited_details != 'Channel closed due to fork':
            raise ValueError(
                'Expected inherited details Channel closed due to fork, got %s'
                % inherited_details)

    # Don't run child_target after closing the parent call, as the call may have
    # received a status from the server before fork occurs.
    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target, run_after_close=False)


def _in_progress_bidi_same_channel_async_call(channel):
    """Check children cannot issue a future-based RPC on ``channel``.

    The children are started while a bidi call is in progress on ``channel``.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_channel_unusable(_async_unary, stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)


def _in_progress_bidi_same_channel_blocking_call(channel):
    """Check children cannot issue a blocking RPC on ``channel``.

    The children are started while a bidi call is in progress on ``channel``.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_channel_unusable(_blocking_unary, stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)


def _in_progress_bidi_new_channel_async_call(channel, args):
    """Check children can issue a future-based RPC on a channel of their own.

    The children are started while a bidi call is in progress on ``channel``.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        child_channel = _channel(args)
        stub = test_pb2_grpc.TestServiceStub(child_channel)
        _async_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)


def _in_progress_bidi_new_channel_blocking_call(channel, args):
    """Check children can issue a blocking RPC on a channel of their own.

    The children are started while a bidi call is in progress on ``channel``.
    """

    def child_target(parent_bidi_call, parent_channel, args):
        child_channel = _channel(args)
        stub = test_pb2_grpc.TestServiceStub(child_channel)
        _blocking_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)


@enum.unique
class TestCase(enum.Enum):
    """Fork support test cases; each value is the test case's name."""

    CONNECTIVITY_WATCH = 'connectivity_watch'
    CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
    ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
    ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
    BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
    BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
    IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'

    def run_test(self, args):
        """Run this test case against the server described by ``args``.

        Args:
          args: Object exposing ``server_host``, ``server_port`` and
            ``use_tls``.

        Raises:
          NotImplementedError: If no implementation exists for this test case.
        """
        _LOGGER.info("Running %s", self)
        channel = _channel(args)
        # Close the channel even when the test raises, so a failing test case
        # does not leak it.
        try:
            if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
                _async_unary_same_channel(channel)
            elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
                _async_unary_new_channel(channel, args)
            elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
                _blocking_unary_same_channel(channel)
            elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
                _blocking_unary_new_channel(channel, args)
            elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
                _close_channel_before_fork(channel, args)
            elif self is TestCase.CONNECTIVITY_WATCH:
                _connectivity_watch(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
                _in_progress_bidi_continue_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_same_channel_async_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_same_channel_blocking_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_new_channel_async_call(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_new_channel_blocking_call(channel, args)
            else:
                raise NotImplementedError(
                    'Test case "%s" not implemented!' % self.name)
        finally:
            channel.close()