aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_channel.py')
-rw-r--r--src/python/grpcio/grpc/_channel.py55
1 files changed, 25 insertions, 30 deletions
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 24be042f61..bfc7208310 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -27,7 +27,6 @@ from grpc.framework.foundation import callable_util
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)
_EMPTY_FLAGS = 0
-_INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
_UNARY_UNARY_INITIAL_DUE = (
cygrpc.OperationType.send_initial_metadata,
@@ -61,11 +60,7 @@ _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
def _deadline(timeout):
- if timeout is None:
- return None, _INFINITE_FUTURE
- else:
- deadline = time.time() + timeout
- return deadline, cygrpc.Timespec(deadline)
+ return None if timeout is None else time.time() + timeout
def _unknown_code_details(unknown_cygrpc_code, details):
@@ -420,15 +415,15 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def _start_unary_request(request, timeout, request_serializer):
- deadline, deadline_timespec = _deadline(timeout)
+ deadline = _deadline(timeout)
serialized_request = _common.serialize(request, request_serializer)
if serialized_request is None:
state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
'Exception serializing request!')
rendezvous = _Rendezvous(state, None, None, deadline)
- return deadline, deadline_timespec, None, rendezvous
+ return deadline, None, rendezvous
else:
- return deadline, deadline_timespec, serialized_request, None
+ return deadline, serialized_request, None
def _end_unary_response_blocking(state, call, with_call, deadline):
@@ -453,10 +448,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._response_deserializer = response_deserializer
def _prepare(self, request, timeout, metadata):
- deadline, deadline_timespec, serialized_request, rendezvous = (
- _start_unary_request(request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = (_start_unary_request(
+ request, timeout, self._request_serializer))
if serialized_request is None:
- return None, None, None, None, rendezvous
+ return None, None, None, rendezvous
else:
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = (
@@ -467,18 +462,17 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
)
- return state, operations, deadline, deadline_timespec, None
+ return state, operations, deadline, None
def _blocking(self, request, timeout, metadata, credentials):
- state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
+ state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
if rendezvous:
raise rendezvous
else:
completion_queue = cygrpc.CompletionQueue()
call = self._channel.create_call(None, 0, completion_queue,
- self._method, None,
- deadline_timespec)
+ self._method, None, deadline)
if credentials is not None:
call.set_credentials(credentials._credentials)
call_error = call.start_client_batch(operations, None)
@@ -498,13 +492,13 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
return _end_unary_response_blocking(state, call, True, deadline)
def future(self, request, timeout=None, metadata=None, credentials=None):
- state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
+ state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
if rendezvous:
return rendezvous
else:
call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline_timespec)
+ deadline)
if credentials is not None:
call.set_credentials(credentials._credentials)
event_handler = _event_handler(state, call,
@@ -530,14 +524,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._response_deserializer = response_deserializer
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- deadline, deadline_timespec, serialized_request, rendezvous = (
- _start_unary_request(request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = (_start_unary_request(
+ request, timeout, self._request_serializer))
if serialized_request is None:
raise rendezvous
else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline_timespec)
+ deadline)
if credentials is not None:
call.set_credentials(credentials._credentials)
event_handler = _event_handler(state, call,
@@ -573,11 +567,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._response_deserializer = response_deserializer
def _blocking(self, request_iterator, timeout, metadata, credentials):
- deadline, deadline_timespec = _deadline(timeout)
+ deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
completion_queue = cygrpc.CompletionQueue()
call = self._channel.create_call(None, 0, completion_queue,
- self._method, None, deadline_timespec)
+ self._method, None, deadline)
if credentials is not None:
call.set_credentials(credentials._credentials)
with state.condition:
@@ -624,10 +618,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
timeout=None,
metadata=None,
credentials=None):
- deadline, deadline_timespec = _deadline(timeout)
+ deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline_timespec)
+ deadline)
if credentials is not None:
call.set_credentials(credentials._credentials)
event_handler = _event_handler(state, call, self._response_deserializer)
@@ -665,10 +659,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
timeout=None,
metadata=None,
credentials=None):
- deadline, deadline_timespec = _deadline(timeout)
+ deadline = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline_timespec)
+ deadline)
if credentials is not None:
call.set_credentials(credentials._credentials)
event_handler = _event_handler(state, call, self._response_deserializer)
@@ -737,7 +731,8 @@ def _channel_managed_call_management(state):
flags: An integer bitfield of call flags.
method: The RPC method.
host: A host string for the created call.
- deadline: A cygrpc.Timespec to be the deadline of the created call.
+ deadline: A float to be the deadline of the created call or None if the
+ call is to have an infinite deadline.
Returns:
A cygrpc.Call with which to conduct an RPC and a function to call if
@@ -827,8 +822,8 @@ def _poll_connectivity(state, channel, initial_try_to_connect):
completion_queue = cygrpc.CompletionQueue()
while True:
channel.watch_connectivity_state(connectivity,
- cygrpc.Timespec(time.time() + 0.2),
- completion_queue, None)
+ time.time() + 0.2, completion_queue,
+ None)
event = completion_queue.poll()
with state.lock:
if not state.callbacks_and_connectivities and not state.try_to_connect: