diff options
22 files changed, 145 insertions, 246 deletions
@@ -1,3 +1,14 @@ +[VARIABLES] +# TODO(https://github.com/PyCQA/pylint/issues/1345): How does the inspection +# not include "unused_" and "ignored_" by default? +dummy-variables-rgx=^ignored_|^unused_ + +[MISCELLANEOUS] +# NOTE(nathaniel): We are big fans of "TODO(<issue link>): " and +# "NOTE(<username or issue link>): ". We do not allow "TODO:", +# "TODO(<username>):", "FIXME:", or anything else. +notes=FIXME,XXX + [MESSAGES CONTROL] #TODO: Enable missing-docstring @@ -11,15 +22,11 @@ #TODO: Enable protected-access #TODO: Enable no-name-in-module #TODO: Enable unused-argument -#TODO: Enable fixme #TODO: Enable wrong-import-order -#TODO: Enable no-value-for-parameter -#TODO: Enable cyclic-import -#TODO: Enable unused-variable -#TODO: Enable redefined-outer-name -#TODO: Enable unused-import +# TODO(https://github.com/PyCQA/pylint/issues/59#issuecomment-283774279): +# enable cyclic-import after a 1.7-or-later pylint release that recognizes our +# disable=cyclic-import suppressions. #TODO: Enable too-many-instance-attributes -#TODO: Enable broad-except #TODO: Enable too-many-locals #TODO: Enable too-many-lines #TODO: Enable redefined-variable-type @@ -29,6 +36,5 @@ #TODO: Enable too-many-return-statements #TODO: Enable too-many-nested-blocks #TODO: Enable super-init-not-called -#TODO: Enable no-self-use -disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,fixme,wrong-import-order,no-value-for-parameter,cyclic-import,unused-variable,redefined-outer-name,unused-import,too-many-instance-attributes,broad-except,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called,no-self-use +disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,wrong-import-order,cyclic-import,too-many-instance-attributes,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/lib/tsi/test_creds/BUILD index dcd6d930a8..5cf04caf17 100644 --- a/src/core/lib/tsi/test_creds/BUILD +++ b/src/core/lib/tsi/test_creds/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + exports_files([ "ca.pem", "server1.key", diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index b64a708cc7..a4481b2ac3 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1004,7 +1004,7 @@ def unary_unary_rpc_method_handler(behavior, An RpcMethodHandler for a unary-unary RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, False, request_deserializer, response_serializer, behavior, None, None, None) @@ -1025,7 +1025,7 @@ def unary_stream_rpc_method_handler(behavior, An RpcMethodHandler for a unary-stream RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, True, request_deserializer, response_serializer, None, behavior, None, None) @@ -1046,7 +1046,7 @@ def stream_unary_rpc_method_handler(behavior, An RpcMethodHandler for a stream-unary RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, False, request_deserializer, response_serializer, None, None, behavior, None) @@ -1068,7 +1068,7 @@ def stream_stream_rpc_method_handler(behavior, An RpcMethodHandler for a stream-stream RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, True, request_deserializer, response_serializer, None, None, None, behavior) @@ -1085,7 +1085,7 @@ def method_handlers_generic_handler(service, method_handlers): Returns: A GenericRpcHandler constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.DictionaryGenericHandler(service, method_handlers) @@ -1124,7 +1124,7 @@ def metadata_call_credentials(metadata_plugin, name=None): Returns: A CallCredentials. """ - from grpc import _plugin_wrapping + from grpc import _plugin_wrapping # pylint: disable=cyclic-import if name is None: try: effective_name = metadata_plugin.__name__ @@ -1147,7 +1147,7 @@ def access_token_call_credentials(access_token): Returns: A CallCredentials. """ - from grpc import _auth + from grpc import _auth # pylint: disable=cyclic-import return metadata_call_credentials( _auth.AccessTokenCallCredentials(access_token)) @@ -1161,7 +1161,7 @@ def composite_call_credentials(*call_credentials): Returns: A CallCredentials object composed of the given CallCredentials objects. """ - from grpc import _credential_composition + from grpc import _credential_composition # pylint: disable=cyclic-import cygrpc_call_credentials = tuple( single_call_credentials._credentials for single_call_credentials in call_credentials) @@ -1180,7 +1180,7 @@ def composite_channel_credentials(channel_credentials, *call_credentials): A ChannelCredentials composed of the given ChannelCredentials and CallCredentials objects. """ - from grpc import _credential_composition + from grpc import _credential_composition # pylint: disable=cyclic-import cygrpc_call_credentials = tuple( single_call_credentials._credentials for single_call_credentials in call_credentials) @@ -1237,7 +1237,7 @@ def channel_ready_future(channel): A Future that matures when the given Channel has connectivity ChannelConnectivity.READY. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.channel_ready_future(channel) @@ -1252,7 +1252,7 @@ def insecure_channel(target, options=None): Returns: A Channel to the target through which RPCs may be conducted. """ - from grpc import _channel + from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, None) @@ -1268,7 +1268,7 @@ def secure_channel(target, credentials, options=None): Returns: A Channel to the target through which RPCs may be conducted. """ - from grpc import _channel + from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, credentials._credentials) @@ -1290,7 +1290,7 @@ def server(thread_pool, handlers=None, options=None): Returns: A Server with which RPCs can be serviced. """ - from grpc import _server + from grpc import _server # pylint: disable=cyclic-import return _server.Server(thread_pool, () if handlers is None else handlers, () if options is None else options) diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index 21131f85f1..cb7c6fe4fd 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -39,6 +39,19 @@ def _sign_request(callback, token, error): callback(metadata, error) +def _create_get_token_callback(callback): + + def get_token_callback(future): + try: + access_token = future.result().access_token + except Exception as exception: # pylint: disable=broad-except + _sign_request(callback, None, exception) + else: + _sign_request(callback, access_token, None) + + return get_token_callback + + class GoogleCallCredentials(grpc.AuthMetadataPlugin): """Metadata wrapper for GoogleCredentials from the oauth2client library.""" @@ -59,16 +72,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): additional_claims={'aud': context.service_url}) else: future = self._pool.submit(self._credentials.get_access_token) - future.add_done_callback( - lambda x: self._get_token_callback(callback, x)) - - def _get_token_callback(self, callback, future): - try: - access_token = future.result().access_token - except Exception as e: - _sign_request(callback, None, e) - else: - _sign_request(callback, access_token, None) + future.add_done_callback(_create_get_token_callback(callback)) def __del__(self): self._pool.shutdown(wait=False) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index af86f5eabe..c86cb25682 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -200,7 +200,7 @@ def _consume_request_iterator(request_iterator, state, call, request = next(request_iterator) except StopIteration: break - except Exception as e: + except Exception: # pylint: disable=broad-except logging.exception("Exception iterating requests!") call.cancel() _abort(state, grpc.StatusCode.UNKNOWN, @@ -387,13 +387,14 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): with self._state.condition: while self._state.initial_metadata is None: self._state.condition.wait() - return _common.application_metadata(self._state.initial_metadata) + return _common.to_application_metadata(self._state.initial_metadata) def trailing_metadata(self): with self._state.condition: while self._state.trailing_metadata is None: self._state.condition.wait() - return _common.application_metadata(self._state.trailing_metadata) + return _common.to_application_metadata( + self._state.trailing_metadata) def code(self): with self._state.condition: @@ -473,7 +474,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), @@ -563,7 +564,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): )), event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_send_message( serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), @@ -603,7 +604,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): None) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( @@ -657,7 +658,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( @@ -700,7 +701,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( cygrpc.Operations(operations), event_handler) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index 6879e1780b..2e369013f5 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -97,22 +97,22 @@ def decode(b): def channel_args(options): - channel_args = [] + cygrpc_args = [] for key, value in options: if isinstance(value, six.string_types): - channel_args.append(cygrpc.ChannelArg(encode(key), encode(value))) + cygrpc_args.append(cygrpc.ChannelArg(encode(key), encode(value))) else: - channel_args.append(cygrpc.ChannelArg(encode(key), value)) - return cygrpc.ChannelArgs(channel_args) + cygrpc_args.append(cygrpc.ChannelArg(encode(key), value)) + return cygrpc.ChannelArgs(cygrpc_args) -def cygrpc_metadata(application_metadata): +def to_cygrpc_metadata(application_metadata): return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata( cygrpc.Metadatum(encode(key), encode(value)) for key, value in application_metadata) -def application_metadata(cygrpc_metadata): +def to_application_metadata(cygrpc_metadata): if cygrpc_metadata is None: return () else: diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index 69c46aa546..1e44561c97 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -66,9 +66,9 @@ class _WrappedCygrpcCallback(object): def _invoke_success(self, metadata): try: - cygrpc_metadata = _common.cygrpc_metadata(metadata) - except Exception as error: - self._invoke_failure(error) + cygrpc_metadata = _common.to_cygrpc_metadata(metadata) + except Exception as exception: # pylint: disable=broad-except + self._invoke_failure(exception) return self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'') diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index b8e7ea17f7..84e096d4c0 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -142,14 +142,14 @@ def _abort(state, call, code, details): effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, _EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( - _common.cygrpc_metadata(state.trailing_metadata), - effective_code, effective_details, _EMPTY_FLAGS),) + _common.EMPTY_METADATA, + _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + _common.to_cygrpc_metadata(state.trailing_metadata), + effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: operations = (cygrpc.operation_send_status_from_server( - _common.cygrpc_metadata(state.trailing_metadata), + _common.to_cygrpc_metadata(state.trailing_metadata), effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN call.start_server_batch( @@ -250,7 +250,7 @@ class _Context(grpc.ServicerContext): self._state.disable_next_compression = True def invocation_metadata(self): - return _common.application_metadata(self._rpc_event.request_metadata) + return _common.to_application_metadata(self._rpc_event.request_metadata) def peer(self): return _common.decode(self._rpc_event.operation_call.peer()) @@ -262,7 +262,8 @@ class _Context(grpc.ServicerContext): else: if self._state.initial_metadata_allowed: operation = cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(initial_metadata), _EMPTY_FLAGS) + _common.to_cygrpc_metadata(initial_metadata), + _EMPTY_FLAGS) self._rpc_event.operation_call.start_server_batch( cygrpc.Operations((operation,)), _send_initial_metadata(self._state)) @@ -273,7 +274,7 @@ class _Context(grpc.ServicerContext): def set_trailing_metadata(self, trailing_metadata): with self._state.condition: - self._state.trailing_metadata = _common.cygrpc_metadata( + self._state.trailing_metadata = _common.to_cygrpc_metadata( trailing_metadata) def set_code(self, code): @@ -342,7 +343,7 @@ def _unary_request(rpc_event, state, request_deserializer): if state.client is _CANCELLED or state.statused: return None else: - start_server_batch_result = rpc_event.operation_call.start_server_batch( + rpc_event.operation_call.start_server_batch( cygrpc.Operations( (cygrpc.operation_receive_message(_EMPTY_FLAGS),)), _receive_message(state, rpc_event.operation_call, @@ -436,7 +437,8 @@ def _send_response(rpc_event, state, serialized_response): def _status(rpc_event, state, serialized_response): with state.condition: if state.client is not _CANCELLED: - trailing_metadata = _common.cygrpc_metadata(state.trailing_metadata) + trailing_metadata = _common.to_cygrpc_metadata( + state.trailing_metadata) code = _completion_code(state) details = _details(state) operations = [ diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index b53395e2a2..901cb85612 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -30,7 +30,6 @@ import grpc from grpc import _common -from grpc._cython import cygrpc from grpc.beta import interfaces from grpc.framework.common import cardinality from grpc.framework.foundation import future @@ -621,8 +620,8 @@ class _GenericStub(face.GenericStub): class _DynamicStub(face.DynamicStub): - def __init__(self, generic_stub, group, cardinalities): - self._generic_stub = generic_stub + def __init__(self, backing_generic_stub, group, cardinalities): + self._generic_stub = backing_generic_stub self._group = group self._cardinalities = cardinalities diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py deleted file mode 100644 index bfb847f80a..0000000000 --- a/src/python/grpcio/grpc/beta/_connectivity_channel.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Affords a connectivity-state-listenable channel.""" - -import threading -import time - -from grpc._adapter import _low -from grpc._adapter import _types -from grpc.beta import interfaces -from grpc.framework.foundation import callable_util - -_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( - 'Exception calling channel subscription callback!') - -_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { - state: connectivity - for state, connectivity in zip(_types.ConnectivityState, - interfaces.ChannelConnectivity) -} - - -class ConnectivityChannel(object): - - def __init__(self, low_channel): - self._lock = threading.Lock() - self._low_channel = low_channel - - self._polling = False - self._connectivity = None - self._try_to_connect = False - self._callbacks_and_connectivities = [] - self._delivering = False - - def _deliveries(self, connectivity): - callbacks_needing_update = [] - for callback_and_connectivity in self._callbacks_and_connectivities: - callback, callback_connectivity = callback_and_connectivity - if callback_connectivity is not connectivity: - callbacks_needing_update.append(callback) - callback_and_connectivity[1] = connectivity - return callbacks_needing_update - - def _deliver(self, initial_connectivity, initial_callbacks): - connectivity = initial_connectivity - callbacks = initial_callbacks - while True: - for callback in callbacks: - callable_util.call_logging_exceptions( - callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE, - connectivity) - with self._lock: - callbacks = self._deliveries(self._connectivity) - if callbacks: - connectivity = self._connectivity - else: - self._delivering = False - return - - def _spawn_delivery(self, connectivity, callbacks): - delivering_thread = threading.Thread( - target=self._deliver, args=(connectivity, callbacks,)) - delivering_thread.start() - self._delivering = True - - # TODO(issue 3064): Don't poll. - def _poll_connectivity(self, low_channel, initial_try_to_connect): - try_to_connect = initial_try_to_connect - low_connectivity = low_channel.check_connectivity_state(try_to_connect) - with self._lock: - self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ - low_connectivity] - callbacks = tuple( - callback - for callback, unused_but_known_to_be_none_connectivity in - self._callbacks_and_connectivities) - for callback_and_connectivity in self._callbacks_and_connectivities: - callback_and_connectivity[1] = self._connectivity - if callbacks: - self._spawn_delivery(self._connectivity, callbacks) - completion_queue = _low.CompletionQueue() - while True: - low_channel.watch_connectivity_state(low_connectivity, - time.time() + 0.2, - completion_queue, None) - event = completion_queue.next() - with self._lock: - if not self._callbacks_and_connectivities and not self._try_to_connect: - self._polling = False - self._connectivity = None - completion_queue.shutdown() - break - try_to_connect = self._try_to_connect - self._try_to_connect = False - if event.success or try_to_connect: - low_connectivity = low_channel.check_connectivity_state( - try_to_connect) - with self._lock: - self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ - low_connectivity] - if not self._delivering: - callbacks = self._deliveries(self._connectivity) - if callbacks: - self._spawn_delivery(self._connectivity, callbacks) - - def subscribe(self, callback, try_to_connect): - with self._lock: - if not self._callbacks_and_connectivities and not self._polling: - polling_thread = threading.Thread( - target=self._poll_connectivity, - args=(self._low_channel, bool(try_to_connect))) - polling_thread.start() - self._polling = True - self._callbacks_and_connectivities.append([callback, None]) - elif not self._delivering and self._connectivity is not None: - self._spawn_delivery(self._connectivity, (callback,)) - self._try_to_connect |= bool(try_to_connect) - self._callbacks_and_connectivities.append( - [callback, self._connectivity]) - else: - self._try_to_connect |= bool(try_to_connect) - self._callbacks_and_connectivities.append([callback, None]) - - def unsubscribe(self, callback): - with self._lock: - for index, (subscribed_callback, unused_connectivity - ) in enumerate(self._callbacks_and_connectivities): - if callback == subscribed_callback: - self._callbacks_and_connectivities.pop(index) - break - - def low_channel(self): - return self._low_channel diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 174af2d642..81348d5d87 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -78,7 +78,7 @@ class _FaceServicerContext(face.ServicerContext): return _ServerProtocolContext(self._servicer_context) def invocation_metadata(self): - return _common.cygrpc_metadata( + return _common.to_cygrpc_metadata( self._servicer_context.invocation_metadata()) def initial_metadata(self, initial_metadata): @@ -351,27 +351,27 @@ class _GenericRpcHandler(grpc.GenericRpcHandler): class _Server(interfaces.Server): - def __init__(self, server): - self._server = server + def __init__(self, grpc_server): + self._grpc_server = grpc_server def add_insecure_port(self, address): - return self._server.add_insecure_port(address) + return self._grpc_server.add_insecure_port(address) def add_secure_port(self, address, server_credentials): - return self._server.add_secure_port(address, server_credentials) + return self._grpc_server.add_secure_port(address, server_credentials) def start(self): - self._server.start() + self._grpc_server.start() def stop(self, grace): - return self._server.stop(grace) + return self._grpc_server.stop(grace) def __enter__(self): - self._server.start() + self._grpc_server.start() return self def __exit__(self, exc_type, exc_val, exc_tb): - self._server.stop(None) + self._grpc_server.stop(None) return False diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py index 7093852278..0b79577689 100644 --- a/src/python/grpcio/grpc/beta/implementations.py +++ b/src/python/grpcio/grpc/beta/implementations.py @@ -29,16 +29,15 @@ """Entry points into the Beta API of gRPC Python.""" # threading is referenced from specification in this module. -import abc -import enum import threading # pylint: disable=unused-import -# cardinality and face are referenced from specification in this module. +# interfaces, cardinality, and face are referenced from specification in this +# module. import grpc from grpc import _auth from grpc.beta import _client_adaptations from grpc.beta import _server_adaptations -from grpc.beta import interfaces +from grpc.beta import interfaces # pylint: disable=unused-import from grpc.framework.common import cardinality # pylint: disable=unused-import from grpc.framework.interfaces.face import face # pylint: disable=unused-import @@ -218,7 +217,7 @@ def dynamic_stub(channel, service, cardinalities, options=None): Returns: A face.DynamicStub with which RPCs can be invoked. """ - effective_options = StubOptions() if options is None else options + effective_options = _EMPTY_STUB_OPTIONS if options is None else options return _client_adaptations.dynamic_stub( channel._channel, # pylint: disable=protected-access service, diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py index 7ee37373fa..0912fba139 100644 --- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py +++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py @@ -39,7 +39,7 @@ def _wrap(behavior): def _wrapping(*args, **kwargs): try: return behavior(*args, **kwargs) - except Exception as e: + except Exception: logging.exception( 'Unexpected exception from %s executed in logging pool!', behavior) diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py index b8adf093a5..b184e62cfd 100644 --- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py +++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py @@ -31,6 +31,7 @@ import argparse import grpc +import time from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import messages_pb2 @@ -75,6 +76,7 @@ def _goaway(stub): first_response = stub.UnaryCall(_SIMPLE_REQUEST) _validate_payload_type_and_length(first_response, messages_pb2.COMPRESSABLE, _RESPONSE_SIZE) + time.sleep(1) second_response = stub.UnaryCall(_SIMPLE_REQUEST) _validate_payload_type_and_length(second_response, messages_pb2.COMPRESSABLE, _RESPONSE_SIZE) diff --git a/test/core/census/BUILD b/test/core/census/BUILD index 9ec48bdfe2..49680ab91f 100644 --- a/test/core/census/BUILD +++ b/test/core/census/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + cc_test( name = "context_test", srcs = ["context_test.c"], diff --git a/test/core/channel/BUILD b/test/core/channel/BUILD index 42cb468485..c6590465f1 100644 --- a/test/core/channel/BUILD +++ b/test/core/channel/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + cc_test( name = "channel_args_test", srcs = ["channel_args_test.c"], diff --git a/test/core/compression/BUILD b/test/core/compression/BUILD index a243a72029..9ddb4c52b4 100644 --- a/test/core/compression/BUILD +++ b/test/core/compression/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + cc_test( name = "algorithm_test", srcs = ["algorithm_test.c"], diff --git a/test/core/handshake/BUILD b/test/core/handshake/BUILD index 864e0db00b..eb8f3a9beb 100644 --- a/test/core/handshake/BUILD +++ b/test/core/handshake/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + cc_test( name = "client_ssl", srcs = ["client_ssl.c"], diff --git a/test/core/support/BUILD b/test/core/support/BUILD index 08cee1441b..3183510db9 100644 --- a/test/core/support/BUILD +++ b/test/core/support/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + cc_test( name = "alloc_test", srcs = ["alloc_test.c"], diff --git a/test/core/surface/BUILD b/test/core/surface/BUILD index c158413122..3d5e26ced3 100644 --- a/test/core/surface/BUILD +++ b/test/core/surface/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + cc_test( name = "alarm_test", srcs = ["alarm_test.c"], diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index ecfa20aa90..21e3374f91 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -62,22 +62,49 @@ static struct Init { ~Init() { grpc_shutdown(); } } g_init; -static void BM_InsecureChannelWithDefaults(benchmark::State &state) { - grpc_channel *channel = - grpc_insecure_channel_create("localhost:12345", NULL, NULL); +class BaseChannelFixture { + public: + BaseChannelFixture(grpc_channel *channel) : channel_(channel) {} + ~BaseChannelFixture() { grpc_channel_destroy(channel_); } + + grpc_channel *channel() const { return channel_; } + + private: + grpc_channel *const channel_; +}; + +class InsecureChannel : public BaseChannelFixture { + public: + InsecureChannel() + : BaseChannelFixture( + grpc_insecure_channel_create("localhost:1234", NULL, NULL)) {} +}; + +class LameChannel : public BaseChannelFixture { + public: + LameChannel() + : BaseChannelFixture(grpc_lame_client_channel_create( + "localhost:1234", GRPC_STATUS_UNAUTHENTICATED, "blah")) {} +}; + +template <class Fixture> +static void BM_CallCreateDestroy(benchmark::State &state) { + Fixture fixture; grpc_completion_queue *cq = - grpc_completion_queue_create(GRPC_CQ_PLUCK, DEFAULT_POLLING, NULL); - grpc_slice method = grpc_slice_from_static_string("/foo/bar"); + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + void *method_hdl = + grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); while (state.KeepRunning()) { - grpc_call_destroy(grpc_channel_create_call(channel, NULL, - GRPC_PROPAGATE_DEFAULTS, cq, - method, NULL, deadline, NULL)); + grpc_call_destroy(grpc_channel_create_registered_call( + fixture.channel(), NULL, GRPC_PROPAGATE_DEFAULTS, cq, method_hdl, + deadline, NULL)); } - grpc_channel_destroy(channel); grpc_completion_queue_destroy(cq); } -BENCHMARK(BM_InsecureChannelWithDefaults); + +BENCHMARK_TEMPLATE(BM_CallCreateDestroy, InsecureChannel); +BENCHMARK_TEMPLATE(BM_CallCreateDestroy, LameChannel); static void FilterDestroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { diff --git a/tools/profiling/microbenchmarks/bm2bq.py b/tools/profiling/microbenchmarks/bm2bq.py index 76ed0fef0d..a83b3be89c 100755 --- a/tools/profiling/microbenchmarks/bm2bq.py +++ b/tools/profiling/microbenchmarks/bm2bq.py @@ -149,6 +149,10 @@ bm_specs = { 'tpl': ['fixture'], 'dyn': [], }, + 'BM_CallCreateDestroy' : { + 'tpl': ['fixture'], + 'dyn': [], + }, } def numericalize(s): |