diff options
Diffstat (limited to 'src/python/grpcio/grpc/_channel.py')
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 128 |
1 files changed, 84 insertions, 44 deletions
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 53a26727ab..41e9163cd6 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -36,8 +36,8 @@ import time import grpc from grpc import _common from grpc import _grpcio_metadata -from grpc.framework.foundation import callable_util from grpc._cython import cygrpc +from grpc.framework.foundation import callable_util _USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) @@ -99,6 +99,22 @@ def _wait_once_until(condition, until): else: condition.wait(timeout=remaining) +_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( + 'Internal gRPC call error %d. ' + + 'Please report to https://github.com/grpc/grpc/issues') + +def _check_call_error(call_error, metadata): + if call_error == cygrpc.CallError.invalid_metadata: + raise ValueError('metadata was invalid: %s' % metadata) + elif call_error != cygrpc.CallError.ok: + raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error) + +def _call_error_set_RPCstate(state, call_error, metadata): + if call_error == cygrpc.CallError.invalid_metadata: + _abort(state, grpc.StatusCode.INTERNAL, 'metadata was invalid: %s' % metadata) + else: + _abort(state, grpc.StatusCode.INTERNAL, + _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error) class _RPCState(object): @@ -358,7 +374,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): if self._state.callbacks is None: return False else: - self._state.callbacks.append(lambda: callback()) + self._state.callbacks.append(callback) return True def initial_metadata(self): @@ -435,10 +451,10 @@ def _end_unary_response_blocking(state, with_call, deadline): class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -472,7 +488,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): None, 0, completion_queue, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) - call.start_client_batch(cygrpc.Operations(operations), None) + call_error = call.start_client_batch(cygrpc.Operations(operations), None) + _check_call_error(call_error, metadata) _handle_event(completion_queue.poll(), state, self._response_deserializer) return state, deadline @@ -490,23 +507,28 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): if rendezvous: return rendezvous else: - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: - call.start_client_batch(cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(cygrpc.Operations(operations), + event_handler) + if call_error != cygrpc.CallError.ok: + _call_error_set_RPCstate(state, call_error, metadata) + return _Rendezvous(state, None, None, deadline) + drive_call() return _Rendezvous(state, call, self._response_deserializer, deadline) class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -518,7 +540,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -535,17 +557,22 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) - call.start_client_batch(cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(cygrpc.Operations(operations), + event_handler) + if call_error != cygrpc.CallError.ok: + _call_error_set_RPCstate(state, call_error, metadata) + return _Rendezvous(state, None, None, deadline) + drive_call() return _Rendezvous(state, call, self._response_deserializer, deadline) class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -569,7 +596,8 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) - call.start_client_batch(cygrpc.Operations(operations), None) + call_error = call.start_client_batch(cygrpc.Operations(operations), None) + _check_call_error(call_error, metadata) _consume_request_iterator( request_iterator, state, call, self._request_serializer) while True: @@ -597,7 +625,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self, request_iterator, timeout=None, metadata=None, credentials=None): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -613,7 +641,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) - call.start_client_batch(cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(cygrpc.Operations(operations), + event_handler) + if call_error != cygrpc.CallError.ok: + _call_error_set_RPCstate(state, call_error, metadata) + return _Rendezvous(state, None, None, deadline) + drive_call() _consume_request_iterator( request_iterator, state, call, self._request_serializer) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -622,10 +655,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -634,7 +667,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): self, request_iterator, timeout=None, metadata=None, credentials=None): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -649,7 +682,12 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) - call.start_client_batch(cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(cygrpc.Operations(operations), + event_handler) + if call_error != cygrpc.CallError.ok: + _call_error_set_RPCstate(state, call_error, metadata) + return _Rendezvous(state, None, None, deadline) + drive_call() _consume_request_iterator( request_iterator, state, call, self._request_serializer) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -687,16 +725,13 @@ def _run_channel_spin_thread(state): channel_spin_thread.start() -def _create_channel_managed_call(state): - def create_channel_managed_call(parent, flags, method, host, deadline): - """Creates a managed cygrpc.Call. +def _channel_managed_call_management(state): + def create(parent, flags, method, host, deadline): + """Creates a managed cygrpc.Call and a function to call to drive it. - Callers of this function must conduct at least one operation on the returned - call. The tags associated with operations conducted on the returned call - must be no-argument callables that return None to indicate that this channel - should continue polling for events associated with the call and return the - call itself to indicate that no more events associated with the call will be - generated. + If operations are successfully added to the returned cygrpc.Call, the + returned function must be called. If operations are not successfully added + to the returned cygrpc.Call, the returned function must not be called. Args: parent: A cygrpc.Call to be used as the parent of the created call. @@ -706,18 +741,22 @@ def _create_channel_managed_call(state): deadline: A cygrpc.Timespec to be the deadline of the created call. Returns: - A cygrpc.Call with which to conduct an RPC. + A cygrpc.Call with which to conduct an RPC and a function to call if + operations are successfully started on the call. """ - with state.lock: - call = state.channel.create_call( - parent, flags, state.completion_queue, method, host, deadline) - if state.managed_calls is None: - state.managed_calls = set((call,)) - _run_channel_spin_thread(state) - else: - state.managed_calls.add(call) - return call - return create_channel_managed_call + call = state.channel.create_call( + parent, flags, state.completion_queue, method, host, deadline) + + def drive(): + with state.lock: + if state.managed_calls is None: + state.managed_calls = set((call,)) + _run_channel_spin_thread(state) + else: + state.managed_calls.add(call) + + return call, drive + return create class _ChannelConnectivityState(object): @@ -847,6 +886,7 @@ def _options(options): class Channel(grpc.Channel): + """A cygrpc.Channel-backed implementation of grpc.Channel.""" def __init__(self, target, options, credentials): """Constructor. @@ -871,25 +911,25 @@ class Channel(grpc.Channel): def unary_unary( self, method, request_serializer=None, response_deserializer=None): return _UnaryUnaryMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def unary_stream( self, method, request_serializer=None, response_deserializer=None): return _UnaryStreamMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_unary( self, method, request_serializer=None, response_deserializer=None): return _StreamUnaryMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_stream( self, method, request_serializer=None, response_deserializer=None): return _StreamStreamMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def __del__(self): |