aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2017-07-28 01:24:52 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2017-08-03 22:38:40 +0000
commit2010985ab269c8df0443e4f3782cbdffb083e9d4 (patch)
tree06a2a9a8a3f061542eeeac42edf2089b78c179be /src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
parent69b7231776dc42c87abad33430c66e7b302bf00c (diff)
gRPC Python test infrastructure
(The channel-related second part of it.)
Diffstat (limited to 'src/python/grpcio_testing/grpc_testing/_channel/_invocation.py')
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_invocation.py322
1 files changed, 322 insertions, 0 deletions
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
new file mode 100644
index 0000000000..ebce652eeb
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
@@ -0,0 +1,322 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+
+import grpc
+
+_NOT_YET_OBSERVED = object()
+
+
+def _cancel(handler):
+ return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
+
+
+def _is_active(handler):
+ return handler.is_active()
+
+
+def _time_remaining(unused_handler):
+ raise NotImplementedError()
+
+
+def _add_callback(handler, callback):
+ return handler.add_callback(callback)
+
+
+def _initial_metadata(handler):
+ return handler.initial_metadata()
+
+
+def _trailing_metadata(handler):
+ trailing_metadata, unused_code, unused_details = handler.termination()
+ return trailing_metadata
+
+
+def _code(handler):
+ unused_trailing_metadata, code, unused_details = handler.termination()
+ return code
+
+
+def _details(handler):
+ unused_trailing_metadata, unused_code, details = handler.termination()
+ return details
+
+
+class _Call(grpc.Call):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def cancel(self):
+ _cancel(self._handler)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)
+
+
+class _RpcErrorCall(grpc.RpcError, grpc.Call):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def cancel(self):
+ _cancel(self._handler)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)
+
+
+def _next(handler):
+ read = handler.take_response()
+ if read.code is None:
+ return read.response
+ elif read.code is grpc.StatusCode.OK:
+ raise StopIteration()
+ else:
+ raise _RpcErrorCall(handler)
+
+
+class _HandlerExtras(object):
+
+ def __init__(self):
+ self.condition = threading.Condition()
+ self.unary_response = _NOT_YET_OBSERVED
+ self.cancelled = False
+
+
+def _with_extras_cancel(handler, extras):
+ with extras.condition:
+ if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
+ extras.cancelled = True
+ return True
+ else:
+ return False
+
+
+def _extras_without_cancelled(extras):
+ with extras.condition:
+ return extras.cancelled
+
+
+def _running(handler):
+ return handler.is_active()
+
+
+def _done(handler):
+ return not handler.is_active()
+
+
+def _with_extras_unary_response(handler, extras):
+ with extras.condition:
+ if extras.unary_response is _NOT_YET_OBSERVED:
+ read = handler.take_response()
+ if read.code is None:
+ extras.unary_response = read.response
+ return read.response
+ else:
+ raise _RpcErrorCall(handler)
+ else:
+ return extras.unary_response
+
+
+def _exception(unused_handler):
+ raise NotImplementedError('TODO!')
+
+
+def _traceback(unused_handler):
+ raise NotImplementedError('TODO!')
+
+
+def _add_done_callback(handler, callback, future):
+ adapted_callback = lambda: callback(future)
+ if not handler.add_callback(adapted_callback):
+ callback(future)
+
+
+class _FutureCall(grpc.Future, grpc.Call):
+
+ def __init__(self, handler, extras):
+ self._handler = handler
+ self._extras = extras
+
+ def cancel(self):
+ return _with_extras_cancel(self._handler, self._extras)
+
+ def cancelled(self):
+ return _extras_without_cancelled(self._extras)
+
+ def running(self):
+ return _running(self._handler)
+
+ def done(self):
+ return _done(self._handler)
+
+ def result(self):
+ return _with_extras_unary_response(self._handler, self._extras)
+
+ def exception(self):
+ return _exception(self._handler)
+
+ def traceback(self):
+ return _traceback(self._handler)
+
+ def add_done_callback(self, fn):
+ _add_done_callback(self._handler, fn, self)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)
+
+
+def consume_requests(request_iterator, handler):
+
+ def _consume():
+ while True:
+ try:
+ request = next(request_iterator)
+ added = handler.add_request(request)
+ if not added:
+ break
+ except StopIteration:
+ handler.close_requests()
+ break
+ except Exception: # pylint: disable=broad-except
+ details = 'Exception iterating requests!'
+ logging.exception(details)
+ handler.cancel(grpc.StatusCode.UNKNOWN, details)
+
+ consumption = threading.Thread(target=_consume)
+ consumption.start()
+
+
+def blocking_unary_response(handler):
+ read = handler.take_response()
+ if read.code is None:
+ unused_trailing_metadata, code, unused_details = handler.termination()
+ if code is grpc.StatusCode.OK:
+ return read.response
+ else:
+ raise _RpcErrorCall(handler)
+ else:
+ raise _RpcErrorCall(handler)
+
+
+def blocking_unary_response_with_call(handler):
+ read = handler.take_response()
+ if read.code is None:
+ unused_trailing_metadata, code, unused_details = handler.termination()
+ if code is grpc.StatusCode.OK:
+ return read.response, _Call(handler)
+ else:
+ raise _RpcErrorCall(handler)
+ else:
+ raise _RpcErrorCall(handler)
+
+
+def future_call(handler):
+ return _FutureCall(handler, _HandlerExtras())
+
+
+class ResponseIteratorCall(grpc.Call):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return _next(self._handler)
+
+ def next(self):
+ return _next(self._handler)
+
+ def cancel(self):
+ _cancel(self._handler)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)