aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/beta
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <atash@google.com>2017-01-13 19:20:10 -0800
committerGravatar Masood Malekghassemi <atash@google.com>2017-01-17 10:55:33 -0800
commitcc793703bfba6f661f523b6fec82ff8a913e1759 (patch)
treef3cb0c7330565e9ed9947a07c6423f81e5c00f72 /src/python/grpcio/grpc/beta
parent06dea573daa2175b244a430bb89b49bb5c8e8c5b (diff)
Run Python formatting
Diffstat (limited to 'src/python/grpcio/grpc/beta')
-rw-r--r--src/python/grpcio/grpc/beta/_client_adaptations.py1028
-rw-r--r--src/python/grpcio/grpc/beta/_connectivity_channel.py213
-rw-r--r--src/python/grpcio/grpc/beta/_server_adaptations.py544
-rw-r--r--src/python/grpcio/grpc/beta/implementations.py173
-rw-r--r--src/python/grpcio/grpc/beta/interfaces.py78
-rw-r--r--src/python/grpcio/grpc/beta/utilities.py202
6 files changed, 1195 insertions, 1043 deletions
diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py
index e4ee44d7a3..e5b28e9408 100644
--- a/src/python/grpcio/grpc/beta/_client_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_client_adaptations.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Translates gRPC's client-side API into gRPC's client-side Beta API."""
import grpc
@@ -38,531 +37,654 @@ from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face
_STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
- grpc.StatusCode.CANCELLED: (
- face.Abortion.Kind.CANCELLED, face.CancellationError),
- grpc.StatusCode.UNKNOWN: (
- face.Abortion.Kind.REMOTE_FAILURE, face.RemoteError),
- grpc.StatusCode.DEADLINE_EXCEEDED: (
- face.Abortion.Kind.EXPIRED, face.ExpirationError),
- grpc.StatusCode.UNIMPLEMENTED: (
- face.Abortion.Kind.LOCAL_FAILURE, face.LocalError),
+ grpc.StatusCode.CANCELLED: (face.Abortion.Kind.CANCELLED,
+ face.CancellationError),
+ grpc.StatusCode.UNKNOWN: (face.Abortion.Kind.REMOTE_FAILURE,
+ face.RemoteError),
+ grpc.StatusCode.DEADLINE_EXCEEDED: (face.Abortion.Kind.EXPIRED,
+ face.ExpirationError),
+ grpc.StatusCode.UNIMPLEMENTED: (face.Abortion.Kind.LOCAL_FAILURE,
+ face.LocalError),
}
def _effective_metadata(metadata, metadata_transformer):
- non_none_metadata = () if metadata is None else metadata
- if metadata_transformer is None:
- return non_none_metadata
- else:
- return metadata_transformer(non_none_metadata)
+ non_none_metadata = () if metadata is None else metadata
+ if metadata_transformer is None:
+ return non_none_metadata
+ else:
+ return metadata_transformer(non_none_metadata)
def _credentials(grpc_call_options):
- return None if grpc_call_options is None else grpc_call_options.credentials
+ return None if grpc_call_options is None else grpc_call_options.credentials
def _abortion(rpc_error_call):
- code = rpc_error_call.code()
- pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
- error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0]
- return face.Abortion(
- error_kind, rpc_error_call.initial_metadata(),
- rpc_error_call.trailing_metadata(), code, rpc_error_call.details())
+ code = rpc_error_call.code()
+ pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+ error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0]
+ return face.Abortion(error_kind,
+ rpc_error_call.initial_metadata(),
+ rpc_error_call.trailing_metadata(), code,
+ rpc_error_call.details())
def _abortion_error(rpc_error_call):
- code = rpc_error_call.code()
- pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
- exception_class = face.AbortionError if pair is None else pair[1]
- return exception_class(
- rpc_error_call.initial_metadata(), rpc_error_call.trailing_metadata(),
- code, rpc_error_call.details())
+ code = rpc_error_call.code()
+ pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+ exception_class = face.AbortionError if pair is None else pair[1]
+ return exception_class(rpc_error_call.initial_metadata(),
+ rpc_error_call.trailing_metadata(), code,
+ rpc_error_call.details())
class _InvocationProtocolContext(interfaces.GRPCInvocationContext):
- def disable_next_request_compression(self):
- pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+ def disable_next_request_compression(self):
+ pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
class _Rendezvous(future.Future, face.Call):
- def __init__(self, response_future, response_iterator, call):
- self._future = response_future
- self._iterator = response_iterator
- self._call = call
+ def __init__(self, response_future, response_iterator, call):
+ self._future = response_future
+ self._iterator = response_iterator
+ self._call = call
- def cancel(self):
- return self._call.cancel()
+ def cancel(self):
+ return self._call.cancel()
- def cancelled(self):
- return self._future.cancelled()
+ def cancelled(self):
+ return self._future.cancelled()
- def running(self):
- return self._future.running()
+ def running(self):
+ return self._future.running()
- def done(self):
- return self._future.done()
+ def done(self):
+ return self._future.done()
- def result(self, timeout=None):
- try:
- return self._future.result(timeout=timeout)
- except grpc.RpcError as rpc_error_call:
- raise _abortion_error(rpc_error_call)
- except grpc.FutureTimeoutError:
- raise future.TimeoutError()
- except grpc.FutureCancelledError:
- raise future.CancelledError()
+ def result(self, timeout=None):
+ try:
+ return self._future.result(timeout=timeout)
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+ except grpc.FutureTimeoutError:
+ raise future.TimeoutError()
+ except grpc.FutureCancelledError:
+ raise future.CancelledError()
- def exception(self, timeout=None):
- try:
- rpc_error_call = self._future.exception(timeout=timeout)
- if rpc_error_call is None:
- return None
- else:
- return _abortion_error(rpc_error_call)
- except grpc.FutureTimeoutError:
- raise future.TimeoutError()
- except grpc.FutureCancelledError:
- raise future.CancelledError()
-
- def traceback(self, timeout=None):
- try:
- return self._future.traceback(timeout=timeout)
- except grpc.FutureTimeoutError:
- raise future.TimeoutError()
- except grpc.FutureCancelledError:
- raise future.CancelledError()
+ def exception(self, timeout=None):
+ try:
+ rpc_error_call = self._future.exception(timeout=timeout)
+ if rpc_error_call is None:
+ return None
+ else:
+ return _abortion_error(rpc_error_call)
+ except grpc.FutureTimeoutError:
+ raise future.TimeoutError()
+ except grpc.FutureCancelledError:
+ raise future.CancelledError()
- def add_done_callback(self, fn):
- self._future.add_done_callback(lambda ignored_callback: fn(self))
+ def traceback(self, timeout=None):
+ try:
+ return self._future.traceback(timeout=timeout)
+ except grpc.FutureTimeoutError:
+ raise future.TimeoutError()
+ except grpc.FutureCancelledError:
+ raise future.CancelledError()
- def __iter__(self):
- return self
+ def add_done_callback(self, fn):
+ self._future.add_done_callback(lambda ignored_callback: fn(self))
- def _next(self):
- try:
- return next(self._iterator)
- except grpc.RpcError as rpc_error_call:
- raise _abortion_error(rpc_error_call)
+ def __iter__(self):
+ return self
+
+ def _next(self):
+ try:
+ return next(self._iterator)
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+
+ def __next__(self):
+ return self._next()
- def __next__(self):
- return self._next()
+ def next(self):
+ return self._next()
- def next(self):
- return self._next()
+ def is_active(self):
+ return self._call.is_active()
- def is_active(self):
- return self._call.is_active()
+ def time_remaining(self):
+ return self._call.time_remaining()
- def time_remaining(self):
- return self._call.time_remaining()
+ def add_abortion_callback(self, abortion_callback):
- def add_abortion_callback(self, abortion_callback):
- def done_callback():
- if self.code() is not grpc.StatusCode.OK:
- abortion_callback(_abortion(self._call))
- registered = self._call.add_callback(done_callback)
- return None if registered else done_callback()
+ def done_callback():
+ if self.code() is not grpc.StatusCode.OK:
+ abortion_callback(_abortion(self._call))
- def protocol_context(self):
- return _InvocationProtocolContext()
+ registered = self._call.add_callback(done_callback)
+ return None if registered else done_callback()
- def initial_metadata(self):
- return self._call.initial_metadata()
+ def protocol_context(self):
+ return _InvocationProtocolContext()
- def terminal_metadata(self):
- return self._call.terminal_metadata()
+ def initial_metadata(self):
+ return self._call.initial_metadata()
- def code(self):
- return self._call.code()
+ def terminal_metadata(self):
+ return self._call.terminal_metadata()
- def details(self):
- return self._call.details()
+ def code(self):
+ return self._call.code()
+ def details(self):
+ return self._call.details()
-def _blocking_unary_unary(
- channel, group, method, timeout, with_call, protocol_options, metadata,
- metadata_transformer, request, request_serializer, response_deserializer):
- try:
+
+def _blocking_unary_unary(channel, group, method, timeout, with_call,
+ protocol_options, metadata, metadata_transformer,
+ request, request_serializer, response_deserializer):
+ try:
+ multi_callable = channel.unary_unary(
+ _common.fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ if with_call:
+ response, call = multi_callable.with_call(
+ request,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return response, _Rendezvous(None, None, call)
+ else:
+ return multi_callable(
+ request,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+
+
+def _future_unary_unary(channel, group, method, timeout, protocol_options,
+ metadata, metadata_transformer, request,
+ request_serializer, response_deserializer):
multi_callable = channel.unary_unary(
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
- if with_call:
- response, call = multi_callable.with_call(
- request, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- return response, _Rendezvous(None, None, call)
- else:
- return multi_callable(
- request, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- except grpc.RpcError as rpc_error_call:
- raise _abortion_error(rpc_error_call)
-
-
-def _future_unary_unary(
- channel, group, method, timeout, protocol_options, metadata,
- metadata_transformer, request, request_serializer, response_deserializer):
- multi_callable = channel.unary_unary(
- _common.fully_qualified_method(group, method),
- request_serializer=request_serializer,
- response_deserializer=response_deserializer)
- effective_metadata = _effective_metadata(metadata, metadata_transformer)
- response_future = multi_callable.future(
- request, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- return _Rendezvous(response_future, None, response_future)
-
-
-def _unary_stream(
- channel, group, method, timeout, protocol_options, metadata,
- metadata_transformer, request, request_serializer, response_deserializer):
- multi_callable = channel.unary_stream(
- _common.fully_qualified_method(group, method),
- request_serializer=request_serializer,
- response_deserializer=response_deserializer)
- effective_metadata = _effective_metadata(metadata, metadata_transformer)
- response_iterator = multi_callable(
- request, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- return _Rendezvous(None, response_iterator, response_iterator)
-
-
-def _blocking_stream_unary(
- channel, group, method, timeout, with_call, protocol_options, metadata,
- metadata_transformer, request_iterator, request_serializer,
- response_deserializer):
- try:
+ response_future = multi_callable.future(
+ request,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(response_future, None, response_future)
+
+
+def _unary_stream(channel, group, method, timeout, protocol_options, metadata,
+ metadata_transformer, request, request_serializer,
+ response_deserializer):
+ multi_callable = channel.unary_stream(
+ _common.fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ response_iterator = multi_callable(
+ request,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(None, response_iterator, response_iterator)
+
+
+def _blocking_stream_unary(channel, group, method, timeout, with_call,
+ protocol_options, metadata, metadata_transformer,
+ request_iterator, request_serializer,
+ response_deserializer):
+ try:
+ multi_callable = channel.stream_unary(
+ _common.fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ if with_call:
+ response, call = multi_callable.with_call(
+ request_iterator,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return response, _Rendezvous(None, None, call)
+ else:
+ return multi_callable(
+ request_iterator,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+
+
+def _future_stream_unary(channel, group, method, timeout, protocol_options,
+ metadata, metadata_transformer, request_iterator,
+ request_serializer, response_deserializer):
multi_callable = channel.stream_unary(
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
- if with_call:
- response, call = multi_callable.with_call(
- request_iterator, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- return response, _Rendezvous(None, None, call)
- else:
- return multi_callable(
- request_iterator, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- except grpc.RpcError as rpc_error_call:
- raise _abortion_error(rpc_error_call)
-
-
-def _future_stream_unary(
- channel, group, method, timeout, protocol_options, metadata,
- metadata_transformer, request_iterator, request_serializer,
- response_deserializer):
- multi_callable = channel.stream_unary(
- _common.fully_qualified_method(group, method),
- request_serializer=request_serializer,
- response_deserializer=response_deserializer)
- effective_metadata = _effective_metadata(metadata, metadata_transformer)
- response_future = multi_callable.future(
- request_iterator, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- return _Rendezvous(response_future, None, response_future)
-
-
-def _stream_stream(
- channel, group, method, timeout, protocol_options, metadata,
- metadata_transformer, request_iterator, request_serializer,
- response_deserializer):
- multi_callable = channel.stream_stream(
- _common.fully_qualified_method(group, method),
- request_serializer=request_serializer,
- response_deserializer=response_deserializer)
- effective_metadata = _effective_metadata(metadata, metadata_transformer)
- response_iterator = multi_callable(
- request_iterator, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options))
- return _Rendezvous(None, response_iterator, response_iterator)
+ response_future = multi_callable.future(
+ request_iterator,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(response_future, None, response_future)
+
+
+def _stream_stream(channel, group, method, timeout, protocol_options, metadata,
+ metadata_transformer, request_iterator, request_serializer,
+ response_deserializer):
+ multi_callable = channel.stream_stream(
+ _common.fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ response_iterator = multi_callable(
+ request_iterator,
+ timeout=timeout,
+ metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(None, response_iterator, response_iterator)
class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
- def __init__(
- self, channel, group, method, metadata_transformer, request_serializer,
- response_deserializer):
- self._channel = channel
- self._group = group
- self._method = method
- self._metadata_transformer = metadata_transformer
- self._request_serializer = request_serializer
- self._response_deserializer = response_deserializer
-
- def __call__(
- self, request, timeout, metadata=None, with_call=False,
- protocol_options=None):
- return _blocking_unary_unary(
- self._channel, self._group, self._method, timeout, with_call,
- protocol_options, metadata, self._metadata_transformer, request,
- self._request_serializer, self._response_deserializer)
-
- def future(self, request, timeout, metadata=None, protocol_options=None):
- return _future_unary_unary(
- self._channel, self._group, self._method, timeout, protocol_options,
- metadata, self._metadata_transformer, request, self._request_serializer,
- self._response_deserializer)
-
- def event(
- self, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- raise NotImplementedError()
+ def __init__(self, channel, group, method, metadata_transformer,
+ request_serializer, response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(self,
+ request,
+ timeout,
+ metadata=None,
+ with_call=False,
+ protocol_options=None):
+ return _blocking_unary_unary(
+ self._channel, self._group, self._method, timeout, with_call,
+ protocol_options, metadata, self._metadata_transformer, request,
+ self._request_serializer, self._response_deserializer)
+
+ def future(self, request, timeout, metadata=None, protocol_options=None):
+ return _future_unary_unary(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request,
+ self._request_serializer, self._response_deserializer)
+
+ def event(self,
+ request,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
- def __init__(
- self, channel, group, method, metadata_transformer, request_serializer,
- response_deserializer):
- self._channel = channel
- self._group = group
- self._method = method
- self._metadata_transformer = metadata_transformer
- self._request_serializer = request_serializer
- self._response_deserializer = response_deserializer
-
- def __call__(self, request, timeout, metadata=None, protocol_options=None):
- return _unary_stream(
- self._channel, self._group, self._method, timeout, protocol_options,
- metadata, self._metadata_transformer, request, self._request_serializer,
- self._response_deserializer)
-
- def event(
- self, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- raise NotImplementedError()
+ def __init__(self, channel, group, method, metadata_transformer,
+ request_serializer, response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(self, request, timeout, metadata=None, protocol_options=None):
+ return _unary_stream(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request,
+ self._request_serializer, self._response_deserializer)
+
+ def event(self,
+ request,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
- def __init__(
- self, channel, group, method, metadata_transformer, request_serializer,
- response_deserializer):
- self._channel = channel
- self._group = group
- self._method = method
- self._metadata_transformer = metadata_transformer
- self._request_serializer = request_serializer
- self._response_deserializer = response_deserializer
-
- def __call__(
- self, request_iterator, timeout, metadata=None, with_call=False,
- protocol_options=None):
- return _blocking_stream_unary(
- self._channel, self._group, self._method, timeout, with_call,
- protocol_options, metadata, self._metadata_transformer,
- request_iterator, self._request_serializer, self._response_deserializer)
-
- def future(
- self, request_iterator, timeout, metadata=None, protocol_options=None):
- return _future_stream_unary(
- self._channel, self._group, self._method, timeout, protocol_options,
- metadata, self._metadata_transformer, request_iterator,
- self._request_serializer, self._response_deserializer)
-
- def event(
- self, receiver, abortion_callback, timeout, metadata=None,
- protocol_options=None):
- raise NotImplementedError()
+ def __init__(self, channel, group, method, metadata_transformer,
+ request_serializer, response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(self,
+ request_iterator,
+ timeout,
+ metadata=None,
+ with_call=False,
+ protocol_options=None):
+ return _blocking_stream_unary(
+ self._channel, self._group, self._method, timeout, with_call,
+ protocol_options, metadata, self._metadata_transformer,
+ request_iterator, self._request_serializer,
+ self._response_deserializer)
+
+ def future(self,
+ request_iterator,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ return _future_stream_unary(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request_iterator,
+ self._request_serializer, self._response_deserializer)
+
+ def event(self,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
- def __init__(
- self, channel, group, method, metadata_transformer, request_serializer,
- response_deserializer):
- self._channel = channel
- self._group = group
- self._method = method
- self._metadata_transformer = metadata_transformer
- self._request_serializer = request_serializer
- self._response_deserializer = response_deserializer
-
- def __call__(
- self, request_iterator, timeout, metadata=None, protocol_options=None):
- return _stream_stream(
- self._channel, self._group, self._method, timeout, protocol_options,
- metadata, self._metadata_transformer, request_iterator,
- self._request_serializer, self._response_deserializer)
-
- def event(
- self, receiver, abortion_callback, timeout, metadata=None,
- protocol_options=None):
- raise NotImplementedError()
+ def __init__(self, channel, group, method, metadata_transformer,
+ request_serializer, response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(self,
+ request_iterator,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ return _stream_stream(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request_iterator,
+ self._request_serializer, self._response_deserializer)
+
+ def event(self,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
class _GenericStub(face.GenericStub):
- def __init__(
- self, channel, metadata_transformer, request_serializers,
- response_deserializers):
- self._channel = channel
- self._metadata_transformer = metadata_transformer
- self._request_serializers = request_serializers or {}
- self._response_deserializers = response_deserializers or {}
-
- def blocking_unary_unary(
- self, group, method, request, timeout, metadata=None,
- with_call=None, protocol_options=None):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _blocking_unary_unary(
- self._channel, group, method, timeout, with_call, protocol_options,
- metadata, self._metadata_transformer, request, request_serializer,
- response_deserializer)
-
- def future_unary_unary(
- self, group, method, request, timeout, metadata=None,
- protocol_options=None):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _future_unary_unary(
- self._channel, group, method, timeout, protocol_options, metadata,
- self._metadata_transformer, request, request_serializer,
- response_deserializer)
-
- def inline_unary_stream(
- self, group, method, request, timeout, metadata=None,
- protocol_options=None):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _unary_stream(
- self._channel, group, method, timeout, protocol_options, metadata,
- self._metadata_transformer, request, request_serializer,
- response_deserializer)
-
- def blocking_stream_unary(
- self, group, method, request_iterator, timeout, metadata=None,
- with_call=None, protocol_options=None):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _blocking_stream_unary(
- self._channel, group, method, timeout, with_call, protocol_options,
- metadata, self._metadata_transformer, request_iterator,
- request_serializer, response_deserializer)
-
- def future_stream_unary(
- self, group, method, request_iterator, timeout, metadata=None,
- protocol_options=None):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _future_stream_unary(
- self._channel, group, method, timeout, protocol_options, metadata,
- self._metadata_transformer, request_iterator, request_serializer,
- response_deserializer)
-
- def inline_stream_stream(
- self, group, method, request_iterator, timeout, metadata=None,
- protocol_options=None):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _stream_stream(
- self._channel, group, method, timeout, protocol_options, metadata,
- self._metadata_transformer, request_iterator, request_serializer,
- response_deserializer)
-
- def event_unary_unary(
- self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- raise NotImplementedError()
-
- def event_unary_stream(
- self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- raise NotImplementedError()
-
- def event_stream_unary(
- self, group, method, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- raise NotImplementedError()
-
- def event_stream_stream(
- self, group, method, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- raise NotImplementedError()
-
- def unary_unary(self, group, method):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _UnaryUnaryMultiCallable(
- self._channel, group, method, self._metadata_transformer,
- request_serializer, response_deserializer)
-
- def unary_stream(self, group, method):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _UnaryStreamMultiCallable(
- self._channel, group, method, self._metadata_transformer,
- request_serializer, response_deserializer)
-
- def stream_unary(self, group, method):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _StreamUnaryMultiCallable(
- self._channel, group, method, self._metadata_transformer,
- request_serializer, response_deserializer)
-
- def stream_stream(self, group, method):
- request_serializer = self._request_serializers.get((group, method,))
- response_deserializer = self._response_deserializers.get((group, method,))
- return _StreamStreamMultiCallable(
- self._channel, group, method, self._metadata_transformer,
- request_serializer, response_deserializer)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- return False
+ def __init__(self, channel, metadata_transformer, request_serializers,
+ response_deserializers):
+ self._channel = channel
+ self._metadata_transformer = metadata_transformer
+ self._request_serializers = request_serializers or {}
+ self._response_deserializers = response_deserializers or {}
+
+ def blocking_unary_unary(self,
+ group,
+ method,
+ request,
+ timeout,
+ metadata=None,
+ with_call=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _blocking_unary_unary(self._channel, group, method, timeout,
+ with_call, protocol_options, metadata,
+ self._metadata_transformer, request,
+ request_serializer, response_deserializer)
+
+ def future_unary_unary(self,
+ group,
+ method,
+ request,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _future_unary_unary(self._channel, group, method, timeout,
+ protocol_options, metadata,
+ self._metadata_transformer, request,
+ request_serializer, response_deserializer)
+
+ def inline_unary_stream(self,
+ group,
+ method,
+ request,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _unary_stream(self._channel, group, method, timeout,
+ protocol_options, metadata,
+ self._metadata_transformer, request,
+ request_serializer, response_deserializer)
+
+ def blocking_stream_unary(self,
+ group,
+ method,
+ request_iterator,
+ timeout,
+ metadata=None,
+ with_call=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _blocking_stream_unary(
+ self._channel, group, method, timeout, with_call, protocol_options,
+ metadata, self._metadata_transformer, request_iterator,
+ request_serializer, response_deserializer)
+
+ def future_stream_unary(self,
+ group,
+ method,
+ request_iterator,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _future_stream_unary(
+ self._channel, group, method, timeout, protocol_options, metadata,
+ self._metadata_transformer, request_iterator, request_serializer,
+ response_deserializer)
+
+ def inline_stream_stream(self,
+ group,
+ method,
+ request_iterator,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _stream_stream(self._channel, group, method, timeout,
+ protocol_options, metadata,
+ self._metadata_transformer, request_iterator,
+ request_serializer, response_deserializer)
+
+ def event_unary_unary(self,
+ group,
+ method,
+ request,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
+
+ def event_unary_stream(self,
+ group,
+ method,
+ request,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
+
+ def event_stream_unary(self,
+ group,
+ method,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
+
+ def event_stream_stream(self,
+ group,
+ method,
+ receiver,
+ abortion_callback,
+ timeout,
+ metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
+
+ def unary_unary(self, group, method):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _UnaryUnaryMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def unary_stream(self, group, method):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _UnaryStreamMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def stream_unary(self, group, method):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _StreamUnaryMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def stream_stream(self, group, method):
+ request_serializer = self._request_serializers.get((
+ group,
+ method,))
+ response_deserializer = self._response_deserializers.get((
+ group,
+ method,))
+ return _StreamStreamMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
class _DynamicStub(face.DynamicStub):
- def __init__(self, generic_stub, group, cardinalities):
- self._generic_stub = generic_stub
- self._group = group
- self._cardinalities = cardinalities
-
- def __getattr__(self, attr):
- method_cardinality = self._cardinalities.get(attr)
- if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
- return self._generic_stub.unary_unary(self._group, attr)
- elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
- return self._generic_stub.unary_stream(self._group, attr)
- elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
- return self._generic_stub.stream_unary(self._group, attr)
- elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
- return self._generic_stub.stream_stream(self._group, attr)
- else:
- raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- return False
-
-
-def generic_stub(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers):
- return _GenericStub(
- channel, metadata_transformer, request_serializers,
- response_deserializers)
-
-
-def dynamic_stub(
- channel, service, cardinalities, host, metadata_transformer,
- request_serializers, response_deserializers):
- return _DynamicStub(
- _GenericStub(
- channel, metadata_transformer, request_serializers,
- response_deserializers),
- service, cardinalities)
+ def __init__(self, generic_stub, group, cardinalities):
+ self._generic_stub = generic_stub
+ self._group = group
+ self._cardinalities = cardinalities
+
+ def __getattr__(self, attr):
+ method_cardinality = self._cardinalities.get(attr)
+ if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
+ return self._generic_stub.unary_unary(self._group, attr)
+ elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
+ return self._generic_stub.unary_stream(self._group, attr)
+ elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
+ return self._generic_stub.stream_unary(self._group, attr)
+ elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
+ return self._generic_stub.stream_stream(self._group, attr)
+ else:
+ raise AttributeError('_DynamicStub object has no attribute "%s"!' %
+ attr)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+
+def generic_stub(channel, host, metadata_transformer, request_serializers,
+ response_deserializers):
+ return _GenericStub(channel, metadata_transformer, request_serializers,
+ response_deserializers)
+
+
+def dynamic_stub(channel, service, cardinalities, host, metadata_transformer,
+ request_serializers, response_deserializers):
+ return _DynamicStub(
+ _GenericStub(channel, metadata_transformer, request_serializers,
+ response_deserializers), service, cardinalities)
diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py
index 61674a70ad..39020d2b4e 100644
--- a/src/python/grpcio/grpc/beta/_connectivity_channel.py
+++ b/src/python/grpcio/grpc/beta/_connectivity_channel.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Affords a connectivity-state-listenable channel."""
import threading
@@ -41,116 +40,122 @@ _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
- state: connectivity for state, connectivity in zip(
- _types.ConnectivityState, interfaces.ChannelConnectivity)
+ state: connectivity
+ for state, connectivity in zip(_types.ConnectivityState,
+ interfaces.ChannelConnectivity)
}
class ConnectivityChannel(object):
- def __init__(self, low_channel):
- self._lock = threading.Lock()
- self._low_channel = low_channel
-
- self._polling = False
- self._connectivity = None
- self._try_to_connect = False
- self._callbacks_and_connectivities = []
- self._delivering = False
-
- def _deliveries(self, connectivity):
- callbacks_needing_update = []
- for callback_and_connectivity in self._callbacks_and_connectivities:
- callback, callback_connectivity = callback_and_connectivity
- if callback_connectivity is not connectivity:
- callbacks_needing_update.append(callback)
- callback_and_connectivity[1] = connectivity
- return callbacks_needing_update
-
- def _deliver(self, initial_connectivity, initial_callbacks):
- connectivity = initial_connectivity
- callbacks = initial_callbacks
- while True:
- for callback in callbacks:
- callable_util.call_logging_exceptions(
- callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
- connectivity)
- with self._lock:
- callbacks = self._deliveries(self._connectivity)
- if callbacks:
- connectivity = self._connectivity
- else:
- self._delivering = False
- return
-
- def _spawn_delivery(self, connectivity, callbacks):
- delivering_thread = threading.Thread(
- target=self._deliver, args=(connectivity, callbacks,))
- delivering_thread.start()
- self._delivering = True
+ def __init__(self, low_channel):
+ self._lock = threading.Lock()
+ self._low_channel = low_channel
- # TODO(issue 3064): Don't poll.
- def _poll_connectivity(self, low_channel, initial_try_to_connect):
- try_to_connect = initial_try_to_connect
- low_connectivity = low_channel.check_connectivity_state(try_to_connect)
- with self._lock:
- self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
- low_connectivity]
- callbacks = tuple(
- callback for callback, unused_but_known_to_be_none_connectivity
- in self._callbacks_and_connectivities)
- for callback_and_connectivity in self._callbacks_and_connectivities:
- callback_and_connectivity[1] = self._connectivity
- if callbacks:
- self._spawn_delivery(self._connectivity, callbacks)
- completion_queue = _low.CompletionQueue()
- while True:
- low_channel.watch_connectivity_state(
- low_connectivity, time.time() + 0.2, completion_queue, None)
- event = completion_queue.next()
- with self._lock:
- if not self._callbacks_and_connectivities and not self._try_to_connect:
- self._polling = False
- self._connectivity = None
- completion_queue.shutdown()
- break
- try_to_connect = self._try_to_connect
+ self._polling = False
+ self._connectivity = None
self._try_to_connect = False
- if event.success or try_to_connect:
+ self._callbacks_and_connectivities = []
+ self._delivering = False
+
+ def _deliveries(self, connectivity):
+ callbacks_needing_update = []
+ for callback_and_connectivity in self._callbacks_and_connectivities:
+ callback, callback_connectivity = callback_and_connectivity
+ if callback_connectivity is not connectivity:
+ callbacks_needing_update.append(callback)
+ callback_and_connectivity[1] = connectivity
+ return callbacks_needing_update
+
+ def _deliver(self, initial_connectivity, initial_callbacks):
+ connectivity = initial_connectivity
+ callbacks = initial_callbacks
+ while True:
+ for callback in callbacks:
+ callable_util.call_logging_exceptions(
+ callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
+ connectivity)
+ with self._lock:
+ callbacks = self._deliveries(self._connectivity)
+ if callbacks:
+ connectivity = self._connectivity
+ else:
+ self._delivering = False
+ return
+
+ def _spawn_delivery(self, connectivity, callbacks):
+ delivering_thread = threading.Thread(
+ target=self._deliver, args=(
+ connectivity,
+ callbacks,))
+ delivering_thread.start()
+ self._delivering = True
+
+ # TODO(issue 3064): Don't poll.
+ def _poll_connectivity(self, low_channel, initial_try_to_connect):
+ try_to_connect = initial_try_to_connect
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
- self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
- low_connectivity]
- if not self._delivering:
- callbacks = self._deliveries(self._connectivity)
+ self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+ low_connectivity]
+ callbacks = tuple(
+ callback
+ for callback, unused_but_known_to_be_none_connectivity in
+ self._callbacks_and_connectivities)
+ for callback_and_connectivity in self._callbacks_and_connectivities:
+ callback_and_connectivity[1] = self._connectivity
if callbacks:
- self._spawn_delivery(self._connectivity, callbacks)
-
- def subscribe(self, callback, try_to_connect):
- with self._lock:
- if not self._callbacks_and_connectivities and not self._polling:
- polling_thread = threading.Thread(
- target=self._poll_connectivity,
- args=(self._low_channel, bool(try_to_connect)))
- polling_thread.start()
- self._polling = True
- self._callbacks_and_connectivities.append([callback, None])
- elif not self._delivering and self._connectivity is not None:
- self._spawn_delivery(self._connectivity, (callback,))
- self._try_to_connect |= bool(try_to_connect)
- self._callbacks_and_connectivities.append(
- [callback, self._connectivity])
- else:
- self._try_to_connect |= bool(try_to_connect)
- self._callbacks_and_connectivities.append([callback, None])
-
- def unsubscribe(self, callback):
- with self._lock:
- for index, (subscribed_callback, unused_connectivity) in enumerate(
- self._callbacks_and_connectivities):
- if callback == subscribed_callback:
- self._callbacks_and_connectivities.pop(index)
- break
-
- def low_channel(self):
- return self._low_channel
+ self._spawn_delivery(self._connectivity, callbacks)
+ completion_queue = _low.CompletionQueue()
+ while True:
+ low_channel.watch_connectivity_state(low_connectivity,
+ time.time() + 0.2,
+ completion_queue, None)
+ event = completion_queue.next()
+ with self._lock:
+ if not self._callbacks_and_connectivities and not self._try_to_connect:
+ self._polling = False
+ self._connectivity = None
+ completion_queue.shutdown()
+ break
+ try_to_connect = self._try_to_connect
+ self._try_to_connect = False
+ if event.success or try_to_connect:
+ low_connectivity = low_channel.check_connectivity_state(
+ try_to_connect)
+ with self._lock:
+ self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+ low_connectivity]
+ if not self._delivering:
+ callbacks = self._deliveries(self._connectivity)
+ if callbacks:
+ self._spawn_delivery(self._connectivity, callbacks)
+
+ def subscribe(self, callback, try_to_connect):
+ with self._lock:
+ if not self._callbacks_and_connectivities and not self._polling:
+ polling_thread = threading.Thread(
+ target=self._poll_connectivity,
+ args=(self._low_channel, bool(try_to_connect)))
+ polling_thread.start()
+ self._polling = True
+ self._callbacks_and_connectivities.append([callback, None])
+ elif not self._delivering and self._connectivity is not None:
+ self._spawn_delivery(self._connectivity, (callback,))
+ self._try_to_connect |= bool(try_to_connect)
+ self._callbacks_and_connectivities.append(
+ [callback, self._connectivity])
+ else:
+ self._try_to_connect |= bool(try_to_connect)
+ self._callbacks_and_connectivities.append([callback, None])
+
+ def unsubscribe(self, callback):
+ with self._lock:
+ for index, (subscribed_callback, unused_connectivity
+ ) in enumerate(self._callbacks_and_connectivities):
+ if callback == subscribed_callback:
+ self._callbacks_and_connectivities.pop(index)
+ break
+
+ def low_channel(self):
+ return self._low_channel
diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py
index cca4a1797a..bb7c0960d5 100644
--- a/src/python/grpcio/grpc/beta/_server_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_server_adaptations.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Translates gRPC's server-side API into gRPC's server-side Beta API."""
import collections
@@ -47,329 +46,352 @@ _DEFAULT_POOL_SIZE = 8
class _ServerProtocolContext(interfaces.GRPCServicerContext):
- def __init__(self, servicer_context):
- self._servicer_context = servicer_context
+ def __init__(self, servicer_context):
+ self._servicer_context = servicer_context
- def peer(self):
- return self._servicer_context.peer()
+ def peer(self):
+ return self._servicer_context.peer()
- def disable_next_response_compression(self):
- pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+ def disable_next_response_compression(self):
+ pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
class _FaceServicerContext(face.ServicerContext):
- def __init__(self, servicer_context):
- self._servicer_context = servicer_context
+ def __init__(self, servicer_context):
+ self._servicer_context = servicer_context
- def is_active(self):
- return self._servicer_context.is_active()
+ def is_active(self):
+ return self._servicer_context.is_active()
- def time_remaining(self):
- return self._servicer_context.time_remaining()
+ def time_remaining(self):
+ return self._servicer_context.time_remaining()
- def add_abortion_callback(self, abortion_callback):
- raise NotImplementedError(
- 'add_abortion_callback no longer supported server-side!')
+ def add_abortion_callback(self, abortion_callback):
+ raise NotImplementedError(
+ 'add_abortion_callback no longer supported server-side!')
- def cancel(self):
- self._servicer_context.cancel()
+ def cancel(self):
+ self._servicer_context.cancel()
- def protocol_context(self):
- return _ServerProtocolContext(self._servicer_context)
+ def protocol_context(self):
+ return _ServerProtocolContext(self._servicer_context)
- def invocation_metadata(self):
- return _common.cygrpc_metadata(
- self._servicer_context.invocation_metadata())
+ def invocation_metadata(self):
+ return _common.cygrpc_metadata(
+ self._servicer_context.invocation_metadata())
- def initial_metadata(self, initial_metadata):
- self._servicer_context.send_initial_metadata(initial_metadata)
+ def initial_metadata(self, initial_metadata):
+ self._servicer_context.send_initial_metadata(initial_metadata)
- def terminal_metadata(self, terminal_metadata):
- self._servicer_context.set_terminal_metadata(terminal_metadata)
+ def terminal_metadata(self, terminal_metadata):
+ self._servicer_context.set_terminal_metadata(terminal_metadata)
- def code(self, code):
- self._servicer_context.set_code(code)
+ def code(self, code):
+ self._servicer_context.set_code(code)
- def details(self, details):
- self._servicer_context.set_details(details)
+ def details(self, details):
+ self._servicer_context.set_details(details)
def _adapt_unary_request_inline(unary_request_inline):
- def adaptation(request, servicer_context):
- return unary_request_inline(request, _FaceServicerContext(servicer_context))
- return adaptation
+
+ def adaptation(request, servicer_context):
+ return unary_request_inline(request,
+ _FaceServicerContext(servicer_context))
+
+ return adaptation
def _adapt_stream_request_inline(stream_request_inline):
- def adaptation(request_iterator, servicer_context):
- return stream_request_inline(
- request_iterator, _FaceServicerContext(servicer_context))
- return adaptation
+
+ def adaptation(request_iterator, servicer_context):
+ return stream_request_inline(request_iterator,
+ _FaceServicerContext(servicer_context))
+
+ return adaptation
class _Callback(stream.Consumer):
- def __init__(self):
- self._condition = threading.Condition()
- self._values = []
- self._terminated = False
- self._cancelled = False
-
- def consume(self, value):
- with self._condition:
- self._values.append(value)
- self._condition.notify_all()
-
- def terminate(self):
- with self._condition:
- self._terminated = True
- self._condition.notify_all()
-
- def consume_and_terminate(self, value):
- with self._condition:
- self._values.append(value)
- self._terminated = True
- self._condition.notify_all()
-
- def cancel(self):
- with self._condition:
- self._cancelled = True
- self._condition.notify_all()
-
- def draw_one_value(self):
- with self._condition:
- while True:
- if self._cancelled:
- raise abandonment.Abandoned()
- elif self._values:
- return self._values.pop(0)
- elif self._terminated:
- return None
- else:
- self._condition.wait()
-
- def draw_all_values(self):
- with self._condition:
- while True:
- if self._cancelled:
- raise abandonment.Abandoned()
- elif self._terminated:
- all_values = tuple(self._values)
- self._values = None
- return all_values
- else:
- self._condition.wait()
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._terminated = False
+ self._cancelled = False
+
+ def consume(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify_all()
+
+ def terminate(self):
+ with self._condition:
+ self._terminated = True
+ self._condition.notify_all()
+
+ def consume_and_terminate(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._terminated = True
+ self._condition.notify_all()
+
+ def cancel(self):
+ with self._condition:
+ self._cancelled = True
+ self._condition.notify_all()
+
+ def draw_one_value(self):
+ with self._condition:
+ while True:
+ if self._cancelled:
+ raise abandonment.Abandoned()
+ elif self._values:
+ return self._values.pop(0)
+ elif self._terminated:
+ return None
+ else:
+ self._condition.wait()
+
+ def draw_all_values(self):
+ with self._condition:
+ while True:
+ if self._cancelled:
+ raise abandonment.Abandoned()
+ elif self._terminated:
+ all_values = tuple(self._values)
+ self._values = None
+ return all_values
+ else:
+ self._condition.wait()
def _run_request_pipe_thread(request_iterator, request_consumer,
servicer_context):
- thread_joined = threading.Event()
- def pipe_requests():
- for request in request_iterator:
- if not servicer_context.is_active() or thread_joined.is_set():
- return
- request_consumer.consume(request)
- if not servicer_context.is_active() or thread_joined.is_set():
- return
- request_consumer.terminate()
+ thread_joined = threading.Event()
+
+ def pipe_requests():
+ for request in request_iterator:
+ if not servicer_context.is_active() or thread_joined.is_set():
+ return
+ request_consumer.consume(request)
+ if not servicer_context.is_active() or thread_joined.is_set():
+ return
+ request_consumer.terminate()
- def stop_request_pipe(timeout):
- thread_joined.set()
+ def stop_request_pipe(timeout):
+ thread_joined.set()
- request_pipe_thread = _common.CleanupThread(
- stop_request_pipe, target=pipe_requests)
- request_pipe_thread.start()
+ request_pipe_thread = _common.CleanupThread(
+ stop_request_pipe, target=pipe_requests)
+ request_pipe_thread.start()
def _adapt_unary_unary_event(unary_unary_event):
- def adaptation(request, servicer_context):
- callback = _Callback()
- if not servicer_context.add_callback(callback.cancel):
- raise abandonment.Abandoned()
- unary_unary_event(
- request, callback.consume_and_terminate,
- _FaceServicerContext(servicer_context))
- return callback.draw_all_values()[0]
- return adaptation
+
+ def adaptation(request, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ unary_unary_event(request, callback.consume_and_terminate,
+ _FaceServicerContext(servicer_context))
+ return callback.draw_all_values()[0]
+
+ return adaptation
def _adapt_unary_stream_event(unary_stream_event):
- def adaptation(request, servicer_context):
- callback = _Callback()
- if not servicer_context.add_callback(callback.cancel):
- raise abandonment.Abandoned()
- unary_stream_event(
- request, callback, _FaceServicerContext(servicer_context))
- while True:
- response = callback.draw_one_value()
- if response is None:
- return
- else:
- yield response
- return adaptation
+
+ def adaptation(request, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ unary_stream_event(request, callback,
+ _FaceServicerContext(servicer_context))
+ while True:
+ response = callback.draw_one_value()
+ if response is None:
+ return
+ else:
+ yield response
+
+ return adaptation
def _adapt_stream_unary_event(stream_unary_event):
- def adaptation(request_iterator, servicer_context):
- callback = _Callback()
- if not servicer_context.add_callback(callback.cancel):
- raise abandonment.Abandoned()
- request_consumer = stream_unary_event(
- callback.consume_and_terminate, _FaceServicerContext(servicer_context))
- _run_request_pipe_thread(
- request_iterator, request_consumer, servicer_context)
- return callback.draw_all_values()[0]
- return adaptation
+
+ def adaptation(request_iterator, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ request_consumer = stream_unary_event(
+ callback.consume_and_terminate,
+ _FaceServicerContext(servicer_context))
+ _run_request_pipe_thread(request_iterator, request_consumer,
+ servicer_context)
+ return callback.draw_all_values()[0]
+
+ return adaptation
def _adapt_stream_stream_event(stream_stream_event):
- def adaptation(request_iterator, servicer_context):
- callback = _Callback()
- if not servicer_context.add_callback(callback.cancel):
- raise abandonment.Abandoned()
- request_consumer = stream_stream_event(
- callback, _FaceServicerContext(servicer_context))
- _run_request_pipe_thread(
- request_iterator, request_consumer, servicer_context)
- while True:
- response = callback.draw_one_value()
- if response is None:
- return
- else:
- yield response
- return adaptation
+
+ def adaptation(request_iterator, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ request_consumer = stream_stream_event(
+ callback, _FaceServicerContext(servicer_context))
+ _run_request_pipe_thread(request_iterator, request_consumer,
+ servicer_context)
+ while True:
+ response = callback.draw_one_value()
+ if response is None:
+ return
+ else:
+ yield response
+
+ return adaptation
class _SimpleMethodHandler(
- collections.namedtuple(
- '_MethodHandler',
- ('request_streaming', 'response_streaming', 'request_deserializer',
- 'response_serializer', 'unary_unary', 'unary_stream', 'stream_unary',
- 'stream_stream',)),
- grpc.RpcMethodHandler):
- pass
-
-
-def _simple_method_handler(
- implementation, request_deserializer, response_serializer):
- if implementation.style is style.Service.INLINE:
- if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
- return _SimpleMethodHandler(
- False, False, request_deserializer, response_serializer,
- _adapt_unary_request_inline(implementation.unary_unary_inline), None,
- None, None)
- elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
- return _SimpleMethodHandler(
- False, True, request_deserializer, response_serializer, None,
- _adapt_unary_request_inline(implementation.unary_stream_inline), None,
- None)
- elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
- return _SimpleMethodHandler(
- True, False, request_deserializer, response_serializer, None, None,
- _adapt_stream_request_inline(implementation.stream_unary_inline),
- None)
- elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
- return _SimpleMethodHandler(
- True, True, request_deserializer, response_serializer, None, None,
- None,
- _adapt_stream_request_inline(implementation.stream_stream_inline))
- elif implementation.style is style.Service.EVENT:
- if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
- return _SimpleMethodHandler(
- False, False, request_deserializer, response_serializer,
- _adapt_unary_unary_event(implementation.unary_unary_event), None,
- None, None)
- elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
- return _SimpleMethodHandler(
- False, True, request_deserializer, response_serializer, None,
- _adapt_unary_stream_event(implementation.unary_stream_event), None,
- None)
- elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
- return _SimpleMethodHandler(
- True, False, request_deserializer, response_serializer, None, None,
- _adapt_stream_unary_event(implementation.stream_unary_event), None)
- elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
- return _SimpleMethodHandler(
- True, True, request_deserializer, response_serializer, None, None,
- None, _adapt_stream_stream_event(implementation.stream_stream_event))
+ collections.namedtuple('_MethodHandler', (
+ 'request_streaming',
+ 'response_streaming',
+ 'request_deserializer',
+ 'response_serializer',
+ 'unary_unary',
+ 'unary_stream',
+ 'stream_unary',
+ 'stream_stream',)), grpc.RpcMethodHandler):
+ pass
+
+
+def _simple_method_handler(implementation, request_deserializer,
+ response_serializer):
+ if implementation.style is style.Service.INLINE:
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ return _SimpleMethodHandler(
+ False, False, request_deserializer, response_serializer,
+ _adapt_unary_request_inline(implementation.unary_unary_inline),
+ None, None, None)
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ return _SimpleMethodHandler(
+ False, True, request_deserializer, response_serializer, None,
+ _adapt_unary_request_inline(implementation.unary_stream_inline),
+ None, None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ return _SimpleMethodHandler(True, False, request_deserializer,
+ response_serializer, None, None,
+ _adapt_stream_request_inline(
+ implementation.stream_unary_inline),
+ None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ return _SimpleMethodHandler(
+ True, True, request_deserializer, response_serializer, None,
+ None, None,
+ _adapt_stream_request_inline(
+ implementation.stream_stream_inline))
+ elif implementation.style is style.Service.EVENT:
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ return _SimpleMethodHandler(
+ False, False, request_deserializer, response_serializer,
+ _adapt_unary_unary_event(implementation.unary_unary_event),
+ None, None, None)
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ return _SimpleMethodHandler(
+ False, True, request_deserializer, response_serializer, None,
+ _adapt_unary_stream_event(implementation.unary_stream_event),
+ None, None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ return _SimpleMethodHandler(
+ True, False, request_deserializer, response_serializer, None,
+ None,
+ _adapt_stream_unary_event(implementation.stream_unary_event),
+ None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ return _SimpleMethodHandler(
+ True, True, request_deserializer, response_serializer, None,
+ None, None,
+ _adapt_stream_stream_event(implementation.stream_stream_event))
def _flatten_method_pair_map(method_pair_map):
- method_pair_map = method_pair_map or {}
- flat_map = {}
- for method_pair in method_pair_map:
- method = _common.fully_qualified_method(method_pair[0], method_pair[1])
- flat_map[method] = method_pair_map[method_pair]
- return flat_map
+ method_pair_map = method_pair_map or {}
+ flat_map = {}
+ for method_pair in method_pair_map:
+ method = _common.fully_qualified_method(method_pair[0], method_pair[1])
+ flat_map[method] = method_pair_map[method_pair]
+ return flat_map
class _GenericRpcHandler(grpc.GenericRpcHandler):
- def __init__(
- self, method_implementations, multi_method_implementation,
- request_deserializers, response_serializers):
- self._method_implementations = _flatten_method_pair_map(
- method_implementations)
- self._request_deserializers = _flatten_method_pair_map(
- request_deserializers)
- self._response_serializers = _flatten_method_pair_map(
- response_serializers)
- self._multi_method_implementation = multi_method_implementation
-
- def service(self, handler_call_details):
- method_implementation = self._method_implementations.get(
- handler_call_details.method)
- if method_implementation is not None:
- return _simple_method_handler(
- method_implementation,
- self._request_deserializers.get(handler_call_details.method),
- self._response_serializers.get(handler_call_details.method))
- elif self._multi_method_implementation is None:
- return None
- else:
- try:
- return None #TODO(nathaniel): call the multimethod.
- except face.NoSuchMethodError:
- return None
+ def __init__(self, method_implementations, multi_method_implementation,
+ request_deserializers, response_serializers):
+ self._method_implementations = _flatten_method_pair_map(
+ method_implementations)
+ self._request_deserializers = _flatten_method_pair_map(
+ request_deserializers)
+ self._response_serializers = _flatten_method_pair_map(
+ response_serializers)
+ self._multi_method_implementation = multi_method_implementation
+
+ def service(self, handler_call_details):
+ method_implementation = self._method_implementations.get(
+ handler_call_details.method)
+ if method_implementation is not None:
+ return _simple_method_handler(
+ method_implementation,
+ self._request_deserializers.get(handler_call_details.method),
+ self._response_serializers.get(handler_call_details.method))
+ elif self._multi_method_implementation is None:
+ return None
+ else:
+ try:
+ return None #TODO(nathaniel): call the multimethod.
+ except face.NoSuchMethodError:
+ return None
class _Server(interfaces.Server):
- def __init__(self, server):
- self._server = server
+ def __init__(self, server):
+ self._server = server
- def add_insecure_port(self, address):
- return self._server.add_insecure_port(address)
+ def add_insecure_port(self, address):
+ return self._server.add_insecure_port(address)
- def add_secure_port(self, address, server_credentials):
- return self._server.add_secure_port(address, server_credentials)
+ def add_secure_port(self, address, server_credentials):
+ return self._server.add_secure_port(address, server_credentials)
- def start(self):
- self._server.start()
+ def start(self):
+ self._server.start()
- def stop(self, grace):
- return self._server.stop(grace)
+ def stop(self, grace):
+ return self._server.stop(grace)
- def __enter__(self):
- self._server.start()
- return self
+ def __enter__(self):
+ self._server.start()
+ return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._server.stop(None)
- return False
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._server.stop(None)
+ return False
-def server(
- service_implementations, multi_method_implementation, request_deserializers,
- response_serializers, thread_pool, thread_pool_size):
- generic_rpc_handler = _GenericRpcHandler(
- service_implementations, multi_method_implementation,
- request_deserializers, response_serializers)
- if thread_pool is None:
- effective_thread_pool = logging_pool.pool(
- _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
- else:
- effective_thread_pool = thread_pool
- return _Server(
- grpc.server(effective_thread_pool, handlers=(generic_rpc_handler,)))
+def server(service_implementations, multi_method_implementation,
+ request_deserializers, response_serializers, thread_pool,
+ thread_pool_size):
+ generic_rpc_handler = _GenericRpcHandler(
+ service_implementations, multi_method_implementation,
+ request_deserializers, response_serializers)
+ if thread_pool is None:
+ effective_thread_pool = logging_pool.pool(_DEFAULT_POOL_SIZE
+ if thread_pool_size is None
+ else thread_pool_size)
+ else:
+ effective_thread_pool = thread_pool
+ return _Server(
+ grpc.server(
+ effective_thread_pool, handlers=(generic_rpc_handler,)))
diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py
index ab25fd5eec..7093852278 100644
--- a/src/python/grpcio/grpc/beta/implementations.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Entry points into the Beta API of gRPC Python."""
# threading is referenced from specification in this module.
@@ -43,7 +42,6 @@ from grpc.beta import interfaces
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
-
ChannelCredentials = grpc.ChannelCredentials
ssl_channel_credentials = grpc.ssl_channel_credentials
CallCredentials = grpc.CallCredentials
@@ -51,7 +49,7 @@ metadata_call_credentials = grpc.metadata_call_credentials
def google_call_credentials(credentials):
- """Construct CallCredentials from GoogleCredentials.
+ """Construct CallCredentials from GoogleCredentials.
Args:
credentials: A GoogleCredentials object from the oauth2client library.
@@ -59,7 +57,8 @@ def google_call_credentials(credentials):
Returns:
A CallCredentials object for use in a GRPCCallOptions object.
"""
- return metadata_call_credentials(_auth.GoogleCallCredentials(credentials))
+ return metadata_call_credentials(_auth.GoogleCallCredentials(credentials))
+
access_token_call_credentials = grpc.access_token_call_credentials
composite_call_credentials = grpc.composite_call_credentials
@@ -67,18 +66,18 @@ composite_channel_credentials = grpc.composite_channel_credentials
class Channel(object):
- """A channel to a remote host through which RPCs may be conducted.
+ """A channel to a remote host through which RPCs may be conducted.
Only the "subscribe" and "unsubscribe" methods are supported for application
use. This class' instance constructor and all other attributes are
unsupported.
"""
- def __init__(self, channel):
- self._channel = channel
+ def __init__(self, channel):
+ self._channel = channel
- def subscribe(self, callback, try_to_connect=None):
- """Subscribes to this Channel's connectivity.
+ def subscribe(self, callback, try_to_connect=None):
+ """Subscribes to this Channel's connectivity.
Args:
callback: A callable to be invoked and passed an
@@ -90,20 +89,20 @@ class Channel(object):
attempt to connect if it is not already connected and ready to conduct
RPCs.
"""
- self._channel.subscribe(callback, try_to_connect=try_to_connect)
+ self._channel.subscribe(callback, try_to_connect=try_to_connect)
- def unsubscribe(self, callback):
- """Unsubscribes a callback from this Channel's connectivity.
+ def unsubscribe(self, callback):
+ """Unsubscribes a callback from this Channel's connectivity.
Args:
callback: A callable previously registered with this Channel from having
been passed to its "subscribe" method.
"""
- self._channel.unsubscribe(callback)
+ self._channel.unsubscribe(callback)
def insecure_channel(host, port):
- """Creates an insecure Channel to a remote host.
+ """Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
@@ -113,13 +112,13 @@ def insecure_channel(host, port):
Returns:
A Channel to the remote host through which RPCs may be conducted.
"""
- channel = grpc.insecure_channel(
- host if port is None else '%s:%d' % (host, port))
- return Channel(channel)
+ channel = grpc.insecure_channel(host
+ if port is None else '%s:%d' % (host, port))
+ return Channel(channel)
def secure_channel(host, port, channel_credentials):
- """Creates a secure Channel to a remote host.
+ """Creates a secure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
@@ -130,37 +129,39 @@ def secure_channel(host, port, channel_credentials):
Returns:
A secure Channel to the remote host through which RPCs may be conducted.
"""
- channel = grpc.secure_channel(
- host if port is None else '%s:%d' % (host, port), channel_credentials)
- return Channel(channel)
+ channel = grpc.secure_channel(host if port is None else
+ '%s:%d' % (host, port), channel_credentials)
+ return Channel(channel)
class StubOptions(object):
- """A value encapsulating the various options for creation of a Stub.
+ """A value encapsulating the various options for creation of a Stub.
This class and its instances have no supported interface - it exists to define
the type of its instances and its instances exist to be passed to other
functions.
"""
- def __init__(
- self, host, request_serializers, response_deserializers,
- metadata_transformer, thread_pool, thread_pool_size):
- self.host = host
- self.request_serializers = request_serializers
- self.response_deserializers = response_deserializers
- self.metadata_transformer = metadata_transformer
- self.thread_pool = thread_pool
- self.thread_pool_size = thread_pool_size
+ def __init__(self, host, request_serializers, response_deserializers,
+ metadata_transformer, thread_pool, thread_pool_size):
+ self.host = host
+ self.request_serializers = request_serializers
+ self.response_deserializers = response_deserializers
+ self.metadata_transformer = metadata_transformer
+ self.thread_pool = thread_pool
+ self.thread_pool_size = thread_pool_size
-_EMPTY_STUB_OPTIONS = StubOptions(
- None, None, None, None, None, None)
+_EMPTY_STUB_OPTIONS = StubOptions(None, None, None, None, None, None)
-def stub_options(
- host=None, request_serializers=None, response_deserializers=None,
- metadata_transformer=None, thread_pool=None, thread_pool_size=None):
- """Creates a StubOptions value to be passed at stub creation.
+
+def stub_options(host=None,
+ request_serializers=None,
+ response_deserializers=None,
+ metadata_transformer=None,
+ thread_pool=None,
+ thread_pool_size=None):
+ """Creates a StubOptions value to be passed at stub creation.
All parameters are optional and should always be passed by keyword.
@@ -180,13 +181,12 @@ def stub_options(
Returns:
A StubOptions value created from the passed parameters.
"""
- return StubOptions(
- host, request_serializers, response_deserializers,
- metadata_transformer, thread_pool, thread_pool_size)
+ return StubOptions(host, request_serializers, response_deserializers,
+ metadata_transformer, thread_pool, thread_pool_size)
def generic_stub(channel, options=None):
- """Creates a face.GenericStub on which RPCs can be made.
+ """Creates a face.GenericStub on which RPCs can be made.
Args:
channel: A Channel for use by the created stub.
@@ -195,16 +195,17 @@ def generic_stub(channel, options=None):
Returns:
A face.GenericStub on which RPCs can be made.
"""
- effective_options = _EMPTY_STUB_OPTIONS if options is None else options
- return _client_adaptations.generic_stub(
- channel._channel, # pylint: disable=protected-access
- effective_options.host, effective_options.metadata_transformer,
- effective_options.request_serializers,
- effective_options.response_deserializers)
+ effective_options = _EMPTY_STUB_OPTIONS if options is None else options
+ return _client_adaptations.generic_stub(
+ channel._channel, # pylint: disable=protected-access
+ effective_options.host,
+ effective_options.metadata_transformer,
+ effective_options.request_serializers,
+ effective_options.response_deserializers)
def dynamic_stub(channel, service, cardinalities, options=None):
- """Creates a face.DynamicStub with which RPCs can be invoked.
+ """Creates a face.DynamicStub with which RPCs can be invoked.
Args:
channel: A Channel for the returned face.DynamicStub to use.
@@ -217,13 +218,15 @@ def dynamic_stub(channel, service, cardinalities, options=None):
Returns:
A face.DynamicStub with which RPCs can be invoked.
"""
- effective_options = StubOptions() if options is None else options
- return _client_adaptations.dynamic_stub(
- channel._channel, # pylint: disable=protected-access
- service, cardinalities, effective_options.host,
- effective_options.metadata_transformer,
- effective_options.request_serializers,
- effective_options.response_deserializers)
+ effective_options = StubOptions() if options is None else options
+ return _client_adaptations.dynamic_stub(
+ channel._channel, # pylint: disable=protected-access
+ service,
+ cardinalities,
+ effective_options.host,
+ effective_options.metadata_transformer,
+ effective_options.request_serializers,
+ effective_options.response_deserializers)
ServerCredentials = grpc.ServerCredentials
@@ -231,34 +234,36 @@ ssl_server_credentials = grpc.ssl_server_credentials
class ServerOptions(object):
- """A value encapsulating the various options for creation of a Server.
+ """A value encapsulating the various options for creation of a Server.
This class and its instances have no supported interface - it exists to define
the type of its instances and its instances exist to be passed to other
functions.
"""
- def __init__(
- self, multi_method_implementation, request_deserializers,
- response_serializers, thread_pool, thread_pool_size, default_timeout,
- maximum_timeout):
- self.multi_method_implementation = multi_method_implementation
- self.request_deserializers = request_deserializers
- self.response_serializers = response_serializers
- self.thread_pool = thread_pool
- self.thread_pool_size = thread_pool_size
- self.default_timeout = default_timeout
- self.maximum_timeout = maximum_timeout
+ def __init__(self, multi_method_implementation, request_deserializers,
+ response_serializers, thread_pool, thread_pool_size,
+ default_timeout, maximum_timeout):
+ self.multi_method_implementation = multi_method_implementation
+ self.request_deserializers = request_deserializers
+ self.response_serializers = response_serializers
+ self.thread_pool = thread_pool
+ self.thread_pool_size = thread_pool_size
+ self.default_timeout = default_timeout
+ self.maximum_timeout = maximum_timeout
+
-_EMPTY_SERVER_OPTIONS = ServerOptions(
- None, None, None, None, None, None, None)
+_EMPTY_SERVER_OPTIONS = ServerOptions(None, None, None, None, None, None, None)
-def server_options(
- multi_method_implementation=None, request_deserializers=None,
- response_serializers=None, thread_pool=None, thread_pool_size=None,
- default_timeout=None, maximum_timeout=None):
- """Creates a ServerOptions value to be passed at server creation.
+def server_options(multi_method_implementation=None,
+ request_deserializers=None,
+ response_serializers=None,
+ thread_pool=None,
+ thread_pool_size=None,
+ default_timeout=None,
+ maximum_timeout=None):
+ """Creates a ServerOptions value to be passed at server creation.
All parameters are optional and should always be passed by keyword.
@@ -282,13 +287,13 @@ def server_options(
Returns:
A StubOptions value created from the passed parameters.
"""
- return ServerOptions(
- multi_method_implementation, request_deserializers, response_serializers,
- thread_pool, thread_pool_size, default_timeout, maximum_timeout)
+ return ServerOptions(multi_method_implementation, request_deserializers,
+ response_serializers, thread_pool, thread_pool_size,
+ default_timeout, maximum_timeout)
def server(service_implementations, options=None):
- """Creates an interfaces.Server with which RPCs can be serviced.
+ """Creates an interfaces.Server with which RPCs can be serviced.
Args:
service_implementations: A dictionary from service name-method name pair to
@@ -299,9 +304,9 @@ def server(service_implementations, options=None):
Returns:
An interfaces.Server with which RPCs can be serviced.
"""
- effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
- return _server_adaptations.server(
- service_implementations, effective_options.multi_method_implementation,
- effective_options.request_deserializers,
- effective_options.response_serializers, effective_options.thread_pool,
- effective_options.thread_pool_size)
+ effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
+ return _server_adaptations.server(
+ service_implementations, effective_options.multi_method_implementation,
+ effective_options.request_deserializers,
+ effective_options.response_serializers, effective_options.thread_pool,
+ effective_options.thread_pool_size)
diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py
index 90f6bbbfcc..361d1bcffe 100644
--- a/src/python/grpcio/grpc/beta/interfaces.py
+++ b/src/python/grpcio/grpc/beta/interfaces.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Constants and interfaces of the Beta API of gRPC Python."""
import abc
@@ -43,21 +42,21 @@ StatusCode = grpc.StatusCode
class GRPCCallOptions(object):
- """A value encapsulating gRPC-specific options passed on RPC invocation.
+ """A value encapsulating gRPC-specific options passed on RPC invocation.
This class and its instances have no supported interface - it exists to
define the type of its instances and its instances exist to be passed to
other functions.
"""
- def __init__(self, disable_compression, subcall_of, credentials):
- self.disable_compression = disable_compression
- self.subcall_of = subcall_of
- self.credentials = credentials
+ def __init__(self, disable_compression, subcall_of, credentials):
+ self.disable_compression = disable_compression
+ self.subcall_of = subcall_of
+ self.credentials = credentials
def grpc_call_options(disable_compression=False, credentials=None):
- """Creates a GRPCCallOptions value to be passed at RPC invocation.
+ """Creates a GRPCCallOptions value to be passed at RPC invocation.
All parameters are optional and should always be passed by keyword.
@@ -67,7 +66,8 @@ def grpc_call_options(disable_compression=False, credentials=None):
request-unary RPCs.
credentials: A CallCredentials object to use for the invoked RPC.
"""
- return GRPCCallOptions(disable_compression, None, credentials)
+ return GRPCCallOptions(disable_compression, None, credentials)
+
GRPCAuthMetadataContext = grpc.AuthMetadataContext
GRPCAuthMetadataPluginCallback = grpc.AuthMetadataPluginCallback
@@ -75,38 +75,38 @@ GRPCAuthMetadataPlugin = grpc.AuthMetadataPlugin
class GRPCServicerContext(six.with_metaclass(abc.ABCMeta)):
- """Exposes gRPC-specific options and behaviors to code servicing RPCs."""
+ """Exposes gRPC-specific options and behaviors to code servicing RPCs."""
- @abc.abstractmethod
- def peer(self):
- """Identifies the peer that invoked the RPC being serviced.
+ @abc.abstractmethod
+ def peer(self):
+ """Identifies the peer that invoked the RPC being serviced.
Returns:
A string identifying the peer that invoked the RPC being serviced.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def disable_next_response_compression(self):
- """Disables compression of the next response passed by the application."""
- raise NotImplementedError()
+ @abc.abstractmethod
+ def disable_next_response_compression(self):
+ """Disables compression of the next response passed by the application."""
+ raise NotImplementedError()
class GRPCInvocationContext(six.with_metaclass(abc.ABCMeta)):
- """Exposes gRPC-specific options and behaviors to code invoking RPCs."""
+ """Exposes gRPC-specific options and behaviors to code invoking RPCs."""
- @abc.abstractmethod
- def disable_next_request_compression(self):
- """Disables compression of the next request passed by the application."""
- raise NotImplementedError()
+ @abc.abstractmethod
+ def disable_next_request_compression(self):
+ """Disables compression of the next request passed by the application."""
+ raise NotImplementedError()
class Server(six.with_metaclass(abc.ABCMeta)):
- """Services RPCs."""
+ """Services RPCs."""
- @abc.abstractmethod
- def add_insecure_port(self, address):
- """Reserves a port for insecure RPC service once this Server becomes active.
+ @abc.abstractmethod
+ def add_insecure_port(self, address):
+ """Reserves a port for insecure RPC service once this Server becomes active.
This method may only be called before calling this Server's start method is
called.
@@ -120,11 +120,11 @@ class Server(six.with_metaclass(abc.ABCMeta)):
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def add_secure_port(self, address, server_credentials):
- """Reserves a port for secure RPC service after this Server becomes active.
+ @abc.abstractmethod
+ def add_secure_port(self, address, server_credentials):
+ """Reserves a port for secure RPC service after this Server becomes active.
This method may only be called before calling this Server's start method is
called.
@@ -139,20 +139,20 @@ class Server(six.with_metaclass(abc.ABCMeta)):
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def start(self):
- """Starts this Server's service of RPCs.
+ @abc.abstractmethod
+ def start(self):
+ """Starts this Server's service of RPCs.
This method may only be called while the server is not serving RPCs (i.e. it
is not idempotent).
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def stop(self, grace):
- """Stops this Server's service of RPCs.
+ @abc.abstractmethod
+ def stop(self, grace):
+ """Stops this Server's service of RPCs.
All calls to this method immediately stop service of new RPCs. When existing
RPCs are aborted is controlled by the grace period parameter passed to this
@@ -177,4 +177,4 @@ class Server(six.with_metaclass(abc.ABCMeta)):
at the time it was stopped or if all RPCs that it had underway completed
very early in the grace period).
"""
- raise NotImplementedError()
+ raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py
index fb07a76579..60525350a7 100644
--- a/src/python/grpcio/grpc/beta/utilities.py
+++ b/src/python/grpcio/grpc/beta/utilities.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Utilities for the gRPC Python Beta API."""
import threading
@@ -44,107 +43,107 @@ _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
class _ChannelReadyFuture(future.Future):
- def __init__(self, channel):
- self._condition = threading.Condition()
- self._channel = channel
-
- self._matured = False
- self._cancelled = False
- self._done_callbacks = []
-
- def _block(self, timeout):
- until = None if timeout is None else time.time() + timeout
- with self._condition:
- while True:
- if self._cancelled:
- raise future.CancelledError()
- elif self._matured:
- return
- else:
- if until is None:
- self._condition.wait()
- else:
- remaining = until - time.time()
- if remaining < 0:
- raise future.TimeoutError()
+ def __init__(self, channel):
+ self._condition = threading.Condition()
+ self._channel = channel
+
+ self._matured = False
+ self._cancelled = False
+ self._done_callbacks = []
+
+ def _block(self, timeout):
+ until = None if timeout is None else time.time() + timeout
+ with self._condition:
+ while True:
+ if self._cancelled:
+ raise future.CancelledError()
+ elif self._matured:
+ return
+ else:
+ if until is None:
+ self._condition.wait()
+ else:
+ remaining = until - time.time()
+ if remaining < 0:
+ raise future.TimeoutError()
+ else:
+ self._condition.wait(timeout=remaining)
+
+ def _update(self, connectivity):
+ with self._condition:
+ if (not self._cancelled and
+ connectivity is interfaces.ChannelConnectivity.READY):
+ self._matured = True
+ self._channel.unsubscribe(self._update)
+ self._condition.notify_all()
+ done_callbacks = tuple(self._done_callbacks)
+ self._done_callbacks = None
+ else:
+ return
+
+ for done_callback in done_callbacks:
+ callable_util.call_logging_exceptions(
+ done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
+
+ def cancel(self):
+ with self._condition:
+ if not self._matured:
+ self._cancelled = True
+ self._channel.unsubscribe(self._update)
+ self._condition.notify_all()
+ done_callbacks = tuple(self._done_callbacks)
+ self._done_callbacks = None
else:
- self._condition.wait(timeout=remaining)
-
- def _update(self, connectivity):
- with self._condition:
- if (not self._cancelled and
- connectivity is interfaces.ChannelConnectivity.READY):
- self._matured = True
- self._channel.unsubscribe(self._update)
- self._condition.notify_all()
- done_callbacks = tuple(self._done_callbacks)
- self._done_callbacks = None
- else:
- return
-
- for done_callback in done_callbacks:
- callable_util.call_logging_exceptions(
- done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
-
- def cancel(self):
- with self._condition:
- if not self._matured:
- self._cancelled = True
- self._channel.unsubscribe(self._update)
- self._condition.notify_all()
- done_callbacks = tuple(self._done_callbacks)
- self._done_callbacks = None
- else:
- return False
-
- for done_callback in done_callbacks:
- callable_util.call_logging_exceptions(
- done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
-
- def cancelled(self):
- with self._condition:
- return self._cancelled
-
- def running(self):
- with self._condition:
- return not self._cancelled and not self._matured
-
- def done(self):
- with self._condition:
- return self._cancelled or self._matured
-
- def result(self, timeout=None):
- self._block(timeout)
- return None
-
- def exception(self, timeout=None):
- self._block(timeout)
- return None
-
- def traceback(self, timeout=None):
- self._block(timeout)
- return None
-
- def add_done_callback(self, fn):
- with self._condition:
- if not self._cancelled and not self._matured:
- self._done_callbacks.append(fn)
- return
-
- fn(self)
-
- def start(self):
- with self._condition:
- self._channel.subscribe(self._update, try_to_connect=True)
-
- def __del__(self):
- with self._condition:
- if not self._cancelled and not self._matured:
- self._channel.unsubscribe(self._update)
+ return False
+
+ for done_callback in done_callbacks:
+ callable_util.call_logging_exceptions(
+ done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
+
+ def cancelled(self):
+ with self._condition:
+ return self._cancelled
+
+ def running(self):
+ with self._condition:
+ return not self._cancelled and not self._matured
+
+ def done(self):
+ with self._condition:
+ return self._cancelled or self._matured
+
+ def result(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def exception(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def traceback(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def add_done_callback(self, fn):
+ with self._condition:
+ if not self._cancelled and not self._matured:
+ self._done_callbacks.append(fn)
+ return
+
+ fn(self)
+
+ def start(self):
+ with self._condition:
+ self._channel.subscribe(self._update, try_to_connect=True)
+
+ def __del__(self):
+ with self._condition:
+ if not self._cancelled and not self._matured:
+ self._channel.unsubscribe(self._update)
def channel_ready_future(channel):
- """Creates a future.Future tracking when an implementations.Channel is ready.
+ """Creates a future.Future tracking when an implementations.Channel is ready.
Cancelling the returned future.Future does not tell the given
implementations.Channel to abandon attempts it may have been making to
@@ -158,7 +157,6 @@ def channel_ready_future(channel):
A future.Future that matures when the given Channel has connectivity
interfaces.ChannelConnectivity.READY.
"""
- ready_future = _ChannelReadyFuture(channel)
- ready_future.start()
- return ready_future
-
+ ready_future = _ChannelReadyFuture(channel)
+ ready_future.start()
+ return ready_future