aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_channel.py')
-rw-r--r--src/python/grpcio/grpc/_channel.py445
1 files changed, 220 insertions, 225 deletions
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 2eff08aa57..2017d47130 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -58,6 +58,17 @@ _STREAM_STREAM_INITIAL_DUE = (
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
+_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
+ '\tstatus = {}\n'
+ '\tdetails = "{}"\n'
+ '>')
+
+_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n'
+ '\tstatus = {}\n'
+ '\tdetails = "{}"\n'
+ '\tdebug_error_string = "{}"\n'
+ '>')
+
def _deadline(timeout):
return None if timeout is None else time.time() + timeout
@@ -79,27 +90,6 @@ def _wait_once_until(condition, until):
condition.wait(timeout=remaining)
-_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
- 'Internal gRPC call error %d. ' +
- 'Please report to https://github.com/grpc/grpc/issues')
-
-
-def _check_call_error(call_error, metadata):
- if call_error == cygrpc.CallError.invalid_metadata:
- raise ValueError('metadata was invalid: %s' % metadata)
- elif call_error != cygrpc.CallError.ok:
- raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
-
-
-def _call_error_set_RPCstate(state, call_error, metadata):
- if call_error == cygrpc.CallError.invalid_metadata:
- _abort(state, grpc.StatusCode.INTERNAL,
- 'metadata was invalid: %s' % metadata)
- else:
- _abort(state, grpc.StatusCode.INTERNAL,
- _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
-
-
class _RPCState(object):
def __init__(self, due, initial_metadata, trailing_metadata, code, details):
@@ -112,6 +102,7 @@ class _RPCState(object):
self.trailing_metadata = trailing_metadata
self.code = code
self.details = details
+ self.debug_error_string = None
# The semantics of grpc.Future.cancel and grpc.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
# result of the RPC. This field tracks whether cancellation was requested
@@ -158,12 +149,13 @@ def _handle_event(event, state, response_deserializer):
else:
state.code = code
state.details = batch_operation.details()
+ state.debug_error_string = batch_operation.error_string()
callbacks.extend(state.callbacks)
state.callbacks = None
return callbacks
-def _event_handler(state, call, response_deserializer):
+def _event_handler(state, response_deserializer):
def handle_event(event):
with state.condition:
@@ -172,40 +164,47 @@ def _event_handler(state, call, response_deserializer):
done = not state.due
for callback in callbacks:
callback()
- return call if done else None
+ return done
return handle_event
-def _consume_request_iterator(request_iterator, state, call,
- request_serializer):
- event_handler = _event_handler(state, call, None)
+def _consume_request_iterator(request_iterator, state, call, request_serializer,
+ event_handler):
- def consume_request_iterator():
+ def consume_request_iterator(): # pylint: disable=too-many-branches
while True:
try:
request = next(request_iterator)
except StopIteration:
break
except Exception: # pylint: disable=broad-except
- logging.exception("Exception iterating requests!")
- call.cancel()
- _abort(state, grpc.StatusCode.UNKNOWN,
- "Exception iterating requests!")
+ code = grpc.StatusCode.UNKNOWN
+ details = 'Exception iterating requests!'
+ logging.exception(details)
+ call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
+ details)
+ _abort(state, code, details)
return
serialized_request = _common.serialize(request, request_serializer)
with state.condition:
if state.code is None and not state.cancelled:
if serialized_request is None:
- call.cancel()
+ code = grpc.StatusCode.INTERNAL # pylint: disable=redefined-variable-type
details = 'Exception serializing request!'
- _abort(state, grpc.StatusCode.INTERNAL, details)
+ call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
+ details)
+ _abort(state, code, details)
return
else:
operations = (cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),)
- call.start_client_batch(operations, event_handler)
- state.due.add(cygrpc.OperationType.send_message)
+ operating = call.operate(operations, event_handler)
+ if operating:
+ state.due.add(cygrpc.OperationType.send_message)
+ else:
+ return
while True:
state.condition.wait()
if state.code is None:
@@ -219,19 +218,12 @@ def _consume_request_iterator(request_iterator, state, call,
if state.code is None:
operations = (
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
- call.start_client_batch(operations, event_handler)
- state.due.add(cygrpc.OperationType.send_close_from_client)
-
- def stop_consumption_thread(timeout): # pylint: disable=unused-argument
- with state.condition:
- if state.code is None:
- call.cancel()
- state.cancelled = True
- _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!')
- state.condition.notify_all()
+ operating = call.operate(operations, event_handler)
+ if operating:
+ state.due.add(cygrpc.OperationType.send_close_from_client)
- consumption_thread = _common.CleanupThread(
- stop_consumption_thread, target=consume_request_iterator)
+ consumption_thread = threading.Thread(target=consume_request_iterator)
+ consumption_thread.daemon = True
consumption_thread.start()
@@ -247,9 +239,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def cancel(self):
with self._state.condition:
if self._state.code is None:
- self._call.cancel()
+ code = grpc.StatusCode.CANCELLED
+ details = 'Locally cancelled by application!'
+ self._call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
self._state.cancelled = True
- _abort(self._state, grpc.StatusCode.CANCELLED, 'Cancelled!')
+ _abort(self._state, code, details)
self._state.condition.notify_all()
return False
@@ -318,12 +313,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def _next(self):
with self._state.condition:
if self._state.code is None:
- event_handler = _event_handler(self._state, self._call,
+ event_handler = _event_handler(self._state,
self._response_deserializer)
- self._call.start_client_batch(
+ operating = self._call.operate(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
event_handler)
- self._state.due.add(cygrpc.OperationType.receive_message)
+ if operating:
+ self._state.due.add(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
raise StopIteration()
else:
@@ -391,13 +387,23 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
self._state.condition.wait()
return _common.decode(self._state.details)
+ def debug_error_string(self):
+ with self._state.condition:
+ while self._state.debug_error_string is None:
+ self._state.condition.wait()
+ return _common.decode(self._state.debug_error_string)
+
def _repr(self):
with self._state.condition:
if self._state.code is None:
return '<_Rendezvous object of in-flight RPC>'
+ elif self._state.code is grpc.StatusCode.OK:
+ return _OK_RENDEZVOUS_REPR_FORMAT.format(
+ self._state.code, self._state.details)
else:
- return '<_Rendezvous of RPC that terminated with ({}, {})>'.format(
- self._state.code, _common.decode(self._state.details))
+ return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
+ self._state.code, self._state.details,
+ self._state.debug_error_string)
def __repr__(self):
return self._repr()
@@ -408,9 +414,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
def __del__(self):
with self._state.condition:
if self._state.code is None:
- self._call.cancel()
- self._state.cancelled = True
self._state.code = grpc.StatusCode.CANCELLED
+ self._state.details = 'Cancelled upon garbage collection!'
+ self._state.cancelled = True
+ self._call.cancel(
+ _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
+ self._state.details)
self._state.condition.notify_all()
@@ -437,6 +446,24 @@ def _end_unary_response_blocking(state, call, with_call, deadline):
raise _Rendezvous(state, None, None, deadline)
+def _stream_unary_invocation_operationses(metadata):
+ return (
+ (
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+
+
+def _stream_unary_invocation_operationses_and_tags(metadata):
+ return tuple((
+ operations,
+ None,
+ ) for operations in _stream_unary_invocation_operationses(metadata))
+
+
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(self, channel, managed_call, method, request_serializer,
@@ -448,8 +475,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._response_deserializer = response_deserializer
def _prepare(self, request, timeout, metadata):
- deadline, serialized_request, rendezvous = (_start_unary_request(
- request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = _start_unary_request(
+ request, timeout, self._request_serializer)
if serialized_request is None:
return None, None, None, rendezvous
else:
@@ -467,48 +494,38 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def _blocking(self, request, timeout, metadata, credentials):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
- if rendezvous:
+ if state is None:
raise rendezvous
else:
- completion_queue = cygrpc.CompletionQueue()
- call = self._channel.create_call(None, 0, completion_queue,
- self._method, None, deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- call_error = call.start_client_batch(operations, None)
- _check_call_error(call_error, metadata)
- _handle_event(completion_queue.poll(), state,
- self._response_deserializer)
- return state, call, deadline
+ call = self._channel.segregated_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials, ((
+ operations,
+ None,
+ ),))
+ event = call.next_event()
+ _handle_event(event, state, self._response_deserializer)
+ return state, call,
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- state, call, deadline = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, call, False, deadline)
+ state, call, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, call, False, None)
def with_call(self, request, timeout=None, metadata=None, credentials=None):
- state, call, deadline = self._blocking(request, timeout, metadata,
- credentials)
- return _end_unary_response_blocking(state, call, True, deadline)
+ state, call, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, call, True, None)
def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, rendezvous = self._prepare(
request, timeout, metadata)
- if rendezvous:
- return rendezvous
+ if state is None:
+ raise rendezvous
else:
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call,
- self._response_deserializer)
- with state.condition:
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ (operations,), event_handler)
return _Rendezvous(state, call, self._response_deserializer,
deadline)
@@ -524,34 +541,27 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._response_deserializer = response_deserializer
def __call__(self, request, timeout=None, metadata=None, credentials=None):
- deadline, serialized_request, rendezvous = (_start_unary_request(
- request, timeout, self._request_serializer))
+ deadline, serialized_request, rendezvous = _start_unary_request(
+ request, timeout, self._request_serializer)
if serialized_request is None:
raise rendezvous
else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call,
- self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
+ operationses = (
+ (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.SendMessageOperation(serialized_request,
_EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ operationses, event_handler)
return _Rendezvous(state, call, self._response_deserializer,
deadline)
@@ -569,49 +579,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def _blocking(self, request_iterator, timeout, metadata, credentials):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
- completion_queue = cygrpc.CompletionQueue()
- call = self._channel.create_call(None, 0, completion_queue,
- self._method, None, deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None)
- operations = (
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, None)
- _check_call_error(call_error, metadata)
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ call = self._channel.segregated_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ _stream_unary_invocation_operationses_and_tags(metadata))
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, None)
while True:
- event = completion_queue.poll()
+ event = call.next_event()
with state.condition:
_handle_event(event, state, self._response_deserializer)
state.condition.notify_all()
if not state.due:
break
- return state, call, deadline
+ return state, call,
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, call, deadline = self._blocking(request_iterator, timeout,
- metadata, credentials)
- return _end_unary_response_blocking(state, call, False, deadline)
+ state, call, = self._blocking(request_iterator, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, False, None)
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
- state, call, deadline = self._blocking(request_iterator, timeout,
- metadata, credentials)
- return _end_unary_response_blocking(state, call, True, deadline)
+ state, call, = self._blocking(request_iterator, timeout, metadata,
+ credentials)
+ return _end_unary_response_blocking(state, call, True, None)
def future(self,
request_iterator,
@@ -620,27 +619,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
credentials=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call, self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
- cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
- cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials,
+ _stream_unary_invocation_operationses(metadata), event_handler)
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@@ -661,26 +646,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
credentials=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
- call, drive_call = self._managed_call(None, 0, self._method, None,
- deadline)
- if credentials is not None:
- call.set_credentials(credentials._credentials)
- event_handler = _event_handler(state, call, self._response_deserializer)
- with state.condition:
- call.start_client_batch(
- (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
- event_handler)
- operations = (
+ operationses = (
+ (
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
- )
- call_error = call.start_client_batch(operations, event_handler)
- if call_error != cygrpc.CallError.ok:
- _call_error_set_RPCstate(state, call_error, metadata)
- return _Rendezvous(state, None, None, deadline)
- drive_call()
- _consume_request_iterator(request_iterator, state, call,
- self._request_serializer)
+ ),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
+ )
+ event_handler = _event_handler(state, self._response_deserializer)
+ call = self._managed_call(
+ 0, self._method, None, deadline, metadata, None
+ if credentials is None else credentials._credentials, operationses,
+ event_handler)
+ _consume_request_iterator(request_iterator, state, call,
+ self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@@ -689,67 +668,63 @@ class _ChannelCallState(object):
def __init__(self, channel):
self.lock = threading.Lock()
self.channel = channel
- self.completion_queue = cygrpc.CompletionQueue()
- self.managed_calls = None
+ self.managed_calls = 0
def _run_channel_spin_thread(state):
def channel_spin():
while True:
- event = state.completion_queue.poll()
- completed_call = event.tag(event)
- if completed_call is not None:
+ event = state.channel.next_call_event()
+ call_completed = event.tag(event)
+ if call_completed:
with state.lock:
- state.managed_calls.remove(completed_call)
- if not state.managed_calls:
- state.managed_calls = None
+ state.managed_calls -= 1
+ if state.managed_calls == 0:
return
- def stop_channel_spin(timeout): # pylint: disable=unused-argument
- with state.lock:
- if state.managed_calls is not None:
- for call in state.managed_calls:
- call.cancel()
-
- channel_spin_thread = _common.CleanupThread(
- stop_channel_spin, target=channel_spin)
+ channel_spin_thread = threading.Thread(target=channel_spin)
+ channel_spin_thread.daemon = True
channel_spin_thread.start()
def _channel_managed_call_management(state):
- def create(parent, flags, method, host, deadline):
- """Creates a managed cygrpc.Call and a function to call to drive it.
-
- If operations are successfully added to the returned cygrpc.Call, the
- returned function must be called. If operations are not successfully added
- to the returned cygrpc.Call, the returned function must not be called.
-
- Args:
- parent: A cygrpc.Call to be used as the parent of the created call.
- flags: An integer bitfield of call flags.
- method: The RPC method.
- host: A host string for the created call.
- deadline: A float to be the deadline of the created call or None if the
- call is to have an infinite deadline.
-
- Returns:
- A cygrpc.Call with which to conduct an RPC and a function to call if
- operations are successfully started on the call.
- """
- call = state.channel.create_call(parent, flags, state.completion_queue,
- method, host, deadline)
-
- def drive():
- with state.lock:
- if state.managed_calls is None:
- state.managed_calls = set((call,))
- _run_channel_spin_thread(state)
- else:
- state.managed_calls.add(call)
+ # pylint: disable=too-many-arguments
+ def create(flags, method, host, deadline, metadata, credentials,
+ operationses, event_handler):
+ """Creates a cygrpc.IntegratedCall.
- return call, drive
+ Args:
+ flags: An integer bitfield of call flags.
+ method: The RPC method.
+ host: A host string for the created call.
+ deadline: A float to be the deadline of the created call or None if
+ the call is to have an infinite deadline.
+ metadata: The metadata for the call or None.
+ credentials: A cygrpc.CallCredentials or None.
+ operationses: An iterable of iterables of cygrpc.Operations to be
+ started on the call.
+ event_handler: A behavior to call to handle the events resultant from
+ the operations on the call.
+
+ Returns:
+ A cygrpc.IntegratedCall with which to conduct an RPC.
+ """
+ operationses_and_tags = tuple((
+ operations,
+ event_handler,
+ ) for operations in operationses)
+ with state.lock:
+ call = state.channel.integrated_call(flags, method, host, deadline,
+ metadata, credentials,
+ operationses_and_tags)
+ if state.managed_calls == 0:
+ state.managed_calls = 1
+ _run_channel_spin_thread(state)
+ else:
+ state.managed_calls += 1
+ return call
return create
@@ -819,12 +794,9 @@ def _poll_connectivity(state, channel, initial_try_to_connect):
callback_and_connectivity[1] = state.connectivity
if callbacks:
_spawn_delivery(state, callbacks)
- completion_queue = cygrpc.CompletionQueue()
while True:
- channel.watch_connectivity_state(connectivity,
- time.time() + 0.2, completion_queue,
- None)
- event = completion_queue.poll()
+ event = channel.watch_connectivity_state(connectivity,
+ time.time() + 0.2)
with state.lock:
if not state.callbacks_and_connectivities and not state.try_to_connect:
state.polling = False
@@ -855,10 +827,10 @@ def _moot(state):
def _subscribe(state, callback, try_to_connect):
with state.lock:
if not state.callbacks_and_connectivities and not state.polling:
- polling_thread = _common.CleanupThread(
- lambda timeout: _moot(state),
+ polling_thread = threading.Thread(
target=_poll_connectivity,
args=(state, state.channel, bool(try_to_connect)))
+ polling_thread.daemon = True
polling_thread.start()
state.polling = True
state.callbacks_and_connectivities.append([callback, None])
@@ -944,5 +916,28 @@ class Channel(grpc.Channel):
self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer)
+ def _close(self):
+ self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
+ _moot(self._connectivity_state)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._close()
+ return False
+
+ def close(self):
+ self._close()
+
def __del__(self):
+ # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
+ # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
+ # here (or more likely, call self._close() here). We don't do this today
+ # because many valid use cases today allow the channel to be deleted
+ # immediately after stubs are created. After a sufficient period of time
+ # has passed for all users to be trusted to hang out to their channels
+ # for as long as they are in use and to close them after using them,
+ # then deletion of this grpc._channel.Channel instance can be made to
+ # effect closure of the underlying cygrpc.Channel instance.
_moot(self._connectivity_state)