diff options
author | 2017-03-03 10:57:07 -0800 | |
---|---|---|
committer | 2017-03-03 10:57:07 -0800 | |
commit | 2c7ab675c3595b2a5d80ad8211cdf96f07da4f34 (patch) | |
tree | 243948bcb1593385b49f7bb52153193e94e8a1c7 /src | |
parent | 39b6f27a3570d850076ae65fc4b1f32a621ce96c (diff) | |
parent | 26515ab5badc411117e2eebd4c140bc52cecad11 (diff) |
Merge branch 'master' into cq_create_api_changes
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/tsi/test_creds/BUILD | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc/__init__.py | 26 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_auth.py | 24 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 17 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_common.py | 12 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_plugin_wrapping.py | 6 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 22 | ||||
-rw-r--r-- | src/python/grpcio/grpc/beta/_client_adaptations.py | 5 | ||||
-rw-r--r-- | src/python/grpcio/grpc/beta/_connectivity_channel.py | 159 | ||||
-rw-r--r-- | src/python/grpcio/grpc/beta/_server_adaptations.py | 18 | ||||
-rw-r--r-- | src/python/grpcio/grpc/beta/implementations.py | 9 | ||||
-rw-r--r-- | src/python/grpcio/grpc/framework/foundation/logging_pool.py | 2 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/http2/negative_http2_client.py | 2 |
13 files changed, 77 insertions, 227 deletions
diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/lib/tsi/test_creds/BUILD index dcd6d930a8..5cf04caf17 100644 --- a/src/core/lib/tsi/test_creds/BUILD +++ b/src/core/lib/tsi/test_creds/BUILD @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +licenses(["notice"]) # 3-clause BSD + exports_files([ "ca.pem", "server1.key", diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index b64a708cc7..a4481b2ac3 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1004,7 +1004,7 @@ def unary_unary_rpc_method_handler(behavior, An RpcMethodHandler for a unary-unary RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, False, request_deserializer, response_serializer, behavior, None, None, None) @@ -1025,7 +1025,7 @@ def unary_stream_rpc_method_handler(behavior, An RpcMethodHandler for a unary-stream RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, True, request_deserializer, response_serializer, None, behavior, None, None) @@ -1046,7 +1046,7 @@ def stream_unary_rpc_method_handler(behavior, An RpcMethodHandler for a stream-unary RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, False, request_deserializer, response_serializer, None, None, behavior, None) @@ -1068,7 +1068,7 @@ def stream_stream_rpc_method_handler(behavior, An RpcMethodHandler for a stream-stream RPC method constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, True, request_deserializer, response_serializer, None, None, None, behavior) @@ -1085,7 +1085,7 @@ def method_handlers_generic_handler(service, method_handlers): Returns: A GenericRpcHandler constructed from the given parameters. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.DictionaryGenericHandler(service, method_handlers) @@ -1124,7 +1124,7 @@ def metadata_call_credentials(metadata_plugin, name=None): Returns: A CallCredentials. """ - from grpc import _plugin_wrapping + from grpc import _plugin_wrapping # pylint: disable=cyclic-import if name is None: try: effective_name = metadata_plugin.__name__ @@ -1147,7 +1147,7 @@ def access_token_call_credentials(access_token): Returns: A CallCredentials. """ - from grpc import _auth + from grpc import _auth # pylint: disable=cyclic-import return metadata_call_credentials( _auth.AccessTokenCallCredentials(access_token)) @@ -1161,7 +1161,7 @@ def composite_call_credentials(*call_credentials): Returns: A CallCredentials object composed of the given CallCredentials objects. """ - from grpc import _credential_composition + from grpc import _credential_composition # pylint: disable=cyclic-import cygrpc_call_credentials = tuple( single_call_credentials._credentials for single_call_credentials in call_credentials) @@ -1180,7 +1180,7 @@ def composite_channel_credentials(channel_credentials, *call_credentials): A ChannelCredentials composed of the given ChannelCredentials and CallCredentials objects. """ - from grpc import _credential_composition + from grpc import _credential_composition # pylint: disable=cyclic-import cygrpc_call_credentials = tuple( single_call_credentials._credentials for single_call_credentials in call_credentials) @@ -1237,7 +1237,7 @@ def channel_ready_future(channel): A Future that matures when the given Channel has connectivity ChannelConnectivity.READY. """ - from grpc import _utilities + from grpc import _utilities # pylint: disable=cyclic-import return _utilities.channel_ready_future(channel) @@ -1252,7 +1252,7 @@ def insecure_channel(target, options=None): Returns: A Channel to the target through which RPCs may be conducted. """ - from grpc import _channel + from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, None) @@ -1268,7 +1268,7 @@ def secure_channel(target, credentials, options=None): Returns: A Channel to the target through which RPCs may be conducted. """ - from grpc import _channel + from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, credentials._credentials) @@ -1290,7 +1290,7 @@ def server(thread_pool, handlers=None, options=None): Returns: A Server with which RPCs can be serviced. """ - from grpc import _server + from grpc import _server # pylint: disable=cyclic-import return _server.Server(thread_pool, () if handlers is None else handlers, () if options is None else options) diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index 21131f85f1..cb7c6fe4fd 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -39,6 +39,19 @@ def _sign_request(callback, token, error): callback(metadata, error) +def _create_get_token_callback(callback): + + def get_token_callback(future): + try: + access_token = future.result().access_token + except Exception as exception: # pylint: disable=broad-except + _sign_request(callback, None, exception) + else: + _sign_request(callback, access_token, None) + + return get_token_callback + + class GoogleCallCredentials(grpc.AuthMetadataPlugin): """Metadata wrapper for GoogleCredentials from the oauth2client library.""" @@ -59,16 +72,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): additional_claims={'aud': context.service_url}) else: future = self._pool.submit(self._credentials.get_access_token) - future.add_done_callback( - lambda x: self._get_token_callback(callback, x)) - - def _get_token_callback(self, callback, future): - try: - access_token = future.result().access_token - except Exception as e: - _sign_request(callback, None, e) - else: - _sign_request(callback, access_token, None) + future.add_done_callback(_create_get_token_callback(callback)) def __del__(self): self._pool.shutdown(wait=False) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index af86f5eabe..c86cb25682 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -200,7 +200,7 @@ def _consume_request_iterator(request_iterator, state, call, request = next(request_iterator) except StopIteration: break - except Exception as e: + except Exception: # pylint: disable=broad-except logging.exception("Exception iterating requests!") call.cancel() _abort(state, grpc.StatusCode.UNKNOWN, @@ -387,13 +387,14 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): with self._state.condition: while self._state.initial_metadata is None: self._state.condition.wait() - return _common.application_metadata(self._state.initial_metadata) + return _common.to_application_metadata(self._state.initial_metadata) def trailing_metadata(self): with self._state.condition: while self._state.trailing_metadata is None: self._state.condition.wait() - return _common.application_metadata(self._state.trailing_metadata) + return _common.to_application_metadata( + self._state.trailing_metadata) def code(self): with self._state.condition: @@ -473,7 +474,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), @@ -563,7 +564,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): )), event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_send_message( serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), @@ -603,7 +604,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): None) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( @@ -657,7 +658,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( @@ -700,7 +701,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(metadata), _EMPTY_FLAGS), + _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( cygrpc.Operations(operations), event_handler) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index 6879e1780b..2e369013f5 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -97,22 +97,22 @@ def decode(b): def channel_args(options): - channel_args = [] + cygrpc_args = [] for key, value in options: if isinstance(value, six.string_types): - channel_args.append(cygrpc.ChannelArg(encode(key), encode(value))) + cygrpc_args.append(cygrpc.ChannelArg(encode(key), encode(value))) else: - channel_args.append(cygrpc.ChannelArg(encode(key), value)) - return cygrpc.ChannelArgs(channel_args) + cygrpc_args.append(cygrpc.ChannelArg(encode(key), value)) + return cygrpc.ChannelArgs(cygrpc_args) -def cygrpc_metadata(application_metadata): +def to_cygrpc_metadata(application_metadata): return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata( cygrpc.Metadatum(encode(key), encode(value)) for key, value in application_metadata) -def application_metadata(cygrpc_metadata): +def to_application_metadata(cygrpc_metadata): if cygrpc_metadata is None: return () else: diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index 69c46aa546..1e44561c97 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -66,9 +66,9 @@ class _WrappedCygrpcCallback(object): def _invoke_success(self, metadata): try: - cygrpc_metadata = _common.cygrpc_metadata(metadata) - except Exception as error: - self._invoke_failure(error) + cygrpc_metadata = _common.to_cygrpc_metadata(metadata) + except Exception as exception: # pylint: disable=broad-except + self._invoke_failure(exception) return self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'') diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index b8e7ea17f7..84e096d4c0 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -142,14 +142,14 @@ def _abort(state, call, code, details): effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, _EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( - _common.cygrpc_metadata(state.trailing_metadata), - effective_code, effective_details, _EMPTY_FLAGS),) + _common.EMPTY_METADATA, + _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + _common.to_cygrpc_metadata(state.trailing_metadata), + effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: operations = (cygrpc.operation_send_status_from_server( - _common.cygrpc_metadata(state.trailing_metadata), + _common.to_cygrpc_metadata(state.trailing_metadata), effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN call.start_server_batch( @@ -250,7 +250,7 @@ class _Context(grpc.ServicerContext): self._state.disable_next_compression = True def invocation_metadata(self): - return _common.application_metadata(self._rpc_event.request_metadata) + return _common.to_application_metadata(self._rpc_event.request_metadata) def peer(self): return _common.decode(self._rpc_event.operation_call.peer()) @@ -262,7 +262,8 @@ class _Context(grpc.ServicerContext): else: if self._state.initial_metadata_allowed: operation = cygrpc.operation_send_initial_metadata( - _common.cygrpc_metadata(initial_metadata), _EMPTY_FLAGS) + _common.to_cygrpc_metadata(initial_metadata), + _EMPTY_FLAGS) self._rpc_event.operation_call.start_server_batch( cygrpc.Operations((operation,)), _send_initial_metadata(self._state)) @@ -273,7 +274,7 @@ class _Context(grpc.ServicerContext): def set_trailing_metadata(self, trailing_metadata): with self._state.condition: - self._state.trailing_metadata = _common.cygrpc_metadata( + self._state.trailing_metadata = _common.to_cygrpc_metadata( trailing_metadata) def set_code(self, code): @@ -342,7 +343,7 @@ def _unary_request(rpc_event, state, request_deserializer): if state.client is _CANCELLED or state.statused: return None else: - start_server_batch_result = rpc_event.operation_call.start_server_batch( + rpc_event.operation_call.start_server_batch( cygrpc.Operations( (cygrpc.operation_receive_message(_EMPTY_FLAGS),)), _receive_message(state, rpc_event.operation_call, @@ -436,7 +437,8 @@ def _send_response(rpc_event, state, serialized_response): def _status(rpc_event, state, serialized_response): with state.condition: if state.client is not _CANCELLED: - trailing_metadata = _common.cygrpc_metadata(state.trailing_metadata) + trailing_metadata = _common.to_cygrpc_metadata( + state.trailing_metadata) code = _completion_code(state) details = _details(state) operations = [ diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index b53395e2a2..901cb85612 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -30,7 +30,6 @@ import grpc from grpc import _common -from grpc._cython import cygrpc from grpc.beta import interfaces from grpc.framework.common import cardinality from grpc.framework.foundation import future @@ -621,8 +620,8 @@ class _GenericStub(face.GenericStub): class _DynamicStub(face.DynamicStub): - def __init__(self, generic_stub, group, cardinalities): - self._generic_stub = generic_stub + def __init__(self, backing_generic_stub, group, cardinalities): + self._generic_stub = backing_generic_stub self._group = group self._cardinalities = cardinalities diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py deleted file mode 100644 index bfb847f80a..0000000000 --- a/src/python/grpcio/grpc/beta/_connectivity_channel.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Affords a connectivity-state-listenable channel.""" - -import threading -import time - -from grpc._adapter import _low -from grpc._adapter import _types -from grpc.beta import interfaces -from grpc.framework.foundation import callable_util - -_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( - 'Exception calling channel subscription callback!') - -_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { - state: connectivity - for state, connectivity in zip(_types.ConnectivityState, - interfaces.ChannelConnectivity) -} - - -class ConnectivityChannel(object): - - def __init__(self, low_channel): - self._lock = threading.Lock() - self._low_channel = low_channel - - self._polling = False - self._connectivity = None - self._try_to_connect = False - self._callbacks_and_connectivities = [] - self._delivering = False - - def _deliveries(self, connectivity): - callbacks_needing_update = [] - for callback_and_connectivity in self._callbacks_and_connectivities: - callback, callback_connectivity = callback_and_connectivity - if callback_connectivity is not connectivity: - callbacks_needing_update.append(callback) - callback_and_connectivity[1] = connectivity - return callbacks_needing_update - - def _deliver(self, initial_connectivity, initial_callbacks): - connectivity = initial_connectivity - callbacks = initial_callbacks - while True: - for callback in callbacks: - callable_util.call_logging_exceptions( - callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE, - connectivity) - with self._lock: - callbacks = self._deliveries(self._connectivity) - if callbacks: - connectivity = self._connectivity - else: - self._delivering = False - return - - def _spawn_delivery(self, connectivity, callbacks): - delivering_thread = threading.Thread( - target=self._deliver, args=(connectivity, callbacks,)) - delivering_thread.start() - self._delivering = True - - # TODO(issue 3064): Don't poll. - def _poll_connectivity(self, low_channel, initial_try_to_connect): - try_to_connect = initial_try_to_connect - low_connectivity = low_channel.check_connectivity_state(try_to_connect) - with self._lock: - self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ - low_connectivity] - callbacks = tuple( - callback - for callback, unused_but_known_to_be_none_connectivity in - self._callbacks_and_connectivities) - for callback_and_connectivity in self._callbacks_and_connectivities: - callback_and_connectivity[1] = self._connectivity - if callbacks: - self._spawn_delivery(self._connectivity, callbacks) - completion_queue = _low.CompletionQueue() - while True: - low_channel.watch_connectivity_state(low_connectivity, - time.time() + 0.2, - completion_queue, None) - event = completion_queue.next() - with self._lock: - if not self._callbacks_and_connectivities and not self._try_to_connect: - self._polling = False - self._connectivity = None - completion_queue.shutdown() - break - try_to_connect = self._try_to_connect - self._try_to_connect = False - if event.success or try_to_connect: - low_connectivity = low_channel.check_connectivity_state( - try_to_connect) - with self._lock: - self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ - low_connectivity] - if not self._delivering: - callbacks = self._deliveries(self._connectivity) - if callbacks: - self._spawn_delivery(self._connectivity, callbacks) - - def subscribe(self, callback, try_to_connect): - with self._lock: - if not self._callbacks_and_connectivities and not self._polling: - polling_thread = threading.Thread( - target=self._poll_connectivity, - args=(self._low_channel, bool(try_to_connect))) - polling_thread.start() - self._polling = True - self._callbacks_and_connectivities.append([callback, None]) - elif not self._delivering and self._connectivity is not None: - self._spawn_delivery(self._connectivity, (callback,)) - self._try_to_connect |= bool(try_to_connect) - self._callbacks_and_connectivities.append( - [callback, self._connectivity]) - else: - self._try_to_connect |= bool(try_to_connect) - self._callbacks_and_connectivities.append([callback, None]) - - def unsubscribe(self, callback): - with self._lock: - for index, (subscribed_callback, unused_connectivity - ) in enumerate(self._callbacks_and_connectivities): - if callback == subscribed_callback: - self._callbacks_and_connectivities.pop(index) - break - - def low_channel(self): - return self._low_channel diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 174af2d642..81348d5d87 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -78,7 +78,7 @@ class _FaceServicerContext(face.ServicerContext): return _ServerProtocolContext(self._servicer_context) def invocation_metadata(self): - return _common.cygrpc_metadata( + return _common.to_cygrpc_metadata( self._servicer_context.invocation_metadata()) def initial_metadata(self, initial_metadata): @@ -351,27 +351,27 @@ class _GenericRpcHandler(grpc.GenericRpcHandler): class _Server(interfaces.Server): - def __init__(self, server): - self._server = server + def __init__(self, grpc_server): + self._grpc_server = grpc_server def add_insecure_port(self, address): - return self._server.add_insecure_port(address) + return self._grpc_server.add_insecure_port(address) def add_secure_port(self, address, server_credentials): - return self._server.add_secure_port(address, server_credentials) + return self._grpc_server.add_secure_port(address, server_credentials) def start(self): - self._server.start() + self._grpc_server.start() def stop(self, grace): - return self._server.stop(grace) + return self._grpc_server.stop(grace) def __enter__(self): - self._server.start() + self._grpc_server.start() return self def __exit__(self, exc_type, exc_val, exc_tb): - self._server.stop(None) + self._grpc_server.stop(None) return False diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py index 7093852278..0b79577689 100644 --- a/src/python/grpcio/grpc/beta/implementations.py +++ b/src/python/grpcio/grpc/beta/implementations.py @@ -29,16 +29,15 @@ """Entry points into the Beta API of gRPC Python.""" # threading is referenced from specification in this module. -import abc -import enum import threading # pylint: disable=unused-import -# cardinality and face are referenced from specification in this module. +# interfaces, cardinality, and face are referenced from specification in this +# module. import grpc from grpc import _auth from grpc.beta import _client_adaptations from grpc.beta import _server_adaptations -from grpc.beta import interfaces +from grpc.beta import interfaces # pylint: disable=unused-import from grpc.framework.common import cardinality # pylint: disable=unused-import from grpc.framework.interfaces.face import face # pylint: disable=unused-import @@ -218,7 +217,7 @@ def dynamic_stub(channel, service, cardinalities, options=None): Returns: A face.DynamicStub with which RPCs can be invoked. """ - effective_options = StubOptions() if options is None else options + effective_options = _EMPTY_STUB_OPTIONS if options is None else options return _client_adaptations.dynamic_stub( channel._channel, # pylint: disable=protected-access service, diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py index 7ee37373fa..0912fba139 100644 --- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py +++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py @@ -39,7 +39,7 @@ def _wrap(behavior): def _wrapping(*args, **kwargs): try: return behavior(*args, **kwargs) - except Exception as e: + except Exception: logging.exception( 'Unexpected exception from %s executed in logging pool!', behavior) diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py index b8adf093a5..b184e62cfd 100644 --- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py +++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py @@ -31,6 +31,7 @@ import argparse import grpc +import time from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import messages_pb2 @@ -75,6 +76,7 @@ def _goaway(stub): first_response = stub.UnaryCall(_SIMPLE_REQUEST) _validate_payload_type_and_length(first_response, messages_pb2.COMPRESSABLE, _RESPONSE_SIZE) + time.sleep(1) second_response = stub.UnaryCall(_SIMPLE_REQUEST) _validate_payload_type_and_length(second_response, messages_pb2.COMPRESSABLE, _RESPONSE_SIZE) |