diff options
Diffstat (limited to 'src/python/grpcio/grpc/_channel.py')
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 3ff7658748..8051fb306c 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -25,7 +25,6 @@ from grpc._cython import cygrpc from grpc.framework.foundation import callable_util _LOGGER = logging.getLogger(__name__) -_LOGGER.addHandler(logging.NullHandler()) _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) @@ -176,6 +175,7 @@ def _event_handler(state, response_deserializer): return handle_event +#pylint: disable=too-many-statements def _consume_request_iterator(request_iterator, state, call, request_serializer, event_handler): if cygrpc.is_fork_support_enabled(): @@ -499,6 +499,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def _prepare(self, request, timeout, metadata, wait_for_ready): deadline, serialized_request, rendezvous = _start_unary_request( @@ -525,17 +526,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata, wait_for_ready) if state is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: call = self._channel.segregated_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, (( operations, None, - ),)) + ),), self._context) event = call.next_event() _handle_event(event, state, self._response_deserializer) - return state, call, + return state, call def __call__(self, request, @@ -566,13 +568,14 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata, wait_for_ready) if state is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - (operations,), event_handler) + (operations,), event_handler, self._context) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -587,6 +590,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def __call__(self, request, @@ -599,7 +603,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) if serialized_request is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) operationses = ( @@ -615,9 +619,10 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): ) event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - operationses, event_handler) + operationses, event_handler, self._context) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -632,6 +637,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def _blocking(self, request_iterator, timeout, metadata, credentials, wait_for_ready): @@ -640,10 +646,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) call = self._channel.segregated_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, _stream_unary_invocation_operationses_and_tags( - metadata, initial_metadata_flags)) + metadata, initial_metadata_flags), self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, None) while True: @@ -653,7 +660,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): state.condition.notify_all() if not state.due: break - return state, call, + return state, call def __call__(self, request_iterator, @@ -687,10 +694,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, _stream_unary_invocation_operationses( - metadata, initial_metadata_flags), event_handler) + metadata, initial_metadata_flags), event_handler, self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -706,6 +714,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def __call__(self, request_iterator, @@ -727,9 +736,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): ) event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, operationses, - event_handler) + event_handler, self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -745,10 +755,10 @@ class _InitialMetadataFlags(int): def with_wait_for_ready(self, wait_for_ready): if wait_for_ready is not None: if wait_for_ready: - self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ + return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) elif not wait_for_ready: - self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ + return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) return self @@ -789,7 +799,7 @@ def _channel_managed_call_management(state): # pylint: disable=too-many-arguments def create(flags, method, host, deadline, metadata, credentials, - operationses, event_handler): + operationses, event_handler, context): """Creates a cygrpc.IntegratedCall. Args: @@ -804,7 +814,7 @@ def _channel_managed_call_management(state): started on the call. event_handler: A behavior to call to handle the events resultant from the operations on the call. - + context: Context object for distributed tracing. Returns: A cygrpc.IntegratedCall with which to conduct an RPC. """ @@ -815,7 +825,7 @@ def _channel_managed_call_management(state): with state.lock: call = state.channel.integrated_call(flags, method, host, deadline, metadata, credentials, - operationses_and_tags) + operationses_and_tags, context) if state.managed_calls == 0: state.managed_calls = 1 _run_channel_spin_thread(state) |