diff options
Diffstat (limited to 'src/python/grpcio/grpc/_channel.py')
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 55 |
1 files changed, 25 insertions, 30 deletions
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 24be042f61..bfc7208310 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -27,7 +27,6 @@ from grpc.framework.foundation import callable_util _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) _EMPTY_FLAGS = 0 -_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _UNARY_UNARY_INITIAL_DUE = ( cygrpc.OperationType.send_initial_metadata, @@ -61,11 +60,7 @@ _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( def _deadline(timeout): - if timeout is None: - return None, _INFINITE_FUTURE - else: - deadline = time.time() + timeout - return deadline, cygrpc.Timespec(deadline) + return None if timeout is None else time.time() + timeout def _unknown_code_details(unknown_cygrpc_code, details): @@ -420,15 +415,15 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def _start_unary_request(request, timeout, request_serializer): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) serialized_request = _common.serialize(request, request_serializer) if serialized_request is None: state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 'Exception serializing request!') rendezvous = _Rendezvous(state, None, None, deadline) - return deadline, deadline_timespec, None, rendezvous + return deadline, None, rendezvous else: - return deadline, deadline_timespec, serialized_request, None + return deadline, serialized_request, None def _end_unary_response_blocking(state, call, with_call, deadline): @@ -453,10 +448,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._response_deserializer = response_deserializer def _prepare(self, request, timeout, metadata): - deadline, deadline_timespec, serialized_request, rendezvous = ( - _start_unary_request(request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = (_start_unary_request( + request, timeout, self._request_serializer)) if serialized_request is None: - return None, None, None, None, rendezvous + return None, None, None, rendezvous else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( @@ -467,18 +462,17 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ) - return state, operations, deadline, deadline_timespec, None + return state, operations, deadline, None def _blocking(self, request, timeout, metadata, credentials): - state, operations, deadline, deadline_timespec, rendezvous = self._prepare( + state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) if rendezvous: raise rendezvous else: completion_queue = cygrpc.CompletionQueue() call = self._channel.create_call(None, 0, completion_queue, - self._method, None, - deadline_timespec) + self._method, None, deadline) if credentials is not None: call.set_credentials(credentials._credentials) call_error = call.start_client_batch(operations, None) @@ -498,13 +492,13 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): return _end_unary_response_blocking(state, call, True, deadline) def future(self, request, timeout=None, metadata=None, credentials=None): - state, operations, deadline, deadline_timespec, rendezvous = self._prepare( + state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) if rendezvous: return rendezvous else: call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, @@ -530,14 +524,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer = response_deserializer def __call__(self, request, timeout=None, metadata=None, credentials=None): - deadline, deadline_timespec, serialized_request, rendezvous = ( - _start_unary_request(request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = (_start_unary_request( + request, timeout, self._request_serializer)) if serialized_request is None: raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, @@ -573,11 +567,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._response_deserializer = response_deserializer def _blocking(self, request_iterator, timeout, metadata, credentials): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) completion_queue = cygrpc.CompletionQueue() call = self._channel.create_call(None, 0, completion_queue, - self._method, None, deadline_timespec) + self._method, None, deadline) if credentials is not None: call.set_credentials(credentials._credentials) with state.condition: @@ -624,10 +618,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): timeout=None, metadata=None, credentials=None): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, self._response_deserializer) @@ -665,10 +659,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): timeout=None, metadata=None, credentials=None): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, self._response_deserializer) @@ -737,7 +731,8 @@ def _channel_managed_call_management(state): flags: An integer bitfield of call flags. method: The RPC method. host: A host string for the created call. - deadline: A cygrpc.Timespec to be the deadline of the created call. + deadline: A float to be the deadline of the created call or None if the + call is to have an infinite deadline. Returns: A cygrpc.Call with which to conduct an RPC and a function to call if @@ -827,8 +822,8 @@ def _poll_connectivity(state, channel, initial_try_to_connect): completion_queue = cygrpc.CompletionQueue() while True: channel.watch_connectivity_state(connectivity, - cygrpc.Timespec(time.time() + 0.2), - completion_queue, None) + time.time() + 0.2, completion_queue, + None) event = completion_queue.poll() with state.lock: if not state.callbacks_and_connectivities and not state.try_to_connect: |