aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/grpc/__init__.py26
-rw-r--r--src/python/grpcio/grpc/_auth.py24
-rw-r--r--src/python/grpcio/grpc/_channel.py27
-rw-r--r--src/python/grpcio/grpc/_common.py12
-rw-r--r--src/python/grpcio/grpc/_plugin_wrapping.py6
-rw-r--r--src/python/grpcio/grpc/_server.py22
-rw-r--r--src/python/grpcio/grpc/beta/_client_adaptations.py7
-rw-r--r--src/python/grpcio/grpc/beta/_connectivity_channel.py159
-rw-r--r--src/python/grpcio/grpc/beta/_server_adaptations.py22
-rw-r--r--src/python/grpcio/grpc/beta/implementations.py11
-rw-r--r--src/python/grpcio/grpc/framework/foundation/logging_pool.py2
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/base/base.py25
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/face/face.py2
-rw-r--r--src/python/grpcio_health_checking/setup.py2
-rw-r--r--src/python/grpcio_reflection/setup.py2
-rw-r--r--src/python/grpcio_tests/setup.py2
-rw-r--r--src/python/grpcio_tests/tests/http2/negative_http2_client.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_invocation_defects_test.py4
-rw-r--r--src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py4
19 files changed, 110 insertions, 251 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index b64a708cc7..a4481b2ac3 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -1004,7 +1004,7 @@ def unary_unary_rpc_method_handler(behavior,
An RpcMethodHandler for a unary-unary RPC method constructed from the given
parameters.
"""
- from grpc import _utilities
+ from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(False, False, request_deserializer,
response_serializer, behavior, None,
None, None)
@@ -1025,7 +1025,7 @@ def unary_stream_rpc_method_handler(behavior,
An RpcMethodHandler for a unary-stream RPC method constructed from the
given parameters.
"""
- from grpc import _utilities
+ from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(False, True, request_deserializer,
response_serializer, None, behavior,
None, None)
@@ -1046,7 +1046,7 @@ def stream_unary_rpc_method_handler(behavior,
An RpcMethodHandler for a stream-unary RPC method constructed from the
given parameters.
"""
- from grpc import _utilities
+ from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(True, False, request_deserializer,
response_serializer, None, None,
behavior, None)
@@ -1068,7 +1068,7 @@ def stream_stream_rpc_method_handler(behavior,
An RpcMethodHandler for a stream-stream RPC method constructed from the
given parameters.
"""
- from grpc import _utilities
+ from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(True, True, request_deserializer,
response_serializer, None, None, None,
behavior)
@@ -1085,7 +1085,7 @@ def method_handlers_generic_handler(service, method_handlers):
Returns:
A GenericRpcHandler constructed from the given parameters.
"""
- from grpc import _utilities
+ from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.DictionaryGenericHandler(service, method_handlers)
@@ -1124,7 +1124,7 @@ def metadata_call_credentials(metadata_plugin, name=None):
Returns:
A CallCredentials.
"""
- from grpc import _plugin_wrapping
+ from grpc import _plugin_wrapping # pylint: disable=cyclic-import
if name is None:
try:
effective_name = metadata_plugin.__name__
@@ -1147,7 +1147,7 @@ def access_token_call_credentials(access_token):
Returns:
A CallCredentials.
"""
- from grpc import _auth
+ from grpc import _auth # pylint: disable=cyclic-import
return metadata_call_credentials(
_auth.AccessTokenCallCredentials(access_token))
@@ -1161,7 +1161,7 @@ def composite_call_credentials(*call_credentials):
Returns:
A CallCredentials object composed of the given CallCredentials objects.
"""
- from grpc import _credential_composition
+ from grpc import _credential_composition # pylint: disable=cyclic-import
cygrpc_call_credentials = tuple(
single_call_credentials._credentials
for single_call_credentials in call_credentials)
@@ -1180,7 +1180,7 @@ def composite_channel_credentials(channel_credentials, *call_credentials):
A ChannelCredentials composed of the given ChannelCredentials and
CallCredentials objects.
"""
- from grpc import _credential_composition
+ from grpc import _credential_composition # pylint: disable=cyclic-import
cygrpc_call_credentials = tuple(
single_call_credentials._credentials
for single_call_credentials in call_credentials)
@@ -1237,7 +1237,7 @@ def channel_ready_future(channel):
A Future that matures when the given Channel has connectivity
ChannelConnectivity.READY.
"""
- from grpc import _utilities
+ from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.channel_ready_future(channel)
@@ -1252,7 +1252,7 @@ def insecure_channel(target, options=None):
Returns:
A Channel to the target through which RPCs may be conducted.
"""
- from grpc import _channel
+ from grpc import _channel # pylint: disable=cyclic-import
return _channel.Channel(target, () if options is None else options, None)
@@ -1268,7 +1268,7 @@ def secure_channel(target, credentials, options=None):
Returns:
A Channel to the target through which RPCs may be conducted.
"""
- from grpc import _channel
+ from grpc import _channel # pylint: disable=cyclic-import
return _channel.Channel(target, () if options is None else options,
credentials._credentials)
@@ -1290,7 +1290,7 @@ def server(thread_pool, handlers=None, options=None):
Returns:
A Server with which RPCs can be serviced.
"""
- from grpc import _server
+ from grpc import _server # pylint: disable=cyclic-import
return _server.Server(thread_pool, () if handlers is None else handlers, ()
if options is None else options)
diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py
index 21131f85f1..cb7c6fe4fd 100644
--- a/src/python/grpcio/grpc/_auth.py
+++ b/src/python/grpcio/grpc/_auth.py
@@ -39,6 +39,19 @@ def _sign_request(callback, token, error):
callback(metadata, error)
+def _create_get_token_callback(callback):
+
+ def get_token_callback(future):
+ try:
+ access_token = future.result().access_token
+ except Exception as exception: # pylint: disable=broad-except
+ _sign_request(callback, None, exception)
+ else:
+ _sign_request(callback, access_token, None)
+
+ return get_token_callback
+
+
class GoogleCallCredentials(grpc.AuthMetadataPlugin):
"""Metadata wrapper for GoogleCredentials from the oauth2client library."""
@@ -59,16 +72,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin):
additional_claims={'aud': context.service_url})
else:
future = self._pool.submit(self._credentials.get_access_token)
- future.add_done_callback(
- lambda x: self._get_token_callback(callback, x))
-
- def _get_token_callback(self, callback, future):
- try:
- access_token = future.result().access_token
- except Exception as e:
- _sign_request(callback, None, e)
- else:
- _sign_request(callback, access_token, None)
+ future.add_done_callback(_create_get_token_callback(callback))
def __del__(self):
self._pool.shutdown(wait=False)
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index af86f5eabe..4316449ac6 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -200,7 +200,7 @@ def _consume_request_iterator(request_iterator, state, call,
request = next(request_iterator)
except StopIteration:
break
- except Exception as e:
+ except Exception: # pylint: disable=broad-except
logging.exception("Exception iterating requests!")
call.cancel()
_abort(state, grpc.StatusCode.UNKNOWN,
@@ -237,7 +237,7 @@ def _consume_request_iterator(request_iterator, state, call,
cygrpc.Operations(operations), event_handler)
state.due.add(cygrpc.OperationType.send_close_from_client)
- def stop_consumption_thread(timeout):
+ def stop_consumption_thread(timeout): # pylint: disable=unused-argument
with state.condition:
if state.code is None:
call.cancel()
@@ -387,13 +387,14 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
with self._state.condition:
while self._state.initial_metadata is None:
self._state.condition.wait()
- return _common.application_metadata(self._state.initial_metadata)
+ return _common.to_application_metadata(self._state.initial_metadata)
def trailing_metadata(self):
with self._state.condition:
while self._state.trailing_metadata is None:
self._state.condition.wait()
- return _common.application_metadata(self._state.trailing_metadata)
+ return _common.to_application_metadata(
+ self._state.trailing_metadata)
def code(self):
with self._state.condition:
@@ -473,7 +474,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = (
cygrpc.operation_send_initial_metadata(
- _common.cygrpc_metadata(metadata), _EMPTY_FLAGS),
+ _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
@@ -563,7 +564,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
)), event_handler)
operations = (
cygrpc.operation_send_initial_metadata(
- _common.cygrpc_metadata(metadata),
+ _common.to_cygrpc_metadata(metadata),
_EMPTY_FLAGS), cygrpc.operation_send_message(
serialized_request, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
@@ -603,7 +604,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
None)
operations = (
cygrpc.operation_send_initial_metadata(
- _common.cygrpc_metadata(metadata), _EMPTY_FLAGS),
+ _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
call_error = call.start_client_batch(
@@ -657,7 +658,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
event_handler)
operations = (
cygrpc.operation_send_initial_metadata(
- _common.cygrpc_metadata(metadata), _EMPTY_FLAGS),
+ _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
call_error = call.start_client_batch(
@@ -700,7 +701,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
event_handler)
operations = (
cygrpc.operation_send_initial_metadata(
- _common.cygrpc_metadata(metadata), _EMPTY_FLAGS),
+ _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
call_error = call.start_client_batch(
cygrpc.Operations(operations), event_handler)
@@ -735,7 +736,7 @@ def _run_channel_spin_thread(state):
state.managed_calls = None
return
- def stop_channel_spin(timeout):
+ def stop_channel_spin(timeout): # pylint: disable=unused-argument
with state.lock:
if state.managed_calls is not None:
for call in state.managed_calls:
@@ -876,12 +877,8 @@ def _moot(state):
def _subscribe(state, callback, try_to_connect):
with state.lock:
if not state.callbacks_and_connectivities and not state.polling:
-
- def cancel_all_subscriptions(timeout):
- _moot(state)
-
polling_thread = _common.CleanupThread(
- cancel_all_subscriptions,
+ lambda timeout: _moot(state),
target=_poll_connectivity,
args=(state, state.channel, bool(try_to_connect)))
polling_thread.start()
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index 6879e1780b..2e369013f5 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -97,22 +97,22 @@ def decode(b):
def channel_args(options):
- channel_args = []
+ cygrpc_args = []
for key, value in options:
if isinstance(value, six.string_types):
- channel_args.append(cygrpc.ChannelArg(encode(key), encode(value)))
+ cygrpc_args.append(cygrpc.ChannelArg(encode(key), encode(value)))
else:
- channel_args.append(cygrpc.ChannelArg(encode(key), value))
- return cygrpc.ChannelArgs(channel_args)
+ cygrpc_args.append(cygrpc.ChannelArg(encode(key), value))
+ return cygrpc.ChannelArgs(cygrpc_args)
-def cygrpc_metadata(application_metadata):
+def to_cygrpc_metadata(application_metadata):
return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata(
cygrpc.Metadatum(encode(key), encode(value))
for key, value in application_metadata)
-def application_metadata(cygrpc_metadata):
+def to_application_metadata(cygrpc_metadata):
if cygrpc_metadata is None:
return ()
else:
diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py
index 69c46aa546..1e44561c97 100644
--- a/src/python/grpcio/grpc/_plugin_wrapping.py
+++ b/src/python/grpcio/grpc/_plugin_wrapping.py
@@ -66,9 +66,9 @@ class _WrappedCygrpcCallback(object):
def _invoke_success(self, metadata):
try:
- cygrpc_metadata = _common.cygrpc_metadata(metadata)
- except Exception as error:
- self._invoke_failure(error)
+ cygrpc_metadata = _common.to_cygrpc_metadata(metadata)
+ except Exception as exception: # pylint: disable=broad-except
+ self._invoke_failure(exception)
return
self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'')
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index b8e7ea17f7..84e096d4c0 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -142,14 +142,14 @@ def _abort(state, call, code, details):
effective_details = details if state.details is None else state.details
if state.initial_metadata_allowed:
operations = (cygrpc.operation_send_initial_metadata(
- _common.EMPTY_METADATA, _EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
- _common.cygrpc_metadata(state.trailing_metadata),
- effective_code, effective_details, _EMPTY_FLAGS),)
+ _common.EMPTY_METADATA,
+ _EMPTY_FLAGS), cygrpc.operation_send_status_from_server(
+ _common.to_cygrpc_metadata(state.trailing_metadata),
+ effective_code, effective_details, _EMPTY_FLAGS),)
token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
else:
operations = (cygrpc.operation_send_status_from_server(
- _common.cygrpc_metadata(state.trailing_metadata),
+ _common.to_cygrpc_metadata(state.trailing_metadata),
effective_code, effective_details, _EMPTY_FLAGS),)
token = _SEND_STATUS_FROM_SERVER_TOKEN
call.start_server_batch(
@@ -250,7 +250,7 @@ class _Context(grpc.ServicerContext):
self._state.disable_next_compression = True
def invocation_metadata(self):
- return _common.application_metadata(self._rpc_event.request_metadata)
+ return _common.to_application_metadata(self._rpc_event.request_metadata)
def peer(self):
return _common.decode(self._rpc_event.operation_call.peer())
@@ -262,7 +262,8 @@ class _Context(grpc.ServicerContext):
else:
if self._state.initial_metadata_allowed:
operation = cygrpc.operation_send_initial_metadata(
- _common.cygrpc_metadata(initial_metadata), _EMPTY_FLAGS)
+ _common.to_cygrpc_metadata(initial_metadata),
+ _EMPTY_FLAGS)
self._rpc_event.operation_call.start_server_batch(
cygrpc.Operations((operation,)),
_send_initial_metadata(self._state))
@@ -273,7 +274,7 @@ class _Context(grpc.ServicerContext):
def set_trailing_metadata(self, trailing_metadata):
with self._state.condition:
- self._state.trailing_metadata = _common.cygrpc_metadata(
+ self._state.trailing_metadata = _common.to_cygrpc_metadata(
trailing_metadata)
def set_code(self, code):
@@ -342,7 +343,7 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.client is _CANCELLED or state.statused:
return None
else:
- start_server_batch_result = rpc_event.operation_call.start_server_batch(
+ rpc_event.operation_call.start_server_batch(
cygrpc.Operations(
(cygrpc.operation_receive_message(_EMPTY_FLAGS),)),
_receive_message(state, rpc_event.operation_call,
@@ -436,7 +437,8 @@ def _send_response(rpc_event, state, serialized_response):
def _status(rpc_event, state, serialized_response):
with state.condition:
if state.client is not _CANCELLED:
- trailing_metadata = _common.cygrpc_metadata(state.trailing_metadata)
+ trailing_metadata = _common.to_cygrpc_metadata(
+ state.trailing_metadata)
code = _completion_code(state)
details = _details(state)
operations = [
diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py
index b53395e2a2..3c69acc019 100644
--- a/src/python/grpcio/grpc/beta/_client_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_client_adaptations.py
@@ -30,12 +30,13 @@
import grpc
from grpc import _common
-from grpc._cython import cygrpc
from grpc.beta import interfaces
from grpc.framework.common import cardinality
from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face
+# pylint: disable=too-many-arguments,too-many-locals,unused-argument
+
_STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
grpc.StatusCode.CANCELLED: (face.Abortion.Kind.CANCELLED,
face.CancellationError),
@@ -621,8 +622,8 @@ class _GenericStub(face.GenericStub):
class _DynamicStub(face.DynamicStub):
- def __init__(self, generic_stub, group, cardinalities):
- self._generic_stub = generic_stub
+ def __init__(self, backing_generic_stub, group, cardinalities):
+ self._generic_stub = backing_generic_stub
self._group = group
self._cardinalities = cardinalities
diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py
deleted file mode 100644
index bfb847f80a..0000000000
--- a/src/python/grpcio/grpc/beta/_connectivity_channel.py
+++ /dev/null
@@ -1,159 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Affords a connectivity-state-listenable channel."""
-
-import threading
-import time
-
-from grpc._adapter import _low
-from grpc._adapter import _types
-from grpc.beta import interfaces
-from grpc.framework.foundation import callable_util
-
-_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
- 'Exception calling channel subscription callback!')
-
-_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
- state: connectivity
- for state, connectivity in zip(_types.ConnectivityState,
- interfaces.ChannelConnectivity)
-}
-
-
-class ConnectivityChannel(object):
-
- def __init__(self, low_channel):
- self._lock = threading.Lock()
- self._low_channel = low_channel
-
- self._polling = False
- self._connectivity = None
- self._try_to_connect = False
- self._callbacks_and_connectivities = []
- self._delivering = False
-
- def _deliveries(self, connectivity):
- callbacks_needing_update = []
- for callback_and_connectivity in self._callbacks_and_connectivities:
- callback, callback_connectivity = callback_and_connectivity
- if callback_connectivity is not connectivity:
- callbacks_needing_update.append(callback)
- callback_and_connectivity[1] = connectivity
- return callbacks_needing_update
-
- def _deliver(self, initial_connectivity, initial_callbacks):
- connectivity = initial_connectivity
- callbacks = initial_callbacks
- while True:
- for callback in callbacks:
- callable_util.call_logging_exceptions(
- callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
- connectivity)
- with self._lock:
- callbacks = self._deliveries(self._connectivity)
- if callbacks:
- connectivity = self._connectivity
- else:
- self._delivering = False
- return
-
- def _spawn_delivery(self, connectivity, callbacks):
- delivering_thread = threading.Thread(
- target=self._deliver, args=(connectivity, callbacks,))
- delivering_thread.start()
- self._delivering = True
-
- # TODO(issue 3064): Don't poll.
- def _poll_connectivity(self, low_channel, initial_try_to_connect):
- try_to_connect = initial_try_to_connect
- low_connectivity = low_channel.check_connectivity_state(try_to_connect)
- with self._lock:
- self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
- low_connectivity]
- callbacks = tuple(
- callback
- for callback, unused_but_known_to_be_none_connectivity in
- self._callbacks_and_connectivities)
- for callback_and_connectivity in self._callbacks_and_connectivities:
- callback_and_connectivity[1] = self._connectivity
- if callbacks:
- self._spawn_delivery(self._connectivity, callbacks)
- completion_queue = _low.CompletionQueue()
- while True:
- low_channel.watch_connectivity_state(low_connectivity,
- time.time() + 0.2,
- completion_queue, None)
- event = completion_queue.next()
- with self._lock:
- if not self._callbacks_and_connectivities and not self._try_to_connect:
- self._polling = False
- self._connectivity = None
- completion_queue.shutdown()
- break
- try_to_connect = self._try_to_connect
- self._try_to_connect = False
- if event.success or try_to_connect:
- low_connectivity = low_channel.check_connectivity_state(
- try_to_connect)
- with self._lock:
- self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
- low_connectivity]
- if not self._delivering:
- callbacks = self._deliveries(self._connectivity)
- if callbacks:
- self._spawn_delivery(self._connectivity, callbacks)
-
- def subscribe(self, callback, try_to_connect):
- with self._lock:
- if not self._callbacks_and_connectivities and not self._polling:
- polling_thread = threading.Thread(
- target=self._poll_connectivity,
- args=(self._low_channel, bool(try_to_connect)))
- polling_thread.start()
- self._polling = True
- self._callbacks_and_connectivities.append([callback, None])
- elif not self._delivering and self._connectivity is not None:
- self._spawn_delivery(self._connectivity, (callback,))
- self._try_to_connect |= bool(try_to_connect)
- self._callbacks_and_connectivities.append(
- [callback, self._connectivity])
- else:
- self._try_to_connect |= bool(try_to_connect)
- self._callbacks_and_connectivities.append([callback, None])
-
- def unsubscribe(self, callback):
- with self._lock:
- for index, (subscribed_callback, unused_connectivity
- ) in enumerate(self._callbacks_and_connectivities):
- if callback == subscribed_callback:
- self._callbacks_and_connectivities.pop(index)
- break
-
- def low_channel(self):
- return self._low_channel
diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py
index 174af2d642..cf10c26d2f 100644
--- a/src/python/grpcio/grpc/beta/_server_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_server_adaptations.py
@@ -41,6 +41,8 @@ from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import stream
from grpc.framework.interfaces.face import face
+# pylint: disable=too-many-return-statements
+
_DEFAULT_POOL_SIZE = 8
@@ -78,7 +80,7 @@ class _FaceServicerContext(face.ServicerContext):
return _ServerProtocolContext(self._servicer_context)
def invocation_metadata(self):
- return _common.cygrpc_metadata(
+ return _common.to_cygrpc_metadata(
self._servicer_context.invocation_metadata())
def initial_metadata(self, initial_metadata):
@@ -179,7 +181,7 @@ def _run_request_pipe_thread(request_iterator, request_consumer,
return
request_consumer.terminate()
- def stop_request_pipe(timeout):
+ def stop_request_pipe(timeout): # pylint: disable=unused-argument
thread_joined.set()
request_pipe_thread = _common.CleanupThread(
@@ -351,27 +353,27 @@ class _GenericRpcHandler(grpc.GenericRpcHandler):
class _Server(interfaces.Server):
- def __init__(self, server):
- self._server = server
+ def __init__(self, grpc_server):
+ self._grpc_server = grpc_server
def add_insecure_port(self, address):
- return self._server.add_insecure_port(address)
+ return self._grpc_server.add_insecure_port(address)
def add_secure_port(self, address, server_credentials):
- return self._server.add_secure_port(address, server_credentials)
+ return self._grpc_server.add_secure_port(address, server_credentials)
def start(self):
- self._server.start()
+ self._grpc_server.start()
def stop(self, grace):
- return self._server.stop(grace)
+ return self._grpc_server.stop(grace)
def __enter__(self):
- self._server.start()
+ self._grpc_server.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
- self._server.stop(None)
+ self._grpc_server.stop(None)
return False
diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py
index 7093852278..113fd38f8a 100644
--- a/src/python/grpcio/grpc/beta/implementations.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -29,19 +29,20 @@
"""Entry points into the Beta API of gRPC Python."""
# threading is referenced from specification in this module.
-import abc
-import enum
import threading # pylint: disable=unused-import
-# cardinality and face are referenced from specification in this module.
+# interfaces, cardinality, and face are referenced from specification in this
+# module.
import grpc
from grpc import _auth
from grpc.beta import _client_adaptations
from grpc.beta import _server_adaptations
-from grpc.beta import interfaces
+from grpc.beta import interfaces # pylint: disable=unused-import
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
+# pylint: disable=too-many-arguments
+
ChannelCredentials = grpc.ChannelCredentials
ssl_channel_credentials = grpc.ssl_channel_credentials
CallCredentials = grpc.CallCredentials
@@ -218,7 +219,7 @@ def dynamic_stub(channel, service, cardinalities, options=None):
Returns:
A face.DynamicStub with which RPCs can be invoked.
"""
- effective_options = StubOptions() if options is None else options
+ effective_options = _EMPTY_STUB_OPTIONS if options is None else options
return _client_adaptations.dynamic_stub(
channel._channel, # pylint: disable=protected-access
service,
diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
index 7ee37373fa..0912fba139 100644
--- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py
+++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
@@ -39,7 +39,7 @@ def _wrap(behavior):
def _wrapping(*args, **kwargs):
try:
return behavior(*args, **kwargs)
- except Exception as e:
+ except Exception:
logging.exception(
'Unexpected exception from %s executed in logging pool!',
behavior)
diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py
index cb3328296c..aa80e65f57 100644
--- a/src/python/grpcio/grpc/framework/interfaces/base/base.py
+++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py
@@ -46,26 +46,29 @@ import six
# abandonment is referenced from specification in this module.
from grpc.framework.foundation import abandonment # pylint: disable=unused-import
+# pylint: disable=too-many-arguments
+
class NoSuchMethodError(Exception):
"""Indicates that an unrecognized operation has been called.
- Attributes:
- code: A code value to communicate to the other side of the operation along
- with indication of operation termination. May be None.
- details: A details value to communicate to the other side of the operation
- along with indication of operation termination. May be None.
- """
-
- def __init__(self, code, details):
- """Constructor.
-
- Args:
+ Attributes:
code: A code value to communicate to the other side of the operation
along with indication of operation termination. May be None.
details: A details value to communicate to the other side of the
operation along with indication of operation termination. May be None.
"""
+
+ def __init__(self, code, details):
+ """Constructor.
+
+ Args:
+ code: A code value to communicate to the other side of the operation
+ along with indication of operation termination. May be None.
+ details: A details value to communicate to the other side of the
+ operation along with indication of operation termination. May be None.
+ """
+ super(NoSuchMethodError, self).__init__()
self.code = code
self.details = details
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py
index 6c7e2a3af6..c6c44fe4e4 100644
--- a/src/python/grpcio/grpc/framework/interfaces/face/face.py
+++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py
@@ -42,6 +42,8 @@ from grpc.framework.foundation import abandonment # pylint: disable=unused-impo
from grpc.framework.foundation import future # pylint: disable=unused-import
from grpc.framework.foundation import stream # pylint: disable=unused-import
+# pylint: disable=too-many-arguments
+
class NoSuchMethodError(Exception):
"""Raised by customer code to indicate an unrecognized method.
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
index 072c3263c6..52ee98a2d5 100644
--- a/src/python/grpcio_health_checking/setup.py
+++ b/src/python/grpcio_health_checking/setup.py
@@ -47,7 +47,7 @@ PACKAGE_DIRECTORIES = {
SETUP_REQUIRES = (
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),)
-INSTALL_REQUIRES = ('protobuf>=3.0.0',
+INSTALL_REQUIRES = ('protobuf>=3.2.0',
'grpcio>={version}'.format(version=grpc_version.VERSION),)
COMMAND_CLASS = {
diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py
index 19aafe443a..e85092db57 100644
--- a/src/python/grpcio_reflection/setup.py
+++ b/src/python/grpcio_reflection/setup.py
@@ -47,7 +47,7 @@ PACKAGE_DIRECTORIES = {
SETUP_REQUIRES = (
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),)
-INSTALL_REQUIRES = ('protobuf>=3.0.0',
+INSTALL_REQUIRES = ('protobuf>=3.2.0',
'grpcio>={version}'.format(version=grpc_version.VERSION),)
COMMAND_CLASS = {
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index b0c73fc575..b9f0264dae 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -56,7 +56,7 @@ INSTALL_REQUIRES = (
'grpcio>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
- 'oauth2client>=1.4.7', 'protobuf>=3.0.0', 'six>=1.10',)
+ 'oauth2client>=1.4.7', 'protobuf>=3.2.0', 'six>=1.10',)
COMMAND_CLASS = {
# Run `preprocess` *before* doing any packaging!
diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py
index b8adf093a5..b184e62cfd 100644
--- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py
+++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py
@@ -31,6 +31,7 @@
import argparse
import grpc
+import time
from src.proto.grpc.testing import test_pb2
from src.proto.grpc.testing import messages_pb2
@@ -75,6 +76,7 @@ def _goaway(stub):
first_response = stub.UnaryCall(_SIMPLE_REQUEST)
_validate_payload_type_and_length(first_response, messages_pb2.COMPRESSABLE,
_RESPONSE_SIZE)
+ time.sleep(1)
second_response = stub.UnaryCall(_SIMPLE_REQUEST)
_validate_payload_type_and_length(second_response,
messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index f2e3898ed6..ee235032f0 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -249,3 +249,7 @@ class InvocationDefectsTest(unittest.TestCase):
with self.assertRaises(grpc.RpcError):
for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
next(response_iterator)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
index be3522f46f..eb5f459848 100644
--- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
+++ b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
@@ -35,8 +35,8 @@ import unittest
from grpc import _common
_SHORT_TIME = 0.5
-_LONG_TIME = 2.0
-_EPSILON = 0.1
+_LONG_TIME = 5.0
+_EPSILON = 0.5
def cleanup(timeout):