diff options
Diffstat (limited to 'src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py')
-rw-r--r-- | src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py new file mode 100644 index 0000000000..569c41d79d --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py @@ -0,0 +1,48 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import threading + +from grpc_testing import _common +from grpc_testing._channel import _rpc_state + + +class State(_common.ChannelHandler): + + def __init__(self): + self._condition = threading.Condition() + self._rpc_states = collections.defaultdict(list) + + def invoke_rpc( + self, method_full_rpc_name, invocation_metadata, requests, + requests_closed, timeout): + rpc_state = _rpc_state.State( + invocation_metadata, requests, requests_closed) + with self._condition: + self._rpc_states[method_full_rpc_name].append(rpc_state) + self._condition.notify_all() + return rpc_state + + def take_rpc_state(self, method_descriptor): + method_full_rpc_name = '/{}/{}'.format( + method_descriptor.containing_service.full_name, + method_descriptor.name) + with self._condition: + while True: + method_rpc_states = self._rpc_states[method_full_rpc_name] + if method_rpc_states: + return method_rpc_states.pop(0) + else: + self._condition.wait() |