aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py')
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py48
1 files changed, 48 insertions, 0 deletions
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py
new file mode 100644
index 0000000000..569c41d79d
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py
@@ -0,0 +1,48 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import threading
+
+from grpc_testing import _common
+from grpc_testing._channel import _rpc_state
+
+
+class State(_common.ChannelHandler):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._rpc_states = collections.defaultdict(list)
+
+ def invoke_rpc(
+ self, method_full_rpc_name, invocation_metadata, requests,
+ requests_closed, timeout):
+ rpc_state = _rpc_state.State(
+ invocation_metadata, requests, requests_closed)
+ with self._condition:
+ self._rpc_states[method_full_rpc_name].append(rpc_state)
+ self._condition.notify_all()
+ return rpc_state
+
+ def take_rpc_state(self, method_descriptor):
+ method_full_rpc_name = '/{}/{}'.format(
+ method_descriptor.containing_service.full_name,
+ method_descriptor.name)
+ with self._condition:
+ while True:
+ method_rpc_states = self._rpc_states[method_full_rpc_name]
+ if method_rpc_states:
+ return method_rpc_states.pop(0)
+ else:
+ self._condition.wait()