From 40750ef4846847648a964d62a6cd1c87eff2f959 Mon Sep 17 00:00:00 2001 From: Daniel Joos Date: Mon, 29 Oct 2018 17:26:20 +0100 Subject: Enable compilation with OpenSSL 1.1 API This makes it possible to compile the core library against OpenSSL 1.1 without the backwards-compatible/deprecated API enabled. It also adds missing include statements in `jwt_verifier.cc` that are required to build with OpenSSL 1.1 (as the `BN_` and `RSA_` APIs are used there). --- src/core/lib/security/credentials/jwt/jwt_verifier.cc | 2 ++ src/core/tsi/ssl_transport_security.cc | 12 ++++++++++++ 2 files changed, 14 insertions(+) (limited to 'src') diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.cc b/src/core/lib/security/credentials/jwt/jwt_verifier.cc index c7d1b36ff0..cdef0f322a 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.cc +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.cc @@ -31,7 +31,9 @@ #include extern "C" { +#include <openssl/bn.h> #include <openssl/pem.h> +#include <openssl/rsa.h> } #include "src/core/lib/gpr/string.h" diff --git a/src/core/tsi/ssl_transport_security.cc b/src/core/tsi/ssl_transport_security.cc index d6a72ada0d..9243d845cb 100644 --- a/src/core/tsi/ssl_transport_security.cc +++ b/src/core/tsi/ssl_transport_security.cc @@ -156,9 +156,13 @@ static unsigned long openssl_thread_id_cb(void) { #endif static void init_openssl(void) { +#if OPENSSL_API_COMPAT >= 0x10100000L + OPENSSL_init_ssl(0, NULL); +#else SSL_library_init(); SSL_load_error_strings(); OpenSSL_add_all_algorithms(); +#endif #if OPENSSL_VERSION_NUMBER < 0x10100000 if (!CRYPTO_get_locking_callback()) { int num_locks = CRYPTO_num_locks(); @@ -1649,7 +1653,11 @@ tsi_result tsi_create_ssl_client_handshaker_factory_with_options( return TSI_INVALID_ARGUMENT; } +#if defined(OPENSSL_NO_TLS1_2_METHOD) || OPENSSL_API_COMPAT >= 0x10100000L + ssl_context = SSL_CTX_new(TLS_method()); +#else ssl_context = SSL_CTX_new(TLSv1_2_method()); +#endif if (ssl_context == nullptr) { gpr_log(GPR_ERROR, "Could not create ssl context."); return TSI_INVALID_ARGUMENT; } @@ -1806,7 +1814,11 @@ tsi_result tsi_create_ssl_server_handshaker_factory_with_options( for (i = 0; i < options->num_key_cert_pairs; i++) { do { +#if defined(OPENSSL_NO_TLS1_2_METHOD) || OPENSSL_API_COMPAT >= 0x10100000L + impl->ssl_contexts[i] = SSL_CTX_new(TLS_method()); +#else impl->ssl_contexts[i] = SSL_CTX_new(TLSv1_2_method()); +#endif if (impl->ssl_contexts[i] == nullptr) { gpr_log(GPR_ERROR, "Could not create ssl context."); result = TSI_OUT_OF_RESOURCES; -- cgit v1.2.3 From de0249ecaf362ed04f1351c57a7c7be9be79bdf8 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 29 Nov 2018 16:08:52 -0800 Subject: Fall back instead of failing for cases where we are not able to set the socket options --- src/core/lib/iomgr/tcp_posix.cc | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 606bfce6e7..6f66551a7e 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -589,7 +589,7 @@ ssize_t tcp_send(int fd, const struct msghdr* msg) { */ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, grpc_error** error); /** The callback function to be invoked when we get an error on the socket.
*/ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); @@ -597,13 +597,11 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); #ifdef GRPC_LINUX_ERRQUEUE static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, - grpc_error** error) { + ssize_t* sent_length) { if (!tcp->socket_ts_enabled) { uint32_t opt = grpc_core::kTimestampingSocketOptions; if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt), sizeof(opt)) != 0) { - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket."); @@ -781,8 +779,7 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { #else /* GRPC_LINUX_ERRQUEUE */ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, - grpc_error** error) { + ssize_t* sent_length) { gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); GPR_ASSERT(0); return false; @@ -801,7 +798,7 @@ void tcp_shutdown_buffer_list(grpc_tcp* tcp) { gpr_mu_lock(&tcp->tb_mu); grpc_core::TracedBuffer::Shutdown( &tcp->tb_head, tcp->outgoing_buffer_arg, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + GRPC_ERROR_CREATE_FROM_STATIC_STRING("TracedBuffer list shutdown")); gpr_mu_unlock(&tcp->tb_mu); tcp->outgoing_buffer_arg = nullptr; } @@ -852,13 +849,17 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_iov = iov; msg.msg_iovlen = iov_size; msg.msg_flags = 0; + bool tried_sending_message = false; if (tcp->outgoing_buffer_arg != nullptr) { - if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, - error)) { + if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) { + /* We could not set socket options to collect Fathom timestamps. + * Fallback on writing without timestamps. */ tcp_shutdown_buffer_list(tcp); - return true; /* something went wrong with timestamps */ + } else { + tried_sending_message = true; } - } else { + } + if (!tried_sending_message) { msg.msg_control = nullptr; msg.msg_controllen = 0; -- cgit v1.2.3 From 95f71d8d7fb25c4ca948b0ac58f00c565c95e27e Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 29 Nov 2018 17:36:00 -0800 Subject: Cache result of failing to set timestamping options --- src/core/lib/iomgr/tcp_posix.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 6f66551a7e..90827033bd 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -126,6 +126,7 @@ struct grpc_tcp { int bytes_counter; bool socket_ts_enabled; /* True if timestamping options are set on the socket */ + bool ts_capable; /* Cache whether we can set timestamping options */ gpr_atm stop_error_notification; /* Set to 1 if we do not want to be notified on errors anymore */ @@ -851,9 +852,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_flags = 0; bool tried_sending_message = false; if (tcp->outgoing_buffer_arg != nullptr) { - if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) { + if (!tcp->ts_capable || + !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) { /* We could not set socket options to collect Fathom timestamps.
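* The failure is cached: tcp->ts_capable is cleared just below, so later * calls to tcp_flush skip the timestamping path instead of retrying the * failed setsockopt on every write.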
* Fallback on writing without timestamps. */ + tcp->ts_capable = false; tcp_shutdown_buffer_list(tcp); } else { tried_sending_message = true; @@ -1115,6 +1118,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->is_first_read = true; tcp->bytes_counter = -1; tcp->socket_ts_enabled = false; + tcp->ts_capable = true; tcp->outgoing_buffer_arg = nullptr; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); -- cgit v1.2.3 From 8cb2d0546d14574977c4943d33bb103954078bbd Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 10 Dec 2018 16:19:35 -0800 Subject: Upgrade sanity Docker image to debian:stretch * Use latest pylint in Python 3.7 (they dropped support for PY2) * Make latest pylint happy * Forced to upgrade to shellcheck 0.4.4 * Make shellcheck 0.4.4 happy * Adopt reviewers' advice to reduce global disabled rules --- .pylintrc | 10 ++++++ .pylintrc-tests | 6 ++++ src/python/grpcio/grpc/_auth.py | 2 +- src/python/grpcio/grpc/_channel.py | 14 ++++---- src/python/grpcio/grpc/_utilities.py | 3 -- .../grpc_testing/_server/_handler.py | 2 +- src/python/grpcio_tests/tests/_runner.py | 2 +- .../tests/protoc_plugin/_split_definitions_test.py | 8 ++--- .../tests/protoc_plugin/beta_python_plugin_test.py | 2 +- .../grpcio_tests/tests/qps/benchmark_client.py | 2 +- src/python/grpcio_tests/tests/qps/client_runner.py | 2 +- src/python/grpcio_tests/tests/qps/worker_server.py | 2 +- src/python/grpcio_tests/tests/stress/client.py | 4 +-- .../tests/testing/_client_application.py | 4 +-- .../tests/unit/_from_grpc_import_star.py | 2 +- .../dockerfile/test/sanity/Dockerfile.template | 22 +++++-------- tools/distrib/pylint_code.sh | 6 ++-- tools/dockerfile/test/sanity/Dockerfile | 38 ++++++++++------------ tools/run_tests/artifacts/build_artifact_protoc.sh | 1 + tools/run_tests/artifacts/build_artifact_ruby.sh | 2 ++ tools/run_tests/artifacts/run_in_workspace.sh | 3 +- tools/run_tests/dockerize/build_and_run_docker.sh | 2 +- .../dockerize/build_docker_and_run_tests.sh | 4 +-- tools/run_tests/dockerize/build_interop_image.sh | 2 +- tools/run_tests/dockerize/docker_run_tests.sh | 1 + tools/run_tests/helper_scripts/run_grpc-node.sh | 2 ++ .../helper_scripts/run_tests_in_workspace.sh | 3 +- tools/run_tests/interop/with_nvm.sh | 1 + tools/run_tests/interop/with_rvm.sh | 1 + tools/run_tests/performance/build_performance.sh | 1 + .../run_tests/performance/build_performance_go.sh | 3 +- .../performance/build_performance_node.sh | 1 + tools/run_tests/performance/run_worker_go.sh | 3 +- tools/run_tests/performance/run_worker_node.sh | 1 + tools/run_tests/performance/run_worker_php.sh | 1 + tools/run_tests/performance/run_worker_ruby.sh | 1 + 36 files changed, 94 insertions(+), 70 deletions(-) (limited to 'src') diff --git a/.pylintrc b/.pylintrc index 90e8989ffc..ba74decb04 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,3 +1,11 @@ +[MASTER] +ignore= + src/python/grpcio/grpc/beta, + src/python/grpcio/grpc/framework, + src/python/grpcio/grpc/framework/common, + src/python/grpcio/grpc/framework/foundation, + src/python/grpcio/grpc/framework/interfaces, + [VARIABLES] # TODO(https://github.com/PyCQA/pylint/issues/1345): How does the inspection @@ -82,3 +90,5 @@ disable= # if:/else: and for:/else:. 
useless-else-on-loop, no-else-return, + # NOTE(lidiz): Python 3 make object inheritance default, but not PY2 + useless-object-inheritance, diff --git a/.pylintrc-tests b/.pylintrc-tests index e68755c674..0d408fbb56 100644 --- a/.pylintrc-tests +++ b/.pylintrc-tests @@ -1,3 +1,7 @@ +[MASTER] +ignore-patterns= + .+?(beta|framework).+?\.py + [VARIABLES] # TODO(https://github.com/PyCQA/pylint/issues/1345): How does the inspection @@ -115,3 +119,5 @@ disable= # if:/else: and for:/else:. useless-else-on-loop, no-else-return, + # NOTE(lidiz): Python 3 make object inheritance default, but not PY2 + useless-object-inheritance, diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index c17824563d..9b990f490d 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -46,7 +46,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): # Hack to determine if these are JWT creds and we need to pass # additional_claims when getting a token - self._is_jwt = 'additional_claims' in inspect.getargspec( + self._is_jwt = 'additional_claims' in inspect.getargspec( # pylint: disable=deprecated-method credentials.get_access_token).args def __call__(self, context, callback): diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 35fa82d56b..951c6f33ff 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -525,7 +525,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata, wait_for_ready) if state is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: call = self._channel.segregated_call( 0, self._method, None, deadline, metadata, None @@ -535,7 +535,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): ),)) event = call.next_event() _handle_event(event, state, self._response_deserializer) - return state, call, + return state, call def __call__(self, request, @@ -566,7 +566,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata, wait_for_ready) if state is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( @@ -599,7 +599,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) if serialized_request is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) operationses = ( @@ -653,7 +653,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): state.condition.notify_all() if not state.due: break - return state, call, + return state, call def __call__(self, request_iterator, @@ -745,10 +745,10 @@ class _InitialMetadataFlags(int): def with_wait_for_ready(self, wait_for_ready): if wait_for_ready is not None: if wait_for_ready: - self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ + return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) elif not wait_for_ready: - self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ + return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ 
cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) return self diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index d90b34bcbd..2938a38b44 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -132,15 +132,12 @@ class _ChannelReadyFuture(grpc.Future): def result(self, timeout=None): self._block(timeout) - return None def exception(self, timeout=None): self._block(timeout) - return None def traceback(self, timeout=None): self._block(timeout) - return None def add_done_callback(self, fn): with self._condition: diff --git a/src/python/grpcio_testing/grpc_testing/_server/_handler.py b/src/python/grpcio_testing/grpc_testing/_server/_handler.py index 0e3404b0d0..100d8195f6 100644 --- a/src/python/grpcio_testing/grpc_testing/_server/_handler.py +++ b/src/python/grpcio_testing/grpc_testing/_server/_handler.py @@ -185,7 +185,7 @@ class _Handler(Handler): elif self._code is None: self._condition.wait() else: - return self._trailing_metadata, self._code, self._details, + return self._trailing_metadata, self._code, self._details def expire(self): with self._condition: diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py index eaaa027e61..9ef0f17684 100644 --- a/src/python/grpcio_tests/tests/_runner.py +++ b/src/python/grpcio_tests/tests/_runner.py @@ -203,7 +203,7 @@ class Runner(object): check_kill_self() time.sleep(0) case_thread.join() - except: + except: # pylint: disable=try-except-raise # re-raise the exception after forcing the with-block to end raise result.set_output(augmented_case.case, stdout_pipe.output(), diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index e21ea0010a..2b735526cb 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -144,7 +144,7 @@ class _ProtoBeforeGrpcProtocStyle(object): absolute_proto_file_names) pb2_grpc_protoc_exit_code = _protoc( proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) - return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code, + return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code class _GrpcBeforeProtoProtocStyle(object): @@ -160,7 +160,7 @@ class _GrpcBeforeProtoProtocStyle(object): proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None, absolute_proto_file_names) - return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code, + return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code _PROTOC_STYLES = ( @@ -243,9 +243,9 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): def _services_modules(self): if self.PROTOC_STYLE.grpc_in_pb2_expected(): - return self._services_pb2, self._services_pb2_grpc, + return self._services_pb2, self._services_pb2_grpc else: - return self._services_pb2_grpc, + return (self._services_pb2_grpc,) def test_imported_attributes(self): self._protoc() diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py index b46e53315e..43c90af6a7 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py @@ -223,7 +223,7 @@ def _CreateService(payload_pb2, responses_pb2, service_pb2): 
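# (The yield below returns the same two-tuple with or without the trailing # comma; this commit's recurring trailing-comma removals silence what # pylint 2.x reports as trailing-comma-tuple without changing any values.)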
server.start() channel = implementations.insecure_channel('localhost', port) stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel) - yield servicer_methods, stub, + yield servicer_methods, stub server.stop(0) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 0488450740..fac0e44e5a 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -180,7 +180,7 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): self._streams = [ _SyncStream(self._stub, self._generic, self._request, self._handle_response) - for _ in xrange(config.outstanding_rpcs_per_channel) + for _ in range(config.outstanding_rpcs_per_channel) ] self._curr_stream = 0 diff --git a/src/python/grpcio_tests/tests/qps/client_runner.py b/src/python/grpcio_tests/tests/qps/client_runner.py index e79abab3c7..a57524c74e 100644 --- a/src/python/grpcio_tests/tests/qps/client_runner.py +++ b/src/python/grpcio_tests/tests/qps/client_runner.py @@ -77,7 +77,7 @@ class ClosedLoopClientRunner(ClientRunner): def start(self): self._is_running = True self._client.start() - for _ in xrange(self._request_count): + for _ in range(self._request_count): self._client.send_request() def stop(self): diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index 337a94b546..a03367ec63 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -109,7 +109,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): start_time = time.time() # Create a client for each channel - for i in xrange(config.client_channels): + for i in range(config.client_channels): server = config.server_targets[i % len(config.server_targets)] runner = self._create_client_runner(server, config, qps_data) client_runners.append(runner) diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py index 41f2e1b6c2..a318b308e6 100644 --- a/src/python/grpcio_tests/tests/stress/client.py +++ b/src/python/grpcio_tests/tests/stress/client.py @@ -132,9 +132,9 @@ def run_test(args): server.start() for test_server_target in test_server_targets: - for _ in xrange(args.num_channels_per_server): + for _ in range(args.num_channels_per_server): channel = _get_channel(test_server_target, args) - for _ in xrange(args.num_stubs_per_channel): + for _ in range(args.num_stubs_per_channel): stub = test_pb2_grpc.TestServiceStub(channel) runner = test_runner.TestRunner(stub, test_cases, hist, exception_queue, stop_event) diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py index 3ddeba2373..4d42df0389 100644 --- a/src/python/grpcio_tests/tests/testing/_client_application.py +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -130,9 +130,9 @@ def _run_stream_stream(stub): request_pipe = _Pipe() response_iterator = stub.StreStre(iter(request_pipe)) request_pipe.add(_application_common.STREAM_STREAM_REQUEST) - first_responses = next(response_iterator), next(response_iterator), + first_responses = next(response_iterator), next(response_iterator) request_pipe.add(_application_common.STREAM_STREAM_REQUEST) - second_responses = next(response_iterator), next(response_iterator), + second_responses = next(response_iterator), next(response_iterator) request_pipe.close() try: 
next(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py index ad847ae03e..1ada25382d 100644 --- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py +++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py @@ -14,7 +14,7 @@ _BEFORE_IMPORT = tuple(globals()) -from grpc import * # pylint: disable=wildcard-import +from grpc import * # pylint: disable=wildcard-import,unused-wildcard-import _AFTER_IMPORT = tuple(globals()) diff --git a/templates/tools/dockerfile/test/sanity/Dockerfile.template b/templates/tools/dockerfile/test/sanity/Dockerfile.template index eac7f2ab01..a4f9183bea 100644 --- a/templates/tools/dockerfile/test/sanity/Dockerfile.template +++ b/templates/tools/dockerfile/test/sanity/Dockerfile.template @@ -14,28 +14,24 @@ # See the License for the specific language governing permissions and # limitations under the License. - FROM debian:jessie - - <%include file="../../apt_get_basic.include"/> - <%include file="../../gcp_api_libraries.include"/> - <%include file="../../python_deps.include"/> + <%include file="../../python_stretch.include"/> <%include file="../../cxx_deps.include"/> #======================== # Sanity test dependencies + RUN apt-get update && apt-get -t testing install -y python3.7 python3-all-dev + RUN curl https://bootstrap.pypa.io/get-pip.py | python3.7 + # Make Python 3.7 the default Python 3 version + RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.7 1 RUN apt-get update && apt-get install -y ${"\\"} - python-pip ${"\\"} autoconf ${"\\"} automake ${"\\"} libtool ${"\\"} curl ${"\\"} - python-virtualenv ${"\\"} - python-lxml ${"\\"} shellcheck - RUN pip install simplejson mako - + RUN python2 -m pip install simplejson mako virtualenv lxml + RUN python3 -m pip install simplejson mako virtualenv lxml + <%include file="../../clang5.include"/> - <%include file="../../run_tests_addons.include"/> - + # Define the default command. CMD ["bash"] - diff --git a/tools/distrib/pylint_code.sh b/tools/distrib/pylint_code.sh index d17eb9fdb8..cb3966d691 100755 --- a/tools/distrib/pylint_code.sh +++ b/tools/distrib/pylint_code.sh @@ -31,12 +31,12 @@ TEST_DIRS=( ) VIRTUALENV=python_pylint_venv -python -m virtualenv $VIRTUALENV +python3 -m virtualenv $VIRTUALENV PYTHON=$VIRTUALENV/bin/python -$PYTHON -m pip install --upgrade pip==10.0.1 -$PYTHON -m pip install pylint==1.9.2 +$PYTHON -m pip install --upgrade pip==18.1 +$PYTHON -m pip install --upgrade pylint==2.2.2 EXIT=0 for dir in "${DIRS[@]}"; do diff --git a/tools/dockerfile/test/sanity/Dockerfile b/tools/dockerfile/test/sanity/Dockerfile index e6bdb4ee03..aeee02a50f 100644 --- a/tools/dockerfile/test/sanity/Dockerfile +++ b/tools/dockerfile/test/sanity/Dockerfile @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM debian:jessie - +FROM debian:stretch + # Install Git and basic packages. 
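# The base image moves from jessie to stretch here; python3.7 is not in # stretch proper, so it is pulled from the Debian 'testing' repo added # further below, with APT::Default-Release pinned so every other package # stays on the stable release.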
RUN apt-get update && apt-get install -y \ autoconf \ @@ -53,20 +53,19 @@ RUN apt-get update && apt-get install -y time && apt-get clean RUN apt-get update && apt-get install -y python-pip && apt-get clean RUN pip install --upgrade google-api-python-client oauth2client -#==================== -# Python dependencies +# Install Python 2.7 +RUN apt-get update && apt-get install -y python2.7 python-all-dev +RUN curl https://bootstrap.pypa.io/get-pip.py | python2.7 -# Install dependencies +# Add Debian 'testing' repository +RUN echo 'deb http://ftp.de.debian.org/debian testing main' >> /etc/apt/sources.list +RUN echo 'APT::Default-Release "stable";' | tee -a /etc/apt/apt.conf.d/00local -RUN apt-get update && apt-get install -y \ - python-all-dev \ - python3-all-dev \ - python-pip -# Install Python packages from PyPI -RUN pip install --upgrade pip==10.0.1 -RUN pip install virtualenv -RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.2.post1 six==1.10.0 twisted==17.5.0 +RUN mkdir /var/local/jenkins + +# Define the default command. +CMD ["bash"] #================= # C++ dependencies @@ -74,16 +73,18 @@ RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev c #======================== # Sanity test dependencies +RUN apt-get update && apt-get -t testing install -y python3.7 python3-all-dev +RUN curl https://bootstrap.pypa.io/get-pip.py | python3.7 +# Make Python 3.7 the default Python 3 version +RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.7 1 RUN apt-get update && apt-get install -y \ - python-pip \ autoconf \ automake \ libtool \ curl \ - python-virtualenv \ - python-lxml \ shellcheck -RUN pip install simplejson mako +RUN python2 -m pip install simplejson mako virtualenv lxml +RUN python3 -m pip install simplejson mako virtualenv lxml RUN apt-get update && apt-get -y install wget xz-utils RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz @@ -94,8 +95,5 @@ RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/b ENV CLANG_TIDY=clang-tidy -RUN mkdir /var/local/jenkins - - # Define the default command. CMD ["bash"] diff --git a/tools/run_tests/artifacts/build_artifact_protoc.sh b/tools/run_tests/artifacts/build_artifact_protoc.sh index 6d433f2dad..a5b6e2f348 100755 --- a/tools/run_tests/artifacts/build_artifact_protoc.sh +++ b/tools/run_tests/artifacts/build_artifact_protoc.sh @@ -14,6 +14,7 @@ # limitations under the License. # Use devtoolset environment that has GCC 4.8 before set -ex +# shellcheck disable=SC1091 source scl_source enable devtoolset-2 set -ex diff --git a/tools/run_tests/artifacts/build_artifact_ruby.sh b/tools/run_tests/artifacts/build_artifact_ruby.sh index 5ab4cf21b4..c910374376 100755 --- a/tools/run_tests/artifacts/build_artifact_ruby.sh +++ b/tools/run_tests/artifacts/build_artifact_ruby.sh @@ -18,7 +18,9 @@ SYSTEM=$(uname | cut -f 1 -d_) cd "$(dirname "$0")/../../.." set +ex +# shellcheck disable=SC1091 [[ -s /etc/profile.d/rvm.sh ]] && . /etc/profile.d/rvm.sh +# shellcheck disable=SC1090 [[ -s "$HOME/.rvm/scripts/rvm" ]] && source "$HOME/.rvm/scripts/rvm" set -ex diff --git a/tools/run_tests/artifacts/run_in_workspace.sh b/tools/run_tests/artifacts/run_in_workspace.sh index 20181e077c..f4719b0a4a 100755 --- a/tools/run_tests/artifacts/run_in_workspace.sh +++ b/tools/run_tests/artifacts/run_in_workspace.sh @@ -19,7 +19,8 @@ set -ex cd "$(dirname "$0")/../../.." 
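# The split below is ShellCheck SC2155 hygiene: in `export repo_root=$(pwd)` # the exit status of $(pwd) is masked by export, which always succeeds; # assigning first and exporting separately keeps failures observable.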
-export repo_root=$(pwd) +repo_root=$(pwd) +export repo_root # TODO: fix file to pass shellcheck diff --git a/tools/run_tests/dockerize/build_and_run_docker.sh b/tools/run_tests/dockerize/build_and_run_docker.sh index 3f01fbc7b7..9d0efc8b40 100755 --- a/tools/run_tests/dockerize/build_and_run_docker.sh +++ b/tools/run_tests/dockerize/build_and_run_docker.sh @@ -20,7 +20,7 @@ set -ex cd "$(dirname "$0")/../../.." git_root=$(pwd) -cd - +cd - # shellcheck disable=SC2103 # Inputs # DOCKERFILE_DIR - Directory in which Dockerfile file is located. diff --git a/tools/run_tests/dockerize/build_docker_and_run_tests.sh b/tools/run_tests/dockerize/build_docker_and_run_tests.sh index 1741b3268b..a63e1be96d 100755 --- a/tools/run_tests/dockerize/build_docker_and_run_tests.sh +++ b/tools/run_tests/dockerize/build_docker_and_run_tests.sh @@ -20,7 +20,7 @@ set -ex cd "$(dirname "$0")/../../.." git_root=$(pwd) -cd - +cd - # shellcheck disable=SC2103 # Inputs # DOCKERFILE_DIR - Directory in which Dockerfile file is located. @@ -48,7 +48,7 @@ docker_instance_git_root=/var/local/jenkins/grpc # Run tests inside docker DOCKER_EXIT_CODE=0 # TODO: silence complaint about $TTY_FLAG expansion in some other way -# shellcheck disable=SC2086 +# shellcheck disable=SC2086,SC2154 docker run \ --cap-add SYS_PTRACE \ -e "RUN_TESTS_COMMAND=$RUN_TESTS_COMMAND" \ diff --git a/tools/run_tests/dockerize/build_interop_image.sh b/tools/run_tests/dockerize/build_interop_image.sh index 126dd4065e..0f88787639 100755 --- a/tools/run_tests/dockerize/build_interop_image.sh +++ b/tools/run_tests/dockerize/build_interop_image.sh @@ -16,7 +16,7 @@ # This script is invoked by run_interop_tests.py to build the docker image # for interop testing. You should never need to call this script on your own. -set -x +set -ex # Params: # INTEROP_IMAGE - name of tag of the final interop image diff --git a/tools/run_tests/dockerize/docker_run_tests.sh b/tools/run_tests/dockerize/docker_run_tests.sh index b7686e48ba..5214938208 100755 --- a/tools/run_tests/dockerize/docker_run_tests.sh +++ b/tools/run_tests/dockerize/docker_run_tests.sh @@ -18,6 +18,7 @@ set -e +# shellcheck disable=SC2154 export CONFIG=$config export ASAN_SYMBOLIZER_PATH=/usr/bin/llvm-symbolizer export PATH=$PATH:/usr/bin/llvm-symbolizer diff --git a/tools/run_tests/helper_scripts/run_grpc-node.sh b/tools/run_tests/helper_scripts/run_grpc-node.sh index 747aae7fd5..d14753a4d5 100755 --- a/tools/run_tests/helper_scripts/run_grpc-node.sh +++ b/tools/run_tests/helper_scripts/run_grpc-node.sh @@ -16,6 +16,8 @@ # This script runs grpc/grpc-node tests with their grpc submodule updated # to this reference +set -ex + # cd to gRPC root directory cd "$(dirname "$0")/../../.." diff --git a/tools/run_tests/helper_scripts/run_tests_in_workspace.sh b/tools/run_tests/helper_scripts/run_tests_in_workspace.sh index 790c041881..fa7a7aac0a 100755 --- a/tools/run_tests/helper_scripts/run_tests_in_workspace.sh +++ b/tools/run_tests/helper_scripts/run_tests_in_workspace.sh @@ -20,7 +20,8 @@ set -ex cd "$(dirname "$0")/../../.." -export repo_root="$(pwd)" +repo_root="$(pwd)" +export repo_root rm -rf "${WORKSPACE_NAME}" git clone . "${WORKSPACE_NAME}" diff --git a/tools/run_tests/interop/with_nvm.sh b/tools/run_tests/interop/with_nvm.sh index 887f9f6a9f..55f4b2b875 100755 --- a/tools/run_tests/interop/with_nvm.sh +++ b/tools/run_tests/interop/with_nvm.sh @@ -15,5 +15,6 @@ # limitations under the License. 
# Makes sure NVM is loaded before executing the command passed as an argument +# shellcheck disable=SC1090 source ~/.nvm/nvm.sh "$@" diff --git a/tools/run_tests/interop/with_rvm.sh b/tools/run_tests/interop/with_rvm.sh index 41e6efcb56..82bce9da12 100755 --- a/tools/run_tests/interop/with_rvm.sh +++ b/tools/run_tests/interop/with_rvm.sh @@ -15,5 +15,6 @@ # limitations under the License. # Makes sure RVM is loaded before executing the command passed as an argument +# shellcheck disable=SC1091 source /usr/local/rvm/scripts/rvm "$@" diff --git a/tools/run_tests/performance/build_performance.sh b/tools/run_tests/performance/build_performance.sh index ab6bffdc34..6c22e2d37d 100755 --- a/tools/run_tests/performance/build_performance.sh +++ b/tools/run_tests/performance/build_performance.sh @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# shellcheck disable=SC1090 source ~/.rvm/scripts/rvm set -ex diff --git a/tools/run_tests/performance/build_performance_go.sh b/tools/run_tests/performance/build_performance_go.sh index 812728d4ce..3aa203a6ee 100755 --- a/tools/run_tests/performance/build_performance_go.sh +++ b/tools/run_tests/performance/build_performance_go.sh @@ -17,7 +17,8 @@ set -ex cd "$(dirname "$0")/../../.." -export GOPATH=$(pwd)/../gopath +GOPATH=$(pwd)/../gopath +export GOPATH # Get grpc-go and the dependencies but get rid of the upstream/master version go get google.golang.org/grpc diff --git a/tools/run_tests/performance/build_performance_node.sh b/tools/run_tests/performance/build_performance_node.sh index 12e0872264..b5b5d9a1a2 100755 --- a/tools/run_tests/performance/build_performance_node.sh +++ b/tools/run_tests/performance/build_performance_node.sh @@ -15,6 +15,7 @@ set +ex +# shellcheck disable=SC1090 . "$HOME/.nvm/nvm.sh" nvm install 10 diff --git a/tools/run_tests/performance/run_worker_go.sh b/tools/run_tests/performance/run_worker_go.sh index f8e821a265..1127f4f25a 100755 --- a/tools/run_tests/performance/run_worker_go.sh +++ b/tools/run_tests/performance/run_worker_go.sh @@ -17,6 +17,7 @@ set -ex cd "$(dirname "$0")/../../.." -export GOPATH=$(pwd)/../gopath +GOPATH=$(pwd)/../gopath +export GOPATH "${GOPATH}/bin/worker" "$@" diff --git a/tools/run_tests/performance/run_worker_node.sh b/tools/run_tests/performance/run_worker_node.sh index 3e5dd18edb..658cd50811 100755 --- a/tools/run_tests/performance/run_worker_node.sh +++ b/tools/run_tests/performance/run_worker_node.sh @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# shellcheck disable=SC1090 . "$HOME/.nvm/nvm.sh" nvm use 10 diff --git a/tools/run_tests/performance/run_worker_php.sh b/tools/run_tests/performance/run_worker_php.sh index 2fe2493e60..97292a9978 100755 --- a/tools/run_tests/performance/run_worker_php.sh +++ b/tools/run_tests/performance/run_worker_php.sh @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# shellcheck disable=SC1090 source ~/.rvm/scripts/rvm set -ex diff --git a/tools/run_tests/performance/run_worker_ruby.sh b/tools/run_tests/performance/run_worker_ruby.sh index 729c5cec97..ed44846336 100755 --- a/tools/run_tests/performance/run_worker_ruby.sh +++ b/tools/run_tests/performance/run_worker_ruby.sh @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
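# SC1090 is ShellCheck's "can't follow non-constant source" warning; the # directives added across these scripts acknowledge that the ~/.rvm and # $HOME/.nvm paths resolve only at run time, while SC1091 (used above for # /etc/profile.d/rvm.sh and scl_source) covers fixed paths not given as # input.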
+# shellcheck disable=SC1090 source ~/.rvm/scripts/rvm set -ex -- cgit v1.2.3 From e8d6d4785410154a64554651898807063ce41e20 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 12 Dec 2018 17:58:52 -0800 Subject: Update README for #16821 --- src/objective-c/README.md | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'src') diff --git a/src/objective-c/README.md b/src/objective-c/README.md index 32e3956a1e..83775f86e1 100644 --- a/src/objective-c/README.md +++ b/src/objective-c/README.md @@ -242,3 +242,12 @@ pod `gRPC-Core`, :podspec => "." # assuming gRPC-Core.podspec is in the same dir These steps should allow gRPC to use OpenSSL and drop the BoringSSL dependency. If you see any issue, file an issue with us. + +## Upgrade issue with BoringSSL +If you were using an old version of gRPC (<= v1.14) which depended on the pod `BoringSSL` rather than +`BoringSSL-GRPC` and hit an issue with the library like: +``` +ld: framework not found openssl +``` +updating `-framework openssl` in Other Linker Flags to `-framework openssl_grpc` in your project +may resolve this issue (see [#16821](https://github.com/grpc/grpc/issues/16821)). -- cgit v1.2.3 From 191e8f1c9cbf0080910e5c43a0997f90a5f5bd0f Mon Sep 17 00:00:00 2001 From: kkm Date: Fri, 7 Dec 2018 00:29:32 -0800 Subject: Use x64 tools on 64-bit Windows in Grpc.Tools The Windows Nano Server Docker image does not support 32-bit executables. See: https://docs.microsoft.com/windows-server/get-started/getting-started-with-nano-server#important-differences-in-nano-server See: https://github.com/grpc/grpc/pull/13207#issuecomment-444846082 Close: #13098 (wontfix) --- src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets | 3 +-- src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets b/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets index 5f76c03ce5..3fe1ccc918 100644 --- a/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets +++ b/src/csharp/Grpc.Tools/build/_grpc/_Grpc.Tools.targets @@ -22,9 +22,8 @@ - $(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_x86\$(gRPC_PluginFileName).exe + $(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)\$(gRPC_PluginFileName).exe $(Protobuf_PackagedToolsPath)/$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)/$(gRPC_PluginFileName) diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets index 1d233d23a8..26f9efb5a8 100644 --- a/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets +++ b/src/csharp/Grpc.Tools/build/_protobuf/Google.Protobuf.Tools.targets @@ -74,9 +74,8 @@ $(_Protobuf_ToolsOs) $(_Protobuf_ToolsCpu) - $(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_x86\protoc.exe + $(Protobuf_PackagedToolsPath)\$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)\protoc.exe $(Protobuf_PackagedToolsPath)/$(Protobuf_ToolsOs)_$(Protobuf_ToolsCpu)/protoc -- cgit v1.2.3 From 082b63e09592502b0900a5dff214dc70efb9de56 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Fri, 7 Dec 2018 09:23:54 -0800 Subject: Refactor server deallocation --- .../grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 2 +- .../grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 10 +- src/python/grpcio/grpc/_server.py | 106 +++++++++++++-------- .../tests/channelz/_channelz_servicer_test.py | 2 - 4 files changed, 73 insertions(+), 47 deletions(-) (limited to 'src') diff --git
a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 141116df5d..3c33b46dbb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -49,7 +49,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): cdef _interpret_event(grpc_event c_event): cdef _Tag tag if c_event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + # TODO(ericgribkoff) Do not coopt ConnectivityEvent here. return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif c_event.type == GRPC_QUEUE_SHUTDOWN: # NOTE(nathaniel): For now we coopt ConnectivityEvent here. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index ce701724fd..1b8d6790fb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -128,7 +128,9 @@ cdef class Server: with nogil: grpc_server_cancel_all_calls(self.c_server) - def __dealloc__(self): + # TODO(ericgribkoff) Determine what, if any, portion of this is safe to call + # from __dealloc__, and potentially remove backup_shutdown_queue. def destroy(self): if self.c_server != NULL: if not self.is_started: pass @@ -146,4 +148,8 @@ cdef class Server: while not self.is_shutdown: time.sleep(0) grpc_server_destroy(self.c_server) - grpc_shutdown() + self.c_server = NULL + + def __dealloc__(self): + if self.c_server == NULL: + grpc_shutdown() diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 3bbfa47da5..eb750ef1a8 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -48,7 +48,7 @@ _CANCELLED = 'cancelled' _EMPTY_FLAGS = 0 -_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 +_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0 def _serialized_request(request_event): @@ -676,6 +676,9 @@ class _ServerState(object): self.rpc_states = set() self.due = set() + # A "volatile" flag to interrupt the daemon serving thread + self.server_deallocated = False + def _add_generic_handlers(state, generic_handlers): with state.lock: @@ -702,6 +705,7 @@ def _request_call(state): # TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
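# Deallocation flow after this refactor: _Server.__del__ sets # state.server_deallocated; the serving thread's completion-queue poll now # times out every _DEALLOCATED_SERVER_CHECK_PERIOD_S seconds, notices the # flag, and begins shutdown; _stop_serving below then destroys the cygrpc # server once all RPC states and queue tags have drained.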
def _stop_serving(state): if not state.rpc_states and not state.due: + state.server.destroy() for shutdown_event in state.shutdown_events: shutdown_event.set() state.stage = _ServerStage.STOPPED @@ -715,49 +719,69 @@ def _on_call_completed(state): state.active_rpc_count -= 1 -def _serve(state): - while True: - event = state.completion_queue.poll() - if event.tag is _SHUTDOWN_TAG: +def _process_event_and_continue(state, event): + should_continue = True + if event.tag is _SHUTDOWN_TAG: + with state.lock: + state.due.remove(_SHUTDOWN_TAG) + if _stop_serving(state): + should_continue = False + elif event.tag is _REQUEST_CALL_TAG: + with state.lock: + state.due.remove(_REQUEST_CALL_TAG) + concurrency_exceeded = ( + state.maximum_concurrent_rpcs is not None and + state.active_rpc_count >= state.maximum_concurrent_rpcs) + rpc_state, rpc_future = _handle_call( + event, state.generic_handlers, state.interceptor_pipeline, + state.thread_pool, concurrency_exceeded) + if rpc_state is not None: + state.rpc_states.add(rpc_state) + if rpc_future is not None: + state.active_rpc_count += 1 + rpc_future.add_done_callback( + lambda unused_future: _on_call_completed(state)) + if state.stage is _ServerStage.STARTED: + _request_call(state) + elif _stop_serving(state): + should_continue = False + else: + rpc_state, callbacks = event.tag(event) + for callback in callbacks: + callable_util.call_logging_exceptions(callback, + 'Exception calling callback!') + if rpc_state is not None: with state.lock: - state.due.remove(_SHUTDOWN_TAG) + state.rpc_states.remove(rpc_state) if _stop_serving(state): - return - elif event.tag is _REQUEST_CALL_TAG: - with state.lock: - state.due.remove(_REQUEST_CALL_TAG) - concurrency_exceeded = ( - state.maximum_concurrent_rpcs is not None and - state.active_rpc_count >= state.maximum_concurrent_rpcs) - rpc_state, rpc_future = _handle_call( - event, state.generic_handlers, state.interceptor_pipeline, - state.thread_pool, concurrency_exceeded) - if rpc_state is not None: - state.rpc_states.add(rpc_state) - if rpc_future is not None: - state.active_rpc_count += 1 - rpc_future.add_done_callback( - lambda unused_future: _on_call_completed(state)) - if state.stage is _ServerStage.STARTED: - _request_call(state) - elif _stop_serving(state): - return - else: - rpc_state, callbacks = event.tag(event) - for callback in callbacks: - callable_util.call_logging_exceptions( - callback, 'Exception calling callback!') - if rpc_state is not None: - with state.lock: - state.rpc_states.remove(rpc_state) - if _stop_serving(state): - return + should_continue = False + return should_continue + + +def _serve(state): + while True: + timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S + event = state.completion_queue.poll(timeout) + if state.server_deallocated: + _begin_shutdown_once(state) + if event.completion_type != cygrpc.CompletionType.queue_timeout: + if not _process_event_and_continue(state, event): + return # We want to force the deletion of the previous event # ~before~ we poll again; if the event has a reference # to a shutdown Call object, this can induce spinlock. 
event = None +def _begin_shutdown_once(state): + with state.lock: + if state.stage is _ServerStage.STARTED: + state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) + state.stage = _ServerStage.GRACE + state.shutdown_events = [] + state.due.add(_SHUTDOWN_TAG) + + def _stop(state, grace): with state.lock: if state.stage is _ServerStage.STOPPED: @@ -765,11 +789,7 @@ def _stop(state, grace): shutdown_event.set() return shutdown_event else: - if state.stage is _ServerStage.STARTED: - state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) - state.stage = _ServerStage.GRACE - state.shutdown_events = [] - state.due.add(_SHUTDOWN_TAG) + _begin_shutdown_once(state) shutdown_event = threading.Event() state.shutdown_events.append(shutdown_event) if grace is None: @@ -840,7 +860,9 @@ class _Server(grpc.Server): return _stop(self._state, grace) def __del__(self): - _stop(self._state, None) + # We can not grab a lock in __del__(), so set a flag to signal the + # serving daemon thread (if it exists) to initiate shutdown. + self._state.server_deallocated = True def create_server(thread_pool, generic_rpc_handlers, interceptors, options, diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py index 8ca5189522..5265911a0b 100644 --- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -88,8 +88,6 @@ def _generate_channel_server_pairs(n): def _close_channel_server_pairs(pairs): for pair in pairs: pair.server.stop(None) - # TODO(ericgribkoff) This del should not be required - del pair.server pair.channel.close() -- cgit v1.2.3 From a76d72e0a62d512f919f0fa79e24809427c9e4e7 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Fri, 14 Dec 2018 17:41:02 -0800 Subject: add tests --- src/python/grpcio_tests/tests/tests.json | 1 + .../tests/unit/_server_shutdown_scenarios.py | 113 +++++++++++++++++++++ .../tests/unit/_server_shutdown_test.py | 87 ++++++++++++++++ 3 files changed, 201 insertions(+) create mode 100644 src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py create mode 100644 src/python/grpcio_tests/tests/unit/_server_shutdown_test.py (limited to 'src') diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index b27e6f2693..f202a3f932 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -57,6 +57,7 @@ "unit._reconnect_test.ReconnectTest", "unit._resource_exhausted_test.ResourceExhaustedTest", "unit._rpc_test.RPCTest", + "unit._server_shutdown_test.ServerShutdown", "unit._server_ssl_cert_config_test.ServerSSLCertConfigFetcherParamsChecks", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py new file mode 100644 index 0000000000..da8cef0c43 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -0,0 +1,113 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" + +import argparse +import os +import threading +import time +import logging + +import grpc + +from concurrent import futures +from six.moves import queue + +WAIT_TIME = 1000 + +REQUEST = b'request' +RESPONSE = b'response' + +SERVER_RAISES_EXCEPTION = 'server_raises_exception' +SERVER_DEALLOCATED = 'server_deallocated' +SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' + +FORK_EXIT = '/test/ForkExit' + + +class ForkExitHandler(object): + + def unary_unary(self, request, servicer_context): + pid = os.fork() + if pid == 0: + os._exit(0) + return RESPONSE + + def __init__(self): + self.request_streaming = None + self.response_streaming = None + self.request_deserializer = None + self.response_serializer = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == FORK_EXIT: + return ForkExitHandler() + else: + return None + + +def run_server(port_queue,): + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + options=(('grpc.so_reuseport', 0),)) + port = server.add_insecure_port('[::]:0') + port_queue.put(port) + server.add_generic_rpc_handlers((GenericHandler(),)) + server.start() + # threading.Event.wait() does not exhibit the bug identified in + # https://github.com/grpc/grpc/issues/17093, sleep instead + time.sleep(WAIT_TIME) + + +def run_test(args): + if args.scenario == SERVER_RAISES_EXCEPTION: + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(('grpc.so_reuseport', 0),)) + server.start() + raise Exception() + elif args.scenario == SERVER_DEALLOCATED: + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(('grpc.so_reuseport', 0),)) + server.start() + server.__del__() + while server._state.stage != grpc._server._ServerStage.STOPPED: + pass + elif args.scenario == SERVER_FORK_CAN_EXIT: + port_queue = queue.Queue() + thread = threading.Thread(target=run_server, args=(port_queue,)) + thread.daemon = True + thread.start() + port = port_queue.get() + channel = grpc.insecure_channel('[::]:%d' % port) + multi_callable = channel.unary_unary(FORK_EXIT) + result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) + os.wait() + else: + raise ValueError('unknown test scenario') + + +if __name__ == '__main__': + logging.basicConfig() + parser = argparse.ArgumentParser() + parser.add_argument('scenario', type=str) + args = parser.parse_args() + run_test(args) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py new file mode 100644 index 0000000000..25d7f5e819 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -0,0 +1,87 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests clean shutdown of server on various interpreter exit conditions. + +The tests in this module spawn a subprocess for each test case, the +test is considered successful if it doesn't hang/timeout. +""" + +import atexit +import os +import subprocess +import sys +import threading +import unittest +import logging + +from tests.unit import _server_shutdown_scenarios + +SCENARIO_FILE = os.path.abspath( + os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '_server_shutdown_scenarios.py')) +INTERPRETER = sys.executable +BASE_COMMAND = [INTERPRETER, SCENARIO_FILE] + +processes = [] +process_lock = threading.Lock() + + +# Make sure we attempt to clean up any +# processes we may have left running +def cleanup_processes(): + with process_lock: + for process in processes: + try: + process.kill() + except Exception: # pylint: disable=broad-except + pass + + +atexit.register(cleanup_processes) + + +def wait(process): + with process_lock: + processes.append(process) + process.wait() + + +class ServerShutdown(unittest.TestCase): + + def test_deallocated_server_stops(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + def test_server_exception_exits(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + def test_server_fork_can_exit(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) -- cgit v1.2.3 From 468ae0f48615a58d8f09b76ca142fee27563964a Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Sat, 15 Dec 2018 08:36:32 -0800 Subject: add tracking issue --- src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 1b8d6790fb..e89e02b171 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -128,8 +128,9 @@ cdef class Server: with nogil: grpc_server_cancel_all_calls(self.c_server) - # TODO(ericgribkoff) Determine what, if any, portion of this is safe to call - # from __dealloc__, and potentially remove backup_shutdown_queue. + # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any, + # portion of this is safe to call from __dealloc__, and potentially remove + # backup_shutdown_queue. 
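# destroy() is the blocking half: it waits in a sleep loop for is_shutdown, # frees the C server with grpc_server_destroy, and NULLs c_server, so that # __dealloc__ only issues the matching grpc_shutdown() when destroy() has # already run to completion.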
def destroy(self): if self.c_server != NULL: if not self.is_started: -- cgit v1.2.3 From 718084c6b4aac0099d52a0dcf4c529e5b46ee686 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Sun, 16 Dec 2018 19:03:18 -0800 Subject: disable fork test on windows --- src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py | 2 +- src/python/grpcio_tests/tests/unit/_server_shutdown_test.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py index da8cef0c43..acbec0f77d 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -63,7 +63,7 @@ class GenericHandler(grpc.GenericRpcHandler): return None -def run_server(port_queue,): +def run_server(port_queue): server = grpc.server( futures.ThreadPoolExecutor(max_workers=10), options=(('grpc.so_reuseport', 0),)) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py index 25d7f5e819..0e4591c5e5 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -74,6 +74,7 @@ class ServerShutdown(unittest.TestCase): stderr=sys.stderr) wait(process) + @unittest.skipIf(os.name == 'nt', 'fork not supported on windows') def test_server_fork_can_exit(self): process = subprocess.Popen( BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT], -- cgit v1.2.3 From b1407a95c3183714557d753d704bd6b98e603258 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 17 Dec 2018 14:51:20 -0800 Subject: Improve grpclb trace logging. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 57 ++++++++++++---------- 1 file changed, 31 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index a9a5965ed1..6671e708c8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -525,8 +525,7 @@ void GrpcLb::BalancerCallState::Orphan() { void GrpcLb::BalancerCallState::StartQuery() { GPR_ASSERT(lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)", + gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p", grpclb_policy_.get(), this, lb_call_); } // Create the ops. 
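// Logging convention established by this commit: every balancer-call log // line starts with "[grpclb %p] lb_calld=%p:" so the interleaved traces of // concurrent LB calls can be attributed to a single BalancerCallState.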
@@ -670,8 +669,9 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { grpc_call_error call_error = grpc_call_start_batch_and_execute( lb_call_, &op, 1, &client_load_report_closure_); if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(), - call_error); + gpr_log(GPR_ERROR, + "[grpclb %p] lb_calld=%p call_error=%d sending client load report", + grpclb_policy_.get(), this, call_error); GPR_ASSERT(GRPC_CALL_OK == call_error); } } @@ -732,15 +732,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( &initial_response->client_stats_report_interval)); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; " - "client load reporting interval = %" PRId64 " milliseconds", - grpclb_policy, lb_calld->client_stats_report_interval_); + "[grpclb %p] lb_calld=%p: Received initial LB response " + "message; client load reporting interval = %" PRId64 + " milliseconds", + grpclb_policy, lb_calld, + lb_calld->client_stats_report_interval_); } } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; client load " - "reporting NOT enabled", - grpclb_policy); + "[grpclb %p] lb_calld=%p: Received initial LB response message; " + "client load reporting NOT enabled", + grpclb_policy, lb_calld); } grpc_grpclb_initial_response_destroy(initial_response); lb_calld->seen_initial_response_ = true; @@ -750,15 +752,17 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( GPR_ASSERT(lb_calld->lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Serverlist with %" PRIuPTR " servers received", - grpclb_policy, serverlist->num_servers); + "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR + " servers received", + grpclb_policy, lb_calld, serverlist->num_servers); for (size_t i = 0; i < serverlist->num_servers; ++i) { grpc_resolved_address addr; ParseServer(serverlist->servers[i], &addr); char* ipport; grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", - grpclb_policy, i, ipport); + gpr_log(GPR_INFO, + "[grpclb %p] lb_calld=%p: Serverlist[%" PRIuPTR "]: %s", + grpclb_policy, lb_calld, i, ipport); gpr_free(ipport); } } @@ -778,9 +782,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Incoming server list identical to current, " - "ignoring.", - grpclb_policy); + "[grpclb %p] lb_calld=%p: Incoming server list identical to " + "current, ignoring.", + grpclb_policy, lb_calld); } grpc_grpclb_destroy_serverlist(serverlist); } else { // New serverlist. @@ -806,8 +810,9 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( char* response_slice_str = grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); gpr_log(GPR_ERROR, - "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", - grpclb_policy, response_slice_str); + "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. 
" + "Ignoring.", + grpclb_policy, lb_calld, response_slice_str); gpr_free(response_slice_str); } grpc_slice_unref_internal(response_slice); @@ -838,9 +843,9 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( char* status_details = grpc_slice_to_c_string(lb_calld->lb_call_status_details_); gpr_log(GPR_INFO, - "[grpclb %p] Status from LB server received. Status = %d, details " - "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", - grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld, + "[grpclb %p] lb_calld=%p: Status from LB server received. " + "Status = %d, details = '%s', (lb_call: %p), error '%s'", + grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details, lb_calld->lb_call_, grpc_error_string(error)); gpr_free(status_details); } @@ -1592,6 +1597,10 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { this); return; } + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, + rr_policy_.get()); + } // TODO(roth): We currently track this ref manually. Once the new // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested"); @@ -1685,10 +1694,6 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { lb_policy_args.client_channel_factory = client_channel_factory(); lb_policy_args.args = args; CreateRoundRobinPolicyLocked(lb_policy_args); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, - rr_policy_.get()); - } } grpc_channel_args_destroy(args); } -- cgit v1.2.3 From 05d3ab2852cb2d7ce13f7ca059b36884e37175fd Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 17 Dec 2018 14:54:10 -0800 Subject: Address comments, improve tests --- src/python/grpcio_tests/tests/unit/BUILD.bazel | 1 + .../tests/unit/_server_shutdown_scenarios.py | 36 ++++++---------------- .../tests/unit/_server_shutdown_test.py | 2 ++ 3 files changed, 13 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 4f850220f8..7a910fd9fe 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -28,6 +28,7 @@ GRPCIO_TESTS_UNIT = [ # TODO(ghostwriternr): To be added later. 
# "_server_ssl_cert_config_test.py", "_server_test.py", + "_server_shutdown_test.py", "_session_cache_test.py", ] diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py index acbec0f77d..1797e9bbfe 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -20,6 +20,7 @@ import time import logging import grpc +from tests.unit import test_common from concurrent import futures from six.moves import queue @@ -36,37 +37,24 @@ SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' FORK_EXIT = '/test/ForkExit' -class ForkExitHandler(object): - - def unary_unary(self, request, servicer_context): - pid = os.fork() - if pid == 0: - os._exit(0) - return RESPONSE - - def __init__(self): - self.request_streaming = None - self.response_streaming = None - self.request_deserializer = None - self.response_serializer = None - self.unary_stream = None - self.stream_unary = None - self.stream_stream = None +def fork_and_exit(request, servicer_context): + pid = os.fork() + if pid == 0: + os._exit(0) + return RESPONSE class GenericHandler(grpc.GenericRpcHandler): def service(self, handler_call_details): if handler_call_details.method == FORK_EXIT: - return ForkExitHandler() + return grpc.unary_unary_rpc_method_handler(fork_and_exit) else: return None def run_server(port_queue): - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=10), - options=(('grpc.so_reuseport', 0),)) + server = test_common.test_server() port = server.add_insecure_port('[::]:0') port_queue.put(port) server.add_generic_rpc_handlers((GenericHandler(),)) @@ -78,15 +66,11 @@ def run_server(port_queue): def run_test(args): if args.scenario == SERVER_RAISES_EXCEPTION: - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=1), - options=(('grpc.so_reuseport', 0),)) + server = test_common.test_server() server.start() raise Exception() elif args.scenario == SERVER_DEALLOCATED: - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=1), - options=(('grpc.so_reuseport', 0),)) + server = test_common.test_server() server.start() server.__del__() while server._state.stage != grpc._server._ServerStage.STOPPED: diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py index 0e4591c5e5..47446d65a5 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -60,6 +60,8 @@ def wait(process): class ServerShutdown(unittest.TestCase): + # Currently we shut down a server (if possible) after the Python server + # instance is garbage collected. This behavior may change in the future. 
def test_deallocated_server_stops(self): process = subprocess.Popen( BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED], -- cgit v1.2.3 From 08a90f03d4d0ac846f517c9aeab7aeb05b1cfdca Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 17 Dec 2018 15:09:59 -0800 Subject: Remove the fake package dependency && temporarily skip the Channelz tests --- src/python/grpcio_tests/setup.py | 12 +++++++++--- .../grpcio_tests/tests/channelz/_channelz_servicer_test.py | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index f9cb9d0cec..800b865da6 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -37,13 +37,19 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'coverage>=4.0', 'enum34>=1.0.4', + 'coverage>=4.0', + 'enum34>=1.0.4', 'grpcio>={version}'.format(version=grpc_version.VERSION), - 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION), + # TODO(https://github.com/pypa/warehouse/issues/5196) + # Re-enable it once we get the name back + # 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION), 'grpcio-status>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), - 'oauth2client>=1.4.7', 'protobuf>=3.6.0', 'six>=1.10', 'google-auth>=1.0.0', + 'oauth2client>=1.4.7', + 'protobuf>=3.6.0', + 'six>=1.10', + 'google-auth>=1.0.0', 'requests>=2.14.2') if not PY3: diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py index 8ca5189522..d060a4973e 100644 --- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -93,6 +93,7 @@ def _close_channel_server_pairs(pairs): pair.channel.close() +@unittest.skip('https://github.com/pypa/warehouse/issues/5196') class ChannelzServicerTest(unittest.TestCase): def _send_successful_unary_unary(self, idx): -- cgit v1.2.3 From 8183fe12992ac36fd99b637422d6bf158e760036 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 17 Dec 2018 18:29:25 -0800 Subject: fix BUILD.bazel --- src/python/grpcio_tests/tests/unit/BUILD.bazel | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src') diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 7a910fd9fe..1b462ec67a 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -50,6 +50,11 @@ py_library( srcs = ["_exit_scenarios.py"], ) +py_library( + name = "_server_shutdown_scenarios", + srcs = ["_server_shutdown_scenarios.py"], +) + py_library( name = "_thread_pool", srcs = ["_thread_pool.py"], @@ -71,6 +76,7 @@ py_library( ":resources", ":test_common", ":_exit_scenarios", + ":_server_shutdown_scenarios", ":_thread_pool", ":_from_grpc_import_star", "//src/python/grpcio_tests/tests/unit/framework/common", -- cgit v1.2.3 From 3c49252d47f149c8b3262f9a1db83f11340e368b Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 17 Dec 2018 21:05:13 -0800 Subject: bazel docker image does not support ipv6 --- src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py
b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py index 1797e9bbfe..1d1fdba11e 100644 --- a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -81,7 +81,7 @@ def run_test(args): thread.daemon = True thread.start() port = port_queue.get() - channel = grpc.insecure_channel('[::]:%d' % port) + channel = grpc.insecure_channel('localhost:%d' % port) multi_callable = channel.unary_unary(FORK_EXIT) result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) os.wait() -- cgit v1.2.3 From 9a7eec0c455b3f6db9c1d06e73398a330e2c0729 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 18 Dec 2018 11:36:09 -0800 Subject: Suppress compiler error by initializing sent_length --- src/core/lib/iomgr/tcp_posix.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 211ae011cb..c268c18664 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -818,7 +818,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; - ssize_t sent_length; + ssize_t sent_length = 0; size_t sending_length; size_t trailing; size_t unwind_slice_idx; -- cgit v1.2.3 From 1438b17890b56f57b21e49bc7bcd4e5d589425cc Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 18 Dec 2018 14:14:42 -0800 Subject: Fix bug that was breaking subchannel reuse in grpclb. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 18 +++++----- test/cpp/end2end/grpclb_end2end_test.cc | 41 ++++++++++++++++++++++ 2 files changed, 51 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index a9a5965ed1..49a1b2d692 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -361,7 +361,9 @@ void lb_token_destroy(void* token) { } } int lb_token_cmp(void* token1, void* token2) { - return GPR_ICMP(token1, token2); + // Always indicate a match, since we don't want this channel arg to + // affect the subchannel's key in the index. + return 0; } const grpc_arg_pointer_vtable lb_token_arg_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; @@ -422,7 +424,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { grpc_resolved_address addr; ParseServer(server, &addr); // LB token processing. 
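    // (This hunk replaces the raw void* token payload with a real
    // grpc_mdelem handle, so the serverlist-processing code can drop its
    // ref once grpc_channel_args_copy_and_add() has taken its own copy
    // via lb_token_copy.)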
- void* lb_token; + grpc_mdelem lb_token; if (server->has_load_balance_token) { const size_t lb_token_max_length = GPR_ARRAY_SIZE(server->load_balance_token); @@ -430,9 +432,7 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { strnlen(server->load_balance_token, lb_token_max_length); grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( server->load_balance_token, lb_token_length); - lb_token = - (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) - .payload; + lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr); } else { char* uri = grpc_sockaddr_to_uri(&addr); gpr_log(GPR_INFO, @@ -440,14 +440,16 @@ ServerAddressList ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { "be used instead", uri); gpr_free(uri); - lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY; } // Add address. grpc_arg arg = grpc_channel_arg_pointer_create( - const_cast(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token, - &lb_token_arg_vtable); + const_cast(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), + (void*)lb_token.payload, &lb_token_arg_vtable); grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); addresses.emplace_back(addr, args); + // Clean up. + GRPC_MDELEM_UNREF(lb_token); } return addresses; } diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 2eaacd429d..f739ed032b 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -145,6 +146,7 @@ class BackendServiceImpl : public BackendService { IncreaseRequestCount(); const auto status = TestServiceImpl::Echo(context, request, response); IncreaseResponseCount(); + AddClient(context->peer()); return status; } @@ -157,9 +159,21 @@ class BackendServiceImpl : public BackendService { return prev; } + std::set clients() { + std::unique_lock lock(clients_mu_); + return clients_; + } + private: + void AddClient(const grpc::string& client) { + std::unique_lock lock(clients_mu_); + clients_.insert(client); + } + std::mutex mu_; bool shutdown_ = false; + std::mutex clients_mu_; + std::set clients_; }; grpc::string Ip4ToPackedString(const char* ip_str) { @@ -303,6 +317,11 @@ class BalancerServiceImpl : public BalancerService { auto* server = response.mutable_server_list()->add_servers(); server->set_ip_address(Ip4ToPackedString("127.0.0.1")); server->set_port(backend_port); + static int token_count = 0; + char* token; + gpr_asprintf(&token, "token%03d", ++token_count); + server->set_load_balance_token(token); + gpr_free(token); } return response; } @@ -675,6 +694,28 @@ TEST_F(SingleBalancerTest, Vanilla) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) { + SetNextResolutionAllBalancers(); + // Same backend listed twice. + std::vector ports; + ports.push_back(backend_servers_[0].port_); + ports.push_back(backend_servers_[0].port_); + const size_t kNumRpcsPerAddress = 10; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0); + // We need to wait for the backend to come online. + WaitForBackend(0); + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * ports.size()); + // Backend should have gotten 20 requests. 
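+  // (kNumRpcsPerAddress is 10 and the same port is listed twice, so the
+  // lone backend sees 10 * 2 = 20 RPCs; with lb_token_cmp() in this
+  // patch always reporting a match, both serverlist entries map to the
+  // same subchannel, which the clients() check below confirms.)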
+ EXPECT_EQ(kNumRpcsPerAddress * 2, + backend_servers_[0].service_->request_count()); + // And they should have come from a single client port, because of + // subchannel sharing. + EXPECT_EQ(1UL, backends_[0]->clients().size()); + balancers_[0]->NotifyDoneWithServerlists(); +} + TEST_F(SingleBalancerTest, SecureNaming) { ResetStub(0, kApplicationTargetName_ + ";lb"); SetNextResolution({AddressData{balancer_servers_[0].port_, true, "lb"}}); -- cgit v1.2.3 From 09a173b4b612d1cb27d82490d83dd95e0aece203 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 18 Dec 2018 12:51:48 -0800 Subject: Remove now-unnecessary workarounds for alignment issues. --- src/core/ext/filters/client_channel/lb_policy.h | 6 --- .../credentials/composite/composite_credentials.cc | 61 ++++------------------ .../credentials/composite/composite_credentials.h | 43 +++------------ test/core/security/credentials_test.cc | 22 ++++---- 4 files changed, 29 insertions(+), 103 deletions(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 6b76fe5d5d..29b8c28be6 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -205,12 +205,6 @@ class LoadBalancingPolicy : public InternallyRefCounted { grpc_pollset_set* interested_parties_; /// Callback to force a re-resolution. grpc_closure* request_reresolution_; - - // Dummy classes needed for alignment issues. - // See https://github.com/grpc/grpc/issues/16032 for context. - // TODO(ncteisen): remove this as soon as the issue is resolved. - channelz::ChildRefsList dummy_list_foo; - channelz::ChildRefsList dummy_list_bar; }; } // namespace grpc_core diff --git a/src/core/lib/security/credentials/composite/composite_credentials.cc b/src/core/lib/security/credentials/composite/composite_credentials.cc index 85dcd4693b..586bbed778 100644 --- a/src/core/lib/security/credentials/composite/composite_credentials.cc +++ b/src/core/lib/security/credentials/composite/composite_credentials.cc @@ -35,44 +35,6 @@ static void composite_call_metadata_cb(void* arg, grpc_error* error); -grpc_call_credentials_array::~grpc_call_credentials_array() { - for (size_t i = 0; i < num_creds_; ++i) { - creds_array_[i].~RefCountedPtr(); - } - if (creds_array_ != nullptr) { - gpr_free(creds_array_); - } -} - -grpc_call_credentials_array::grpc_call_credentials_array( - const grpc_call_credentials_array& that) - : num_creds_(that.num_creds_) { - reserve(that.capacity_); - for (size_t i = 0; i < num_creds_; ++i) { - new (&creds_array_[i]) - grpc_core::RefCountedPtr(that.creds_array_[i]); - } -} - -void grpc_call_credentials_array::reserve(size_t capacity) { - if (capacity_ >= capacity) { - return; - } - grpc_core::RefCountedPtr* new_arr = - static_cast*>(gpr_malloc( - sizeof(grpc_core::RefCountedPtr) * capacity)); - if (creds_array_ != nullptr) { - for (size_t i = 0; i < num_creds_; ++i) { - new (&new_arr[i]) grpc_core::RefCountedPtr( - std::move(creds_array_[i])); - creds_array_[i].~RefCountedPtr(); - } - gpr_free(creds_array_); - } - creds_array_ = new_arr; - capacity_ = capacity; -} - namespace { struct grpc_composite_call_credentials_metadata_context { grpc_composite_call_credentials_metadata_context( @@ -103,13 +65,13 @@ static void composite_call_metadata_cb(void* arg, grpc_error* error) { grpc_composite_call_credentials_metadata_context* ctx = static_cast(arg); if (error == GRPC_ERROR_NONE) { - const grpc_call_credentials_array& inner = 
ctx->composite_creds->inner(); + const grpc_composite_call_credentials::CallCredentialsList& inner = + ctx->composite_creds->inner(); /* See if we need to get some more metadata. */ if (ctx->creds_index < inner.size()) { - if (inner.get(ctx->creds_index++) - ->get_request_metadata( - ctx->pollent, ctx->auth_md_context, ctx->md_array, - &ctx->internal_on_request_metadata, &error)) { + if (inner[ctx->creds_index++]->get_request_metadata( + ctx->pollent, ctx->auth_md_context, ctx->md_array, + &ctx->internal_on_request_metadata, &error)) { // Synchronous response, so call ourselves recursively. composite_call_metadata_cb(arg, error); GRPC_ERROR_UNREF(error); @@ -130,12 +92,11 @@ bool grpc_composite_call_credentials::get_request_metadata( ctx = grpc_core::New( this, pollent, auth_md_context, md_array, on_request_metadata); bool synchronous = true; - const grpc_call_credentials_array& inner = ctx->composite_creds->inner(); + const CallCredentialsList& inner = ctx->composite_creds->inner(); while (ctx->creds_index < inner.size()) { - if (inner.get(ctx->creds_index++) - ->get_request_metadata(ctx->pollent, ctx->auth_md_context, - ctx->md_array, - &ctx->internal_on_request_metadata, error)) { + if (inner[ctx->creds_index++]->get_request_metadata( + ctx->pollent, ctx->auth_md_context, ctx->md_array, + &ctx->internal_on_request_metadata, error)) { if (*error != GRPC_ERROR_NONE) break; } else { synchronous = false; // Async return. @@ -149,7 +110,7 @@ bool grpc_composite_call_credentials::get_request_metadata( void grpc_composite_call_credentials::cancel_get_request_metadata( grpc_credentials_mdelem_array* md_array, grpc_error* error) { for (size_t i = 0; i < inner_.size(); ++i) { - inner_.get(i)->cancel_get_request_metadata(md_array, GRPC_ERROR_REF(error)); + inner_[i]->cancel_get_request_metadata(md_array, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -172,7 +133,7 @@ void grpc_composite_call_credentials::push_to_inner( auto composite_creds = static_cast(creds.get()); for (size_t i = 0; i < composite_creds->inner().size(); ++i) { - inner_.push_back(std::move(composite_creds->inner_.get_mutable(i))); + inner_.push_back(std::move(composite_creds->inner_[i])); } } diff --git a/src/core/lib/security/credentials/composite/composite_credentials.h b/src/core/lib/security/credentials/composite/composite_credentials.h index 6b7fca1370..7a1c7d5e42 100644 --- a/src/core/lib/security/credentials/composite/composite_credentials.h +++ b/src/core/lib/security/credentials/composite/composite_credentials.h @@ -21,43 +21,10 @@ #include +#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/security/credentials/credentials.h" -// TODO(soheil): Replace this with InlinedVector once #16032 is resolved. -class grpc_call_credentials_array { - public: - grpc_call_credentials_array() = default; - grpc_call_credentials_array(const grpc_call_credentials_array& that); - - ~grpc_call_credentials_array(); - - void reserve(size_t capacity); - - // Must reserve before pushing any data. 
- void push_back(grpc_core::RefCountedPtr cred) { - GPR_DEBUG_ASSERT(capacity_ > num_creds_); - new (&creds_array_[num_creds_++]) - grpc_core::RefCountedPtr(std::move(cred)); - } - - const grpc_core::RefCountedPtr& get(size_t i) const { - GPR_DEBUG_ASSERT(i < num_creds_); - return creds_array_[i]; - } - grpc_core::RefCountedPtr& get_mutable(size_t i) { - GPR_DEBUG_ASSERT(i < num_creds_); - return creds_array_[i]; - } - - size_t size() const { return num_creds_; } - - private: - grpc_core::RefCountedPtr* creds_array_ = nullptr; - size_t num_creds_ = 0; - size_t capacity_ = 0; -}; - /* -- Composite channel credentials. -- */ class grpc_composite_channel_credentials : public grpc_channel_credentials { @@ -97,6 +64,10 @@ class grpc_composite_channel_credentials : public grpc_channel_credentials { class grpc_composite_call_credentials : public grpc_call_credentials { public: + using CallCredentialsList = + grpc_core::InlinedVector, + 2>; + grpc_composite_call_credentials( grpc_core::RefCountedPtr creds1, grpc_core::RefCountedPtr creds2); @@ -111,13 +82,13 @@ class grpc_composite_call_credentials : public grpc_call_credentials { void cancel_get_request_metadata(grpc_credentials_mdelem_array* md_array, grpc_error* error) override; - const grpc_call_credentials_array& inner() const { return inner_; } + const CallCredentialsList& inner() const { return inner_; } private: void push_to_inner(grpc_core::RefCountedPtr creds, bool is_composite); - grpc_call_credentials_array inner_; + CallCredentialsList inner_; }; #endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_COMPOSITE_COMPOSITE_CREDENTIALS_H \ diff --git a/test/core/security/credentials_test.cc b/test/core/security/credentials_test.cc index b3a8161786..b655535335 100644 --- a/test/core/security/credentials_test.cc +++ b/test/core/security/credentials_test.cc @@ -465,14 +465,14 @@ static void test_oauth2_google_iam_composite_creds(void) { google_iam_creds->Unref(); GPR_ASSERT(strcmp(composite_creds->type(), GRPC_CALL_CREDENTIALS_TYPE_COMPOSITE) == 0); - const grpc_call_credentials_array& creds_array = + const grpc_composite_call_credentials::CallCredentialsList& creds_list = static_cast(composite_creds) ->inner(); - GPR_ASSERT(creds_array.size() == 2); - GPR_ASSERT(strcmp(creds_array.get(0)->type(), - GRPC_CALL_CREDENTIALS_TYPE_OAUTH2) == 0); - GPR_ASSERT( - strcmp(creds_array.get(1)->type(), GRPC_CALL_CREDENTIALS_TYPE_IAM) == 0); + GPR_ASSERT(creds_list.size() == 2); + GPR_ASSERT(strcmp(creds_list[0]->type(), GRPC_CALL_CREDENTIALS_TYPE_OAUTH2) == + 0); + GPR_ASSERT(strcmp(creds_list[1]->type(), GRPC_CALL_CREDENTIALS_TYPE_IAM) == + 0); run_request_metadata_test(composite_creds, auth_md_ctx, state); composite_creds->Unref(); } @@ -492,13 +492,13 @@ class check_channel_oauth2_google_iam final : public grpc_channel_credentials { GPR_ASSERT(call_creds != nullptr); GPR_ASSERT( strcmp(call_creds->type(), GRPC_CALL_CREDENTIALS_TYPE_COMPOSITE) == 0); - const grpc_call_credentials_array& creds_array = + const grpc_composite_call_credentials::CallCredentialsList& creds_list = static_cast(call_creds.get()) ->inner(); - GPR_ASSERT(strcmp(creds_array.get(0)->type(), - GRPC_CALL_CREDENTIALS_TYPE_OAUTH2) == 0); - GPR_ASSERT(strcmp(creds_array.get(1)->type(), - GRPC_CALL_CREDENTIALS_TYPE_IAM) == 0); + GPR_ASSERT( + strcmp(creds_list[0]->type(), GRPC_CALL_CREDENTIALS_TYPE_OAUTH2) == 0); + GPR_ASSERT(strcmp(creds_list[1]->type(), GRPC_CALL_CREDENTIALS_TYPE_IAM) == + 0); return nullptr; } }; -- cgit v1.2.3 From 82797771679c93045876f48707c5cb46bfae4f53 Mon Sep 17 00:00:00 
2001 From: easy Date: Wed, 19 Dec 2018 12:22:38 +1100 Subject: Destruct CensusContext to avoid leaking memory. Otherwise, the placement-new leaks context -> span_ -> impl_ which is a std::shared_ptr. --- src/cpp/ext/filters/census/context.cc | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src') diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc index 78fc69a805..160590353a 100644 --- a/src/cpp/ext/filters/census/context.cc +++ b/src/cpp/ext/filters/census/context.cc @@ -28,6 +28,9 @@ using ::opencensus::trace::SpanContext; void GenerateServerContext(absl::string_view tracing, absl::string_view stats, absl::string_view primary_role, absl::string_view method, CensusContext* context) { + // Destruct the current CensusContext to free the Span memory before + // overwriting it below. + context->~CensusContext(); GrpcTraceContext trace_ctxt; if (TraceContextEncoding::Decode(tracing, &trace_ctxt) != TraceContextEncoding::kEncodeDecodeFailure) { @@ -42,6 +45,9 @@ void GenerateServerContext(absl::string_view tracing, absl::string_view stats, void GenerateClientContext(absl::string_view method, CensusContext* ctxt, CensusContext* parent_ctxt) { + // Destruct the current CensusContext to free the Span memory before + // overwriting it below. + ctxt->~CensusContext(); if (parent_ctxt != nullptr) { SpanContext span_ctxt = parent_ctxt->Context(); Span span = parent_ctxt->Span(); -- cgit v1.2.3 From 6211f4589b47620440ea60c614116398a7a1ab43 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 19 Dec 2018 08:34:13 -0800 Subject: removed unused traceback import --- src/python/grpcio_tests/commands.py | 1 - 1 file changed, 1 deletion(-) (limited to 'src') diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 18413abab0..496bcfbcbf 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -22,7 +22,6 @@ import re import shutil import subprocess import sys -import traceback import setuptools from setuptools.command import build_ext -- cgit v1.2.3 From 09f57c17ee22e344adad9d81be5747305a02d33e Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 20 Dec 2018 10:10:19 -0800 Subject: Refactor request routing code out of client_channel. 
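This patch pulls the resolver and LB-policy plumbing out of client_channel.cc (roughly 900 lines, per the diffstat below) into a new RequestRouter class in request_routing.cc/.h. The channel keeps only the state it still owns, namely retry throttling, the method-params table, and the data reported by cc_get_channel_info(), and reacts to resolver results through a synchronous callback. The sketch below restates the callback contract visible in the new client_channel.cc code at the end of this patch; the typedef name is an assumption, since the authoritative declaration lives in request_routing.h, which this excerpt does not include:

// Assumed shape of the hook the channel registers with its
// RequestRouter; the router invokes it under the channel's combiner
// each time the resolver produces a new result.
typedef bool (*ProcessResolverResultLocked)(void* user_data,
                                            const grpc_channel_args& args,
                                            const char** lb_policy_name,
                                            grpc_json** lb_policy_config);

client_channel.cc implements this as process_resolver_result_locked(): it parses the service config out of the channel args, updates the retry and method-params state, swaps the data used by cc_get_channel_info() under info_mu, and hands the LB policy name and config back through the out-parameters; the boolean presumably reports whether the service config changed, as the excerpt cuts off just before the return.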
--- BUILD | 2 + CMakeLists.txt | 6 + Makefile | 6 + build.yaml | 2 + config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 1 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 4 + package.xml | 2 + .../ext/filters/client_channel/client_channel.cc | 1035 +++----------------- src/core/ext/filters/client_channel/lb_policy.h | 7 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 8 +- .../lb_policy/pick_first/pick_first.cc | 8 +- .../lb_policy/round_robin/round_robin.cc | 8 +- .../filters/client_channel/lb_policy/xds/xds.cc | 8 +- .../ext/filters/client_channel/request_routing.cc | 936 ++++++++++++++++++ .../ext/filters/client_channel/request_routing.h | 177 ++++ .../client_channel/resolver_result_parsing.cc | 14 +- .../client_channel/resolver_result_parsing.h | 30 +- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.core.internal | 2 + tools/run_tests/generated/sources_and_headers.json | 3 + 24 files changed, 1361 insertions(+), 906 deletions(-) create mode 100644 src/core/ext/filters/client_channel/request_routing.cc create mode 100644 src/core/ext/filters/client_channel/request_routing.h (limited to 'src') diff --git a/BUILD b/BUILD index b9b39a37a7..e3c765198b 100644 --- a/BUILD +++ b/BUILD @@ -1056,6 +1056,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/parse_address.cc", "src/core/ext/filters/client_channel/proxy_mapper.cc", "src/core/ext/filters/client_channel/proxy_mapper_registry.cc", + "src/core/ext/filters/client_channel/request_routing.cc", "src/core/ext/filters/client_channel/resolver.cc", "src/core/ext/filters/client_channel/resolver_registry.cc", "src/core/ext/filters/client_channel/resolver_result_parsing.cc", @@ -1079,6 +1080,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/parse_address.h", "src/core/ext/filters/client_channel/proxy_mapper.h", "src/core/ext/filters/client_channel/proxy_mapper_registry.h", + "src/core/ext/filters/client_channel/request_routing.h", "src/core/ext/filters/client_channel/resolver.h", "src/core/ext/filters/client_channel/resolver_factory.h", "src/core/ext/filters/client_channel/resolver_registry.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f34bf841d..7688630781 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1210,6 +1210,7 @@ add_library(grpc src/core/ext/filters/client_channel/parse_address.cc src/core/ext/filters/client_channel/proxy_mapper.cc src/core/ext/filters/client_channel/proxy_mapper_registry.cc + src/core/ext/filters/client_channel/request_routing.cc src/core/ext/filters/client_channel/resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -1562,6 +1563,7 @@ add_library(grpc_cronet src/core/ext/filters/client_channel/parse_address.cc src/core/ext/filters/client_channel/proxy_mapper.cc src/core/ext/filters/client_channel/proxy_mapper_registry.cc + src/core/ext/filters/client_channel/request_routing.cc src/core/ext/filters/client_channel/resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -1935,6 +1937,7 @@ add_library(grpc_test_util src/core/ext/filters/client_channel/parse_address.cc src/core/ext/filters/client_channel/proxy_mapper.cc src/core/ext/filters/client_channel/proxy_mapper_registry.cc + src/core/ext/filters/client_channel/request_routing.cc src/core/ext/filters/client_channel/resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc 
@@ -2256,6 +2259,7 @@ add_library(grpc_test_util_unsecure src/core/ext/filters/client_channel/parse_address.cc src/core/ext/filters/client_channel/proxy_mapper.cc src/core/ext/filters/client_channel/proxy_mapper_registry.cc + src/core/ext/filters/client_channel/request_routing.cc src/core/ext/filters/client_channel/resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -2589,6 +2593,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/parse_address.cc src/core/ext/filters/client_channel/proxy_mapper.cc src/core/ext/filters/client_channel/proxy_mapper_registry.cc + src/core/ext/filters/client_channel/request_routing.cc src/core/ext/filters/client_channel/resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -3443,6 +3448,7 @@ add_library(grpc++_cronet src/core/ext/filters/client_channel/parse_address.cc src/core/ext/filters/client_channel/proxy_mapper.cc src/core/ext/filters/client_channel/proxy_mapper_registry.cc + src/core/ext/filters/client_channel/request_routing.cc src/core/ext/filters/client_channel/resolver.cc src/core/ext/filters/client_channel/resolver_registry.cc src/core/ext/filters/client_channel/resolver_result_parsing.cc diff --git a/Makefile b/Makefile index 2ad3eb4c14..147e9505a3 100644 --- a/Makefile +++ b/Makefile @@ -3723,6 +3723,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/parse_address.cc \ src/core/ext/filters/client_channel/proxy_mapper.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ + src/core/ext/filters/client_channel/request_routing.cc \ src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ @@ -4069,6 +4070,7 @@ LIBGRPC_CRONET_SRC = \ src/core/ext/filters/client_channel/parse_address.cc \ src/core/ext/filters/client_channel/proxy_mapper.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ + src/core/ext/filters/client_channel/request_routing.cc \ src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ @@ -4435,6 +4437,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/ext/filters/client_channel/parse_address.cc \ src/core/ext/filters/client_channel/proxy_mapper.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ + src/core/ext/filters/client_channel/request_routing.cc \ src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ @@ -4743,6 +4746,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/ext/filters/client_channel/parse_address.cc \ src/core/ext/filters/client_channel/proxy_mapper.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ + src/core/ext/filters/client_channel/request_routing.cc \ src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ @@ -5050,6 +5054,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/parse_address.cc \ src/core/ext/filters/client_channel/proxy_mapper.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ + src/core/ext/filters/client_channel/request_routing.cc \ 
src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ @@ -5881,6 +5886,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/ext/filters/client_channel/parse_address.cc \ src/core/ext/filters/client_channel/proxy_mapper.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ + src/core/ext/filters/client_channel/request_routing.cc \ src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ diff --git a/build.yaml b/build.yaml index 830123110a..9d73e31b2e 100644 --- a/build.yaml +++ b/build.yaml @@ -584,6 +584,7 @@ filegroups: - src/core/ext/filters/client_channel/parse_address.h - src/core/ext/filters/client_channel/proxy_mapper.h - src/core/ext/filters/client_channel/proxy_mapper_registry.h + - src/core/ext/filters/client_channel/request_routing.h - src/core/ext/filters/client_channel/resolver.h - src/core/ext/filters/client_channel/resolver_factory.h - src/core/ext/filters/client_channel/resolver_registry.h @@ -608,6 +609,7 @@ filegroups: - src/core/ext/filters/client_channel/parse_address.cc - src/core/ext/filters/client_channel/proxy_mapper.cc - src/core/ext/filters/client_channel/proxy_mapper_registry.cc + - src/core/ext/filters/client_channel/request_routing.cc - src/core/ext/filters/client_channel/resolver.cc - src/core/ext/filters/client_channel/resolver_registry.cc - src/core/ext/filters/client_channel/resolver_result_parsing.cc diff --git a/config.m4 b/config.m4 index 16de5204bb..25ffe2148a 100644 --- a/config.m4 +++ b/config.m4 @@ -352,6 +352,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/parse_address.cc \ src/core/ext/filters/client_channel/proxy_mapper.cc \ src/core/ext/filters/client_channel/proxy_mapper_registry.cc \ + src/core/ext/filters/client_channel/request_routing.cc \ src/core/ext/filters/client_channel/resolver.cc \ src/core/ext/filters/client_channel/resolver_registry.cc \ src/core/ext/filters/client_channel/resolver_result_parsing.cc \ diff --git a/config.w32 b/config.w32 index be10faab9c..b6e71dd09a 100644 --- a/config.w32 +++ b/config.w32 @@ -327,6 +327,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\parse_address.cc " + "src\\core\\ext\\filters\\client_channel\\proxy_mapper.cc " + "src\\core\\ext\\filters\\client_channel\\proxy_mapper_registry.cc " + + "src\\core\\ext\\filters\\client_channel\\request_routing.cc " + "src\\core\\ext\\filters\\client_channel\\resolver.cc " + "src\\core\\ext\\filters\\client_channel\\resolver_registry.cc " + "src\\core\\ext\\filters\\client_channel\\resolver_result_parsing.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index f918fe8ad9..29a79dd47a 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -355,6 +355,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/parse_address.h', 'src/core/ext/filters/client_channel/proxy_mapper.h', 'src/core/ext/filters/client_channel/proxy_mapper_registry.h', + 'src/core/ext/filters/client_channel/request_routing.h', 'src/core/ext/filters/client_channel/resolver.h', 'src/core/ext/filters/client_channel/resolver_factory.h', 'src/core/ext/filters/client_channel/resolver_registry.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 0a78733f8d..f873bc693b 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -349,6 +349,7 @@ Pod::Spec.new do |s| 
'src/core/ext/filters/client_channel/parse_address.h', 'src/core/ext/filters/client_channel/proxy_mapper.h', 'src/core/ext/filters/client_channel/proxy_mapper_registry.h', + 'src/core/ext/filters/client_channel/request_routing.h', 'src/core/ext/filters/client_channel/resolver.h', 'src/core/ext/filters/client_channel/resolver_factory.h', 'src/core/ext/filters/client_channel/resolver_registry.h', @@ -791,6 +792,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/parse_address.cc', 'src/core/ext/filters/client_channel/proxy_mapper.cc', 'src/core/ext/filters/client_channel/proxy_mapper_registry.cc', + 'src/core/ext/filters/client_channel/request_routing.cc', 'src/core/ext/filters/client_channel/resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', @@ -970,6 +972,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/parse_address.h', 'src/core/ext/filters/client_channel/proxy_mapper.h', 'src/core/ext/filters/client_channel/proxy_mapper_registry.h', + 'src/core/ext/filters/client_channel/request_routing.h', 'src/core/ext/filters/client_channel/resolver.h', 'src/core/ext/filters/client_channel/resolver_factory.h', 'src/core/ext/filters/client_channel/resolver_registry.h', diff --git a/grpc.gemspec b/grpc.gemspec index 1ee7bec8e7..3c680b044f 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -285,6 +285,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/parse_address.h ) s.files += %w( src/core/ext/filters/client_channel/proxy_mapper.h ) s.files += %w( src/core/ext/filters/client_channel/proxy_mapper_registry.h ) + s.files += %w( src/core/ext/filters/client_channel/request_routing.h ) s.files += %w( src/core/ext/filters/client_channel/resolver.h ) s.files += %w( src/core/ext/filters/client_channel/resolver_factory.h ) s.files += %w( src/core/ext/filters/client_channel/resolver_registry.h ) @@ -730,6 +731,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/parse_address.cc ) s.files += %w( src/core/ext/filters/client_channel/proxy_mapper.cc ) s.files += %w( src/core/ext/filters/client_channel/proxy_mapper_registry.cc ) + s.files += %w( src/core/ext/filters/client_channel/request_routing.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver_registry.cc ) s.files += %w( src/core/ext/filters/client_channel/resolver_result_parsing.cc ) diff --git a/grpc.gyp b/grpc.gyp index 641235eec2..80b6d0315a 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -534,6 +534,7 @@ 'src/core/ext/filters/client_channel/parse_address.cc', 'src/core/ext/filters/client_channel/proxy_mapper.cc', 'src/core/ext/filters/client_channel/proxy_mapper_registry.cc', + 'src/core/ext/filters/client_channel/request_routing.cc', 'src/core/ext/filters/client_channel/resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', @@ -794,6 +795,7 @@ 'src/core/ext/filters/client_channel/parse_address.cc', 'src/core/ext/filters/client_channel/proxy_mapper.cc', 'src/core/ext/filters/client_channel/proxy_mapper_registry.cc', + 'src/core/ext/filters/client_channel/request_routing.cc', 'src/core/ext/filters/client_channel/resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', @@ -1035,6 +1037,7 @@ 'src/core/ext/filters/client_channel/parse_address.cc', 
'src/core/ext/filters/client_channel/proxy_mapper.cc', 'src/core/ext/filters/client_channel/proxy_mapper_registry.cc', + 'src/core/ext/filters/client_channel/request_routing.cc', 'src/core/ext/filters/client_channel/resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', @@ -1288,6 +1291,7 @@ 'src/core/ext/filters/client_channel/parse_address.cc', 'src/core/ext/filters/client_channel/proxy_mapper.cc', 'src/core/ext/filters/client_channel/proxy_mapper_registry.cc', + 'src/core/ext/filters/client_channel/request_routing.cc', 'src/core/ext/filters/client_channel/resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/resolver_result_parsing.cc', diff --git a/package.xml b/package.xml index 68fc7433cb..2632fcb276 100644 --- a/package.xml +++ b/package.xml @@ -290,6 +290,7 @@ + @@ -735,6 +736,7 @@ + diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 70aac47231..dd741f1e2d 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -35,10 +35,10 @@ #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" +#include "src/core/ext/filters/client_channel/request_routing.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/filters/client_channel/resolver_result_parsing.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" -#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/backoff/backoff.h" @@ -63,7 +63,6 @@ #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_metadata.h" -using grpc_core::ServerAddressList; using grpc_core::internal::ClientChannelMethodParams; using grpc_core::internal::ClientChannelMethodParamsTable; using grpc_core::internal::ProcessedResolverResult; @@ -88,31 +87,18 @@ grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); struct external_connectivity_watcher; typedef struct client_channel_channel_data { - grpc_core::OrphanablePtr resolver; - bool started_resolving; + grpc_core::ManualConstructor request_router; + bool deadline_checking_enabled; - grpc_client_channel_factory* client_channel_factory; bool enable_retries; size_t per_rpc_retry_buffer_size; /** combiner protecting all variables below in this data structure */ grpc_combiner* combiner; - /** currently active load balancer */ - grpc_core::OrphanablePtr lb_policy; /** retry throttle data */ grpc_core::RefCountedPtr retry_throttle_data; /** maps method names to method_parameters structs */ grpc_core::RefCountedPtr method_params_table; - /** incoming resolver result - set by resolver.next() */ - grpc_channel_args* resolver_result; - /** a list of closures that are all waiting for resolver result to come in */ - grpc_closure_list waiting_for_resolver_result_closures; - /** resolver callback */ - grpc_closure on_resolver_result_changed; - /** connectivity state being tracked */ - grpc_connectivity_state_tracker state_tracker; - /** when an lb_policy arrives, should we try to exit idle */ - bool exit_idle_when_lb_policy_arrives; 
/** owning stack */ grpc_channel_stack* owning_stack; /** interested parties (owned) */ @@ -129,418 +115,40 @@ typedef struct client_channel_channel_data { grpc_core::UniquePtr info_lb_policy_name; /** service config in JSON form */ grpc_core::UniquePtr info_service_config_json; - /* backpointer to grpc_channel's channelz node */ - grpc_core::channelz::ClientChannelNode* channelz_channel; - /* caches if the last resolution event contained addresses */ - bool previous_resolution_contained_addresses; } channel_data; -typedef struct { - channel_data* chand; - /** used as an identifier, don't dereference it because the LB policy may be - * non-existing when the callback is run */ - grpc_core::LoadBalancingPolicy* lb_policy; - grpc_closure closure; -} reresolution_request_args; - -/** We create one watcher for each new lb_policy that is returned from a - resolver, to watch for state changes from the lb_policy. When a state - change is seen, we update the channel, and create a new watcher. */ -typedef struct { - channel_data* chand; - grpc_closure on_changed; - grpc_connectivity_state state; - grpc_core::LoadBalancingPolicy* lb_policy; -} lb_policy_connectivity_watcher; - -static void watch_lb_policy_locked(channel_data* chand, - grpc_core::LoadBalancingPolicy* lb_policy, - grpc_connectivity_state current_state); - -static const char* channel_connectivity_state_change_string( - grpc_connectivity_state state) { - switch (state) { - case GRPC_CHANNEL_IDLE: - return "Channel state change to IDLE"; - case GRPC_CHANNEL_CONNECTING: - return "Channel state change to CONNECTING"; - case GRPC_CHANNEL_READY: - return "Channel state change to READY"; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - return "Channel state change to TRANSIENT_FAILURE"; - case GRPC_CHANNEL_SHUTDOWN: - return "Channel state change to SHUTDOWN"; - } - GPR_UNREACHABLE_CODE(return "UNKNOWN"); -} - -static void set_channel_connectivity_state_locked(channel_data* chand, - grpc_connectivity_state state, - grpc_error* error, - const char* reason) { - /* TODO: Improve failure handling: - * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. - * - Hand over pending picks from old policies during the switch that happens - * when resolver provides an update. 
*/ - if (chand->lb_policy != nullptr) { - if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* cancel picks with wait_for_ready=false */ - chand->lb_policy->CancelMatchingPicksLocked( - /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, - /* check= */ 0, GRPC_ERROR_REF(error)); - } else if (state == GRPC_CHANNEL_SHUTDOWN) { - /* cancel all picks */ - chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0, - GRPC_ERROR_REF(error)); - } - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand, - grpc_connectivity_state_name(state)); - } - if (chand->channelz_channel != nullptr) { - chand->channelz_channel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string( - channel_connectivity_state_change_string(state))); - } - grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); -} - -static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { - lb_policy_connectivity_watcher* w = - static_cast(arg); - /* check if the notification is for the latest policy */ - if (w->lb_policy == w->chand->lb_policy.get()) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand, - w->lb_policy, grpc_connectivity_state_name(w->state)); - } - set_channel_connectivity_state_locked(w->chand, w->state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy_locked(w->chand, w->lb_policy, w->state); - } - } - GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy"); - gpr_free(w); -} - -static void watch_lb_policy_locked(channel_data* chand, - grpc_core::LoadBalancingPolicy* lb_policy, - grpc_connectivity_state current_state) { - lb_policy_connectivity_watcher* w = - static_cast(gpr_malloc(sizeof(*w))); - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); - w->chand = chand; - GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, - grpc_combiner_scheduler(chand->combiner)); - w->state = current_state; - w->lb_policy = lb_policy; - lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed); -} - -static void start_resolving_locked(channel_data* chand) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand); - } - GPR_ASSERT(!chand->started_resolving); - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->resolver->NextLocked(&chand->resolver_result, - &chand->on_resolver_result_changed); -} - -// Invoked from the resolver NextLocked() callback when the resolver -// is shutting down. -static void on_resolver_shutdown_locked(channel_data* chand, - grpc_error* error) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down", chand); - } - if (chand->lb_policy != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, - chand->lb_policy.get()); - } - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } - if (chand->resolver != nullptr) { - // This should never happen; it can only be triggered by a resolver - // implementation spotaneously deciding to report shutdown without - // being orphaned. This code is included just to be defensive. 
- if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p", - chand, chand->resolver.get()); - } - chand->resolver.reset(); - set_channel_connectivity_state_locked( - chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Resolver spontaneous shutdown", &error, 1), - "resolver_spontaneous_shutdown"); - } - grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Channel disconnected", &error, 1)); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; - GRPC_ERROR_UNREF(error); -} - -static void request_reresolution_locked(void* arg, grpc_error* error) { - reresolution_request_args* args = - static_cast(arg); - channel_data* chand = args->chand; - // If this invocation is for a stale LB policy, treat it as an LB shutdown - // signal. - if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE || - chand->resolver == nullptr) { - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); - gpr_free(args); - return; - } - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand); - } - chand->resolver->RequestReresolutionLocked(); - // Give back the closure to the LB policy. - chand->lb_policy->SetReresolutionClosureLocked(&args->closure); -} - -using TraceStringVector = grpc_core::InlinedVector; - -// Creates a new LB policy, replacing any previous one. -// If the new policy is created successfully, sets *connectivity_state and -// *connectivity_error to its initial connectivity state; otherwise, -// leaves them unchanged. -static void create_new_lb_policy_locked( - channel_data* chand, char* lb_policy_name, grpc_json* lb_config, - grpc_connectivity_state* connectivity_state, - grpc_error** connectivity_error, TraceStringVector* trace_strings) { - grpc_core::LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = chand->combiner; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - lb_policy_args.args = chand->resolver_result; - lb_policy_args.lb_config = lb_config; - grpc_core::OrphanablePtr new_lb_policy = - grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - lb_policy_name, lb_policy_args); - if (GPR_UNLIKELY(new_lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); - if (chand->channelz_channel != nullptr) { - char* str; - gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name); - trace_strings->push_back(str); - } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand, - lb_policy_name, new_lb_policy.get()); - } - if (chand->channelz_channel != nullptr) { - char* str; - gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name); - trace_strings->push_back(str); - } - // Swap out the LB policy and update the fds in - // chand->interested_parties. 
- if (chand->lb_policy != nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, - chand->lb_policy.get()); - } - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); - } - chand->lb_policy = std::move(new_lb_policy); - grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - // Set up re-resolution callback. - reresolution_request_args* args = - static_cast(gpr_zalloc(sizeof(*args))); - args->chand = chand; - args->lb_policy = chand->lb_policy.get(); - GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, - grpc_combiner_scheduler(chand->combiner)); - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); - chand->lb_policy->SetReresolutionClosureLocked(&args->closure); - // Get the new LB policy's initial connectivity state and start a - // connectivity watch. - GRPC_ERROR_UNREF(*connectivity_error); - *connectivity_state = - chand->lb_policy->CheckConnectivityLocked(connectivity_error); - if (chand->exit_idle_when_lb_policy_arrives) { - chand->lb_policy->ExitIdleLocked(); - chand->exit_idle_when_lb_policy_arrives = false; - } - watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state); - } -} - -static void maybe_add_trace_message_for_address_changes_locked( - channel_data* chand, TraceStringVector* trace_strings) { - const ServerAddressList* addresses = - grpc_core::FindServerAddressListChannelArg(chand->resolver_result); - const bool resolution_contains_addresses = - addresses != nullptr && addresses->size() > 0; - if (!resolution_contains_addresses && - chand->previous_resolution_contained_addresses) { - trace_strings->push_back(gpr_strdup("Address list became empty")); - } else if (resolution_contains_addresses && - !chand->previous_resolution_contained_addresses) { - trace_strings->push_back(gpr_strdup("Address list became non-empty")); - } - chand->previous_resolution_contained_addresses = - resolution_contains_addresses; -} - -static void concatenate_and_add_channel_trace_locked( - channel_data* chand, TraceStringVector* trace_strings) { - if (!trace_strings->empty()) { - gpr_strvec v; - gpr_strvec_init(&v); - gpr_strvec_add(&v, gpr_strdup("Resolution event: ")); - bool is_first = 1; - for (size_t i = 0; i < trace_strings->size(); ++i) { - if (!is_first) gpr_strvec_add(&v, gpr_strdup(", ")); - is_first = false; - gpr_strvec_add(&v, (*trace_strings)[i]); - } - char* flat; - size_t flat_len = 0; - flat = gpr_strvec_flatten(&v, &flat_len); - chand->channelz_channel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_new(flat, flat_len, gpr_free)); - gpr_strvec_destroy(&v); - } -} - -// Callback invoked when a resolver result is available. -static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { +// Synchronous callback from chand->request_router to process a resolver +// result update. +static bool process_resolver_result_locked(void* arg, + const grpc_channel_args& args, + const char** lb_policy_name, + grpc_json** lb_policy_config) { channel_data* chand = static_cast(arg); + ProcessedResolverResult resolver_result(args, chand->enable_retries); + grpc_core::UniquePtr service_config_json = + resolver_result.service_config_json(); if (grpc_client_channel_trace.enabled()) { - const char* disposition = - chand->resolver_result != nullptr - ? 
"" - : (error == GRPC_ERROR_NONE ? " (transient error)" - : " (resolver shutdown)"); - gpr_log(GPR_INFO, - "chand=%p: got resolver result: resolver_result=%p error=%s%s", - chand, chand->resolver_result, grpc_error_string(error), - disposition); + gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", + chand, service_config_json.get()); } - // Handle shutdown. - if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) { - on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error)); - return; - } - // Data used to set the channel's connectivity state. - bool set_connectivity_state = true; - // We only want to trace the address resolution in the follow cases: - // (a) Address resolution resulted in service config change. - // (b) Address resolution that causes number of backends to go from - // zero to non-zero. - // (c) Address resolution that causes number of backends to go from - // non-zero to zero. - // (d) Address resolution that causes a new LB policy to be created. - // - // we track a list of strings to eventually be concatenated and traced. - TraceStringVector trace_strings; - grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_error* connectivity_error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); - // chand->resolver_result will be null in the case of a transient - // resolution error. In that case, we don't have any new result to - // process, which means that we keep using the previous result (if any). - if (chand->resolver_result == nullptr) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); - } - // Don't override connectivity state if we already have an LB policy. - if (chand->lb_policy != nullptr) set_connectivity_state = false; - } else { - // Parse the resolver result. - ProcessedResolverResult resolver_result(chand->resolver_result, - chand->enable_retries); - chand->retry_throttle_data = resolver_result.retry_throttle_data(); - chand->method_params_table = resolver_result.method_params_table(); - grpc_core::UniquePtr service_config_json = - resolver_result.service_config_json(); - if (service_config_json != nullptr && grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", - chand, service_config_json.get()); - } - grpc_core::UniquePtr lb_policy_name = - resolver_result.lb_policy_name(); - grpc_json* lb_policy_config = resolver_result.lb_policy_config(); - // Check to see if we're already using the right LB policy. - // Note: It's safe to use chand->info_lb_policy_name here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - bool lb_policy_name_changed = - chand->info_lb_policy_name == nullptr || - strcmp(chand->info_lb_policy_name.get(), lb_policy_name.get()) != 0; - if (chand->lb_policy != nullptr && !lb_policy_name_changed) { - // Continue using the same LB policy. Update with new addresses. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", - chand, lb_policy_name.get(), chand->lb_policy.get()); - } - chand->lb_policy->UpdateLocked(*chand->resolver_result, lb_policy_config); - // No need to set the channel's connectivity state; the existing - // watch on the LB policy will take care of that. - set_connectivity_state = false; - } else { - // Instantiate new LB policy. 
- create_new_lb_policy_locked(chand, lb_policy_name.get(), lb_policy_config, - &connectivity_state, &connectivity_error, - &trace_strings); - } - // Note: It's safe to use chand->info_service_config_json here without - // taking a lock on chand->info_mu, because this function is the - // only thing that modifies its value, and it can only be invoked - // once at any given time. - if (chand->channelz_channel != nullptr) { - if (((service_config_json == nullptr) != - (chand->info_service_config_json == nullptr)) || - (service_config_json != nullptr && - strcmp(service_config_json.get(), - chand->info_service_config_json.get()) != 0)) { - // TODO(ncteisen): might be worth somehow including a snippet of the - // config in the trace, at the risk of bloating the trace logs. - trace_strings.push_back(gpr_strdup("Service config changed")); - } - maybe_add_trace_message_for_address_changes_locked(chand, &trace_strings); - concatenate_and_add_channel_trace_locked(chand, &trace_strings); - } - // Swap out the data used by cc_get_channel_info(). - gpr_mu_lock(&chand->info_mu); - chand->info_lb_policy_name = std::move(lb_policy_name); - chand->info_service_config_json = std::move(service_config_json); - gpr_mu_unlock(&chand->info_mu); - // Clean up. - grpc_channel_args_destroy(chand->resolver_result); - chand->resolver_result = nullptr; - } - // Set the channel's connectivity state if needed. - if (set_connectivity_state) { - set_channel_connectivity_state_locked( - chand, connectivity_state, connectivity_error, "resolver_result"); - } else { - GRPC_ERROR_UNREF(connectivity_error); - } - // Invoke closures that were waiting for results and renew the watch. - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - chand->resolver->NextLocked(&chand->resolver_result, - &chand->on_resolver_result_changed); + // Update channel state. + chand->retry_throttle_data = resolver_result.retry_throttle_data(); + chand->method_params_table = resolver_result.method_params_table(); + // Swap out the data used by cc_get_channel_info(). + gpr_mu_lock(&chand->info_mu); + chand->info_lb_policy_name = resolver_result.lb_policy_name(); + const bool service_config_changed = + ((service_config_json == nullptr) != + (chand->info_service_config_json == nullptr)) || + (service_config_json != nullptr && + strcmp(service_config_json.get(), + chand->info_service_config_json.get()) != 0); + chand->info_service_config_json = std::move(service_config_json); + gpr_mu_unlock(&chand->info_mu); + // Return results. 
+ *lb_policy_name = chand->info_lb_policy_name.get(); + *lb_policy_config = resolver_result.lb_policy_config(); + return service_config_changed; } static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { @@ -550,15 +158,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { channel_data* chand = static_cast(elem->channel_data); if (op->on_connectivity_state_change != nullptr) { - grpc_connectivity_state_notify_on_state_change( - &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change); + chand->request_router->NotifyOnConnectivityStateChange( + op->connectivity_state, op->on_connectivity_state_change); op->on_connectivity_state_change = nullptr; op->connectivity_state = nullptr; } if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { - if (chand->lb_policy == nullptr) { + if (chand->request_router->lb_policy() == nullptr) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); @@ -567,7 +174,8 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { grpc_error* error = GRPC_ERROR_NONE; grpc_core::LoadBalancingPolicy::PickState pick_state; // Pick must return synchronously, because pick_state.on_complete is null. - GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error)); + GPR_ASSERT( + chand->request_router->lb_policy()->PickLocked(&pick_state, &error)); if (pick_state.connected_subchannel != nullptr) { pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack); @@ -586,37 +194,14 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { } if (op->disconnect_with_error != GRPC_ERROR_NONE) { - if (chand->resolver != nullptr) { - set_channel_connectivity_state_locked( - chand, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - chand->resolver.reset(); - if (!chand->started_resolving) { - grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, - GRPC_ERROR_REF(op->disconnect_with_error)); - GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); - } - if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } - } - GRPC_ERROR_UNREF(op->disconnect_with_error); + chand->request_router->ShutdownLocked(op->disconnect_with_error); } if (op->reset_connect_backoff) { - if (chand->resolver != nullptr) { - chand->resolver->ResetBackoffLocked(); - chand->resolver->RequestReresolutionLocked(); - } - if (chand->lb_policy != nullptr) { - chand->lb_policy->ResetBackoffLocked(); - } + chand->request_router->ResetConnectionBackoffLocked(); } GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } @@ -667,12 +252,9 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); chand->owning_stack = args->channel_stack; - GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed, - on_resolver_result_changed_locked, chand, - grpc_combiner_scheduler(chand->combiner)); + chand->deadline_checking_enabled = + grpc_deadline_checking_enabled(args->channel_args); chand->interested_parties = grpc_pollset_set_create(); - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, - "client_channel"); 
grpc_client_channel_start_backup_polling(chand->interested_parties); // Record max per-RPC retry buffer size. const grpc_arg* arg = grpc_channel_args_find( @@ -682,8 +264,6 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, // Record enable_retries. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); chand->enable_retries = grpc_channel_arg_get_bool(arg, true); - chand->channelz_channel = nullptr; - chand->previous_resolution_contained_addresses = false; // Record client channel factory. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); @@ -695,9 +275,7 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } - grpc_client_channel_factory_ref( - static_cast(arg->value.pointer.p)); - chand->client_channel_factory = + grpc_client_channel_factory* client_channel_factory = static_cast(arg->value.pointer.p); // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); @@ -713,39 +291,24 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, grpc_channel_args* new_args = nullptr; grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, &proxy_name, &new_args); - // Instantiate resolver. - chand->resolver = grpc_core::ResolverRegistry::CreateResolver( - proxy_name != nullptr ? proxy_name : arg->value.string, - new_args != nullptr ? new_args : args->channel_args, - chand->interested_parties, chand->combiner); - if (proxy_name != nullptr) gpr_free(proxy_name); - if (new_args != nullptr) grpc_channel_args_destroy(new_args); - if (chand->resolver == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); - } - chand->deadline_checking_enabled = - grpc_deadline_checking_enabled(args->channel_args); - return GRPC_ERROR_NONE; + // Instantiate request router. + grpc_client_channel_factory_ref(client_channel_factory); + grpc_error* error = GRPC_ERROR_NONE; + chand->request_router.Init( + chand->owning_stack, chand->combiner, client_channel_factory, + chand->interested_parties, &grpc_client_channel_trace, + process_resolver_result_locked, chand, + proxy_name != nullptr ? proxy_name : arg->value.string /* target_uri */, + new_args != nullptr ? new_args : args->channel_args, &error); + gpr_free(proxy_name); + grpc_channel_args_destroy(new_args); + return error; } /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast(elem->channel_data); - if (chand->resolver != nullptr) { - // The only way we can get here is if we never started resolving, - // because we take a ref to the channel stack when we start - // resolving and do not release it until the resolver callback is - // invoked after the resolver shuts down. - chand->resolver.reset(); - } - if (chand->client_channel_factory != nullptr) { - grpc_client_channel_factory_unref(chand->client_channel_factory); - } - if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), - chand->interested_parties); - chand->lb_policy.reset(); - } + chand->request_router.Destroy(); // TODO(roth): Once we convert the filter API to C++, there will no // longer be any need to explicitly reset these smart pointer data members. 
chand->info_lb_policy_name.reset(); @@ -753,7 +316,6 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { chand->retry_throttle_data.reset(); chand->method_params_table.reset(); grpc_client_channel_stop_backup_polling(chand->interested_parties); - grpc_connectivity_state_destroy(&chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); @@ -810,6 +372,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { // - add census stats for retries namespace { + struct call_data; // State used for starting a retryable batch on a subchannel call. @@ -894,12 +457,12 @@ struct subchannel_call_retry_state { bool completed_recv_initial_metadata : 1; bool started_recv_trailing_metadata : 1; bool completed_recv_trailing_metadata : 1; + // State for callback processing. subchannel_batch_data* recv_initial_metadata_ready_deferred_batch = nullptr; grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_message_ready_deferred_batch = nullptr; grpc_error* recv_message_error = GRPC_ERROR_NONE; subchannel_batch_data* recv_trailing_metadata_internal_batch = nullptr; - // State for callback processing. // NOTE: Do not move this next to the metadata bitfields above. That would // save space but will also result in a data race because compiler will // generate a 2 byte store which overwrites the meta-data fields upon @@ -908,12 +471,12 @@ struct subchannel_call_retry_state { }; // Pending batches stored in call data. -typedef struct { +struct pending_batch { // The pending batch. If nullptr, this slot is empty. grpc_transport_stream_op_batch* batch; // Indicates whether payload for send ops has been cached in call data. bool send_ops_cached; -} pending_batch; +}; /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. @@ -950,11 +513,8 @@ struct call_data { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { GPR_ASSERT(pending_batches[i].batch == nullptr); } - for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { - if (pick.subchannel_call_context[i].value != nullptr) { - pick.subchannel_call_context[i].destroy( - pick.subchannel_call_context[i].value); - } + if (have_request) { + request.Destroy(); } } @@ -981,12 +541,11 @@ struct call_data { // Set when we get a cancel_stream op. grpc_error* cancel_error = GRPC_ERROR_NONE; - grpc_core::LoadBalancingPolicy::PickState pick; + grpc_core::ManualConstructor request; + bool have_request = false; grpc_closure pick_closure; - grpc_closure pick_cancel_closure; grpc_polling_entity* pollent = nullptr; - bool pollent_added_to_interested_parties = false; // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when @@ -1036,6 +595,7 @@ struct call_data { grpc_linked_mdelem* send_trailing_metadata_storage = nullptr; grpc_metadata_batch send_trailing_metadata; }; + } // namespace // Forward declarations. @@ -1438,8 +998,9 @@ static void do_retry(grpc_call_element* elem, "client_channel_call_retry"); calld->subchannel_call = nullptr; } - if (calld->pick.connected_subchannel != nullptr) { - calld->pick.connected_subchannel.reset(); + if (calld->have_request) { + calld->have_request = false; + calld->request.Destroy(); } // Compute backoff delay. 
grpc_millis next_attempt_time; @@ -1588,6 +1149,7 @@ static bool maybe_retry(grpc_call_element* elem, // namespace { + subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, bool set_on_complete) @@ -1628,6 +1190,7 @@ void subchannel_batch_data::destroy() { call_data* calld = static_cast(elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); } + } // namespace // Creates a subchannel_batch_data object on the call's arena with the @@ -2644,17 +2207,18 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { const size_t parent_data_size = calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0; const grpc_core::ConnectedSubchannel::CallArgs call_args = { - calld->pollent, // pollent - calld->path, // path - calld->call_start_time, // start_time - calld->deadline, // deadline - calld->arena, // arena - calld->pick.subchannel_call_context, // context - calld->call_combiner, // call_combiner - parent_data_size // parent_data_size + calld->pollent, // pollent + calld->path, // path + calld->call_start_time, // start_time + calld->deadline, // deadline + calld->arena, // arena + calld->request->pick()->subchannel_call_context, // context + calld->call_combiner, // call_combiner + parent_data_size // parent_data_size }; - grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( - call_args, &calld->subchannel_call); + grpc_error* new_error = + calld->request->pick()->connected_subchannel->CreateCall( + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -2666,7 +2230,8 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { if (parent_data_size > 0) { new (grpc_connected_subchannel_call_get_parent_data( calld->subchannel_call)) - subchannel_call_retry_state(calld->pick.subchannel_call_context); + subchannel_call_retry_state( + calld->request->pick()->subchannel_call_context); } pending_batches_resume(elem); } @@ -2678,7 +2243,7 @@ static void pick_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); - if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) { + if (GPR_UNLIKELY(calld->request->pick()->connected_subchannel == nullptr)) { // Failed to create subchannel. // If there was no error, this is an LB policy drop, in which case // we return an error; otherwise, we may retry. @@ -2707,135 +2272,27 @@ static void pick_done(void* arg, grpc_error* error) { } } -static void maybe_add_call_to_channel_interested_parties_locked( - grpc_call_element* elem) { - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (!calld->pollent_added_to_interested_parties) { - calld->pollent_added_to_interested_parties = true; - grpc_polling_entity_add_to_pollset_set(calld->pollent, - chand->interested_parties); - } -} - -static void maybe_del_call_from_channel_interested_parties_locked( - grpc_call_element* elem) { +// If the channel is in TRANSIENT_FAILURE and the call is not +// wait_for_ready=true, fails the call and returns true. 
+static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); - if (calld->pollent_added_to_interested_parties) { - calld->pollent_added_to_interested_parties = false; - grpc_polling_entity_del_from_pollset_set(calld->pollent, - chand->interested_parties); + grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; + if (chand->request_router->GetConnectivityState() == + GRPC_CHANNEL_TRANSIENT_FAILURE && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { + pending_batches_fail( + elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "channel is in state TRANSIENT_FAILURE"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + true /* yield_call_combiner */); + return true; } + return false; } -// Invoked when a pick is completed to leave the client_channel combiner -// and continue processing in the call combiner. -// If needed, removes the call's polling entity from chand->interested_parties. -static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { - call_data* calld = static_cast(elem->call_data); - maybe_del_call_from_channel_interested_parties_locked(elem); - GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->pick_closure, error); -} - -namespace grpc_core { - -// Performs subchannel pick via LB policy. -class LbPicker { - public: - // Starts a pick on chand->lb_policy. - static void StartLocked(grpc_call_element* elem) { - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", - chand, calld, chand->lb_policy.get()); - } - // If this is a retry, use the send_initial_metadata payload that - // we've cached; otherwise, use the pending batch. The - // send_initial_metadata batch will be the first pending batch in the - // list, as set by get_batch_index() above. - calld->pick.initial_metadata = - calld->seen_send_initial_metadata - ? &calld->send_initial_metadata - : calld->pending_batches[0] - .batch->payload->send_initial_metadata.send_initial_metadata; - calld->pick.initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, - grpc_combiner_scheduler(chand->combiner)); - calld->pick.on_complete = &calld->pick_closure; - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); - grpc_error* error = GRPC_ERROR_NONE; - const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error); - if (GPR_LIKELY(pick_done)) { - // Pick completed synchronously. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", - chand, calld); - } - pick_done_locked(elem, error); - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); - } else { - // Pick will be returned asynchronously. - // Add the polling entity from call_data to the channel_data's - // interested_parties, so that the I/O of the LB policy can be done - // under it. It will be removed in pick_done_locked(). 
-      maybe_add_call_to_channel_interested_parties_locked(elem);
-      // Request notification on call cancellation.
-      GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
-      grpc_call_combiner_set_notify_on_cancel(
-          calld->call_combiner,
-          GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
-                            &LbPicker::CancelLocked, elem,
-                            grpc_combiner_scheduler(chand->combiner)));
-    }
-  }
-
- private:
-  // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
-  // Unrefs the LB policy and invokes pick_done_locked().
-  static void DoneLocked(void* arg, grpc_error* error) {
-    grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
-    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-    call_data* calld = static_cast<call_data*>(elem->call_data);
-    if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
-              chand, calld);
-    }
-    pick_done_locked(elem, GRPC_ERROR_REF(error));
-    GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
-  }
-
-  // Note: This runs under the client_channel combiner, but will NOT be
-  // holding the call combiner.
-  static void CancelLocked(void* arg, grpc_error* error) {
-    grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
-    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-    call_data* calld = static_cast<call_data*>(elem->call_data);
-    // Note: chand->lb_policy may have changed since we started our pick,
-    // in which case we will be cancelling the pick on a policy other than
-    // the one we started it on. However, this will just be a no-op.
-    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
-                calld, chand->lb_policy.get());
-      }
-      chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
-    }
-    GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
-  }
-};
-
-}  // namespace grpc_core
-
 // Applies service config to the call. Must be invoked once we know
 // that the resolver has returned results to the channel.
 static void apply_service_config_to_call_locked(grpc_call_element* elem) {
@@ -2892,224 +2349,66 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
   }
 }
 
-// If the channel is in TRANSIENT_FAILURE and the call is not
-// wait_for_ready=true, fails the call and returns true.
-static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
-  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
-  call_data* calld = static_cast<call_data*>(elem->call_data);
-  grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
-  if (grpc_connectivity_state_check(&chand->state_tracker) ==
-          GRPC_CHANNEL_TRANSIENT_FAILURE &&
-      (batch->payload->send_initial_metadata.send_initial_metadata_flags &
-       GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
-    pending_batches_fail(
-        elem,
-        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                               "channel is in state TRANSIENT_FAILURE"),
-                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
-        true /* yield_call_combiner */);
-    return true;
-  }
-  return false;
-}
-
 // Invoked once resolver results are available.
-static void process_service_config_and_start_lb_pick_locked(
-    grpc_call_element* elem) {
+static bool maybe_apply_service_config_to_call_locked(void* arg) {
+  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   call_data* calld = static_cast<call_data*>(elem->call_data);
   // Only get service config data on the first attempt.
if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); // Check this after applying service config, since it may have // affected the call's wait_for_ready value. - if (fail_call_if_in_transient_failure(elem)) return; + if (fail_call_if_in_transient_failure(elem)) return false; } - // Start LB pick. - grpc_core::LbPicker::StartLocked(elem); + return true; } -namespace grpc_core { - -// Handles waiting for a resolver result. -// Used only for the first call on an idle channel. -class ResolverResultWaiter { - public: - explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: deferring pick pending resolver result", - chand, calld); - } - // Add closure to be run when a resolver result is available. - GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, - grpc_combiner_scheduler(chand->combiner)); - AddToWaitingList(); - // Set cancellation closure, so that we abort if the call is cancelled. - GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, - this, grpc_combiner_scheduler(chand->combiner)); - grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, - &cancel_closure_); - } - - private: - // Adds closure_ to chand->waiting_for_resolver_result_closures. - void AddToWaitingList() { - channel_data* chand = static_cast(elem_->channel_data); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &done_closure_, GRPC_ERROR_NONE); - } - - // Invoked when a resolver result is available. - static void DoneLocked(void* arg, grpc_error* error) { - ResolverResultWaiter* self = static_cast(arg); - // If CancelLocked() has already run, delete ourselves without doing - // anything. Note that the call stack may have already been destroyed, - // so it's not safe to access anything in elem_. - if (GPR_UNLIKELY(self->finished_)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "call cancelled before resolver result"); - } - Delete(self); - return; - } - // Otherwise, process the resolver result. - grpc_call_element* elem = self->elem_; - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", - chand, calld); - } - pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - // Shutting down. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, - calld); - } - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { - // Transient resolver failure. - // If call has wait_for_ready=true, try again; otherwise, fail. - uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata - ? 
calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=true; trying again", - chand, calld); - } - // Re-add ourselves to the waiting list. - self->AddToWaitingList(); - // Return early so that we don't set finished_ to true below. - return; - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=false; failing", - chand, calld); - } - pick_done_locked( - elem, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); - } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick", - chand, calld); - } - process_service_config_and_start_lb_pick_locked(elem); - } - self->finished_ = true; - } - - // Invoked when the call is cancelled. - // Note: This runs under the client_channel combiner, but will NOT be - // holding the call combiner. - static void CancelLocked(void* arg, grpc_error* error) { - ResolverResultWaiter* self = static_cast(arg); - // If DoneLocked() has already run, delete ourselves without doing anything. - if (GPR_LIKELY(self->finished_)) { - Delete(self); - return; - } - // If we are being cancelled, immediately invoke pick_done_locked() - // to propagate the error back to the caller. - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - grpc_call_element* elem = self->elem_; - channel_data* chand = static_cast(elem->channel_data); - call_data* calld = static_cast(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling call waiting for name " - "resolution", - chand, calld); - } - // Note: Although we are not in the call combiner here, we are - // basically stealing the call combiner from the pending pick, so - // it's safe to call pick_done_locked() here -- we are essentially - // calling it here instead of calling it in DoneLocked(). - pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); - } - self->finished_ = true; - } - - grpc_call_element* elem_; - grpc_closure done_closure_; - grpc_closure cancel_closure_; - bool finished_ = false; -}; - -} // namespace grpc_core - static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = static_cast(arg); call_data* calld = static_cast(elem->call_data); channel_data* chand = static_cast(elem->channel_data); - GPR_ASSERT(calld->pick.connected_subchannel == nullptr); + GPR_ASSERT(!calld->have_request); GPR_ASSERT(calld->subchannel_call == nullptr); - if (GPR_LIKELY(chand->lb_policy != nullptr)) { - // We already have resolver results, so process the service config - // and start an LB pick. - process_service_config_and_start_lb_pick_locked(elem); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else { - // We do not yet have an LB policy, so wait for a resolver result. 
-    if (GPR_UNLIKELY(!chand->started_resolving)) {
-      start_resolving_locked(chand);
-    } else {
-      // Normally, we want to do this check in
-      // process_service_config_and_start_lb_pick_locked(), so that we
-      // can honor the wait_for_ready setting in the service config.
-      // However, if the channel is in TRANSIENT_FAILURE at this point, that
-      // means that the resolver has returned a failure, so we're not going
-      // to get a service config right away. In that case, we fail the
-      // call now based on the wait_for_ready value passed in from the
-      // application.
-      if (fail_call_if_in_transient_failure(elem)) return;
-    }
-    // Create a new waiter, which will delete itself when done.
-    grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
-    // Add the polling entity from call_data to the channel_data's
-    // interested_parties, so that the I/O of the resolver can be done
-    // under it. It will be removed in pick_done_locked().
-    maybe_add_call_to_channel_interested_parties_locked(elem);
+  // Normally, we want to delay this check until after we've processed the
+  // service config, so that we can honor the wait_for_ready setting in
+  // the service config. However, if the channel is in TRANSIENT_FAILURE
+  // and we don't have an LB policy at this point, that means that the
+  // resolver has returned a failure, so we're not going to get a service
+  // config right away. In that case, we fail the call now based on the
+  // wait_for_ready value passed in from the application.
+  if (chand->request_router->lb_policy() == nullptr &&
+      fail_call_if_in_transient_failure(elem)) {
+    return;
   }
+  // If this is a retry, use the send_initial_metadata payload that
+  // we've cached; otherwise, use the pending batch. The
+  // send_initial_metadata batch will be the first pending batch in the
+  // list, as set by get_batch_index() above.
+  // TODO(roth): What if the LB policy needs to add something to the
+  // call's initial metadata, and then there's a retry? We don't want
+  // the new metadata to be added twice. We might need to somehow
+  // allocate the subchannel batch earlier so that we can give the
+  // subchannel's copy of the metadata batch (which is copied for each
+  // attempt) to the LB policy instead of the one from the parent channel.
+  grpc_metadata_batch* initial_metadata =
+      calld->seen_send_initial_metadata
+          ? &calld->send_initial_metadata
+          : calld->pending_batches[0]
+                .batch->payload->send_initial_metadata.send_initial_metadata;
+  uint32_t* initial_metadata_flags =
+      calld->seen_send_initial_metadata
+          ?
&calld->send_initial_metadata_flags + : &calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, + grpc_schedule_on_exec_ctx); + calld->request.Init(calld->owning_call, calld->call_combiner, calld->pollent, + initial_metadata, initial_metadata_flags, + maybe_apply_service_config_to_call_locked, elem, + &calld->pick_closure); + calld->have_request = true; + chand->request_router->RouteCallLocked(calld->request.get()); } // @@ -3249,23 +2548,10 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { - channel_data* chand = static_cast(arg); - if (chand->lb_policy != nullptr) { - chand->lb_policy->ExitIdleLocked(); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != nullptr) { - start_resolving_locked(chand); - } - } - GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); -} - void grpc_client_channel_set_channelz_node( grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) { channel_data* chand = static_cast(elem->channel_data); - chand->channelz_channel = node; + chand->request_router->set_channelz_node(node); } void grpc_client_channel_populate_child_refs( @@ -3273,17 +2559,22 @@ void grpc_client_channel_populate_child_refs( grpc_core::channelz::ChildRefsList* child_subchannels, grpc_core::channelz::ChildRefsList* child_channels) { channel_data* chand = static_cast(elem->channel_data); - if (chand->lb_policy != nullptr) { - chand->lb_policy->FillChildRefsForChannelz(child_subchannels, - child_channels); + if (chand->request_router->lb_policy() != nullptr) { + chand->request_router->lb_policy()->FillChildRefsForChannelz( + child_subchannels, child_channels); } } +static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { + channel_data* chand = static_cast(arg); + chand->request_router->ExitIdleLocked(); + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_channel_element* elem, int try_to_connect) { channel_data* chand = static_cast(elem->channel_data); - grpc_connectivity_state out = - grpc_connectivity_state_check(&chand->state_tracker); + grpc_connectivity_state out = chand->request_router->GetConnectivityState(); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); GRPC_CLOSURE_SCHED( @@ -3328,19 +2619,19 @@ static void external_connectivity_watcher_list_append( } static void external_connectivity_watcher_list_remove( - channel_data* chand, external_connectivity_watcher* too_remove) { + channel_data* chand, external_connectivity_watcher* to_remove) { GPR_ASSERT( - lookup_external_connectivity_watcher(chand, too_remove->on_complete)); + lookup_external_connectivity_watcher(chand, to_remove->on_complete)); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); - if (too_remove == chand->external_connectivity_watcher_list_head) { - chand->external_connectivity_watcher_list_head = too_remove->next; + if (to_remove == chand->external_connectivity_watcher_list_head) { + chand->external_connectivity_watcher_list_head = to_remove->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; } external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != nullptr) { - if (w->next == 
too_remove) { + if (w->next == to_remove) { w->next = w->next->next; gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; @@ -3392,15 +2683,15 @@ static void watch_connectivity_state_locked(void* arg, GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, grpc_combiner_scheduler(w->chand->combiner)); - grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker, - w->state, &w->my_closure); + w->chand->request_router->NotifyOnConnectivityStateChange(w->state, + &w->my_closure); } else { GPR_ASSERT(w->watcher_timer_init == nullptr); found = lookup_external_connectivity_watcher(w->chand, w->on_complete); if (found) { GPR_ASSERT(found->on_complete == w->on_complete); - grpc_connectivity_state_notify_on_state_change( - &found->chand->state_tracker, nullptr, &found->my_closure); + found->chand->request_router->NotifyOnConnectivityStateChange( + nullptr, &found->my_closure); } grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 29b8c28be6..293d8e960c 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -65,10 +65,10 @@ class LoadBalancingPolicy : public InternallyRefCounted { struct PickState { /// Initial metadata associated with the picking call. grpc_metadata_batch* initial_metadata = nullptr; - /// Bitmask used for selective cancelling. See + /// Pointer to bitmask used for selective cancelling. See /// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in /// grpc_types.h. - uint32_t initial_metadata_flags = 0; + uint32_t* initial_metadata_flags = nullptr; /// Storage for LB token in \a initial_metadata, or nullptr if not used. grpc_linked_mdelem lb_token_mdelem_storage; /// Closure to run when pick is complete, if not completed synchronously. @@ -88,6 +88,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { LoadBalancingPolicy(const LoadBalancingPolicy&) = delete; LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete; + /// Returns the name of the LB policy. + virtual const char* name() const GRPC_ABSTRACT; + /// Updates the policy with a new set of \a args and a new \a lb_config from /// the resolver. Note that the LB policy gets the set of addresses from the /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg. 
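The lb_policy.h hunk above turns PickState::initial_metadata_flags from a copied bitmask into a pointer to the call's flags word, and the grpclb, pick_first, round_robin, and xds hunks below dereference that pointer when matching pending picks against a cancellation mask. The indirection matters because a request can now be routed before the service config has been applied; if the config later flips the call's wait_for_ready bit, a queued pick and any CancelMatchingPicksLocked() sweep see the current value rather than a stale snapshot. A minimal standalone sketch of that aliasing, with an illustrative constant standing in for GRPC_INITIAL_METADATA_WAIT_FOR_READY (not part of the patch):

#include <cstdint>
#include <cstdio>

// Illustrative stand-in for GRPC_INITIAL_METADATA_WAIT_FOR_READY.
constexpr uint32_t kWaitForReady = 0x20;

struct PickState {
  // Aliases the call's flags word instead of holding a snapshot.
  uint32_t* initial_metadata_flags;
};

int main() {
  uint32_t call_flags = 0;      // pick is queued before the service config runs
  PickState pick{&call_flags};
  call_flags |= kWaitForReady;  // service config later sets wait_for_ready
  // A cancellation sweep reads through the pointer and sees the update:
  unsigned visible = (*pick.initial_metadata_flags & kWaitForReady) ? 1u : 0u;
  std::printf("wait_for_ready visible to pending pick: %u\n", visible);
  return 0;
}

The trade-off is lifetime coupling: the flags word must outlive the pick, which holds here because both live in the call's own call_data.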
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 3c4f0d6552..ba40febd53 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -122,10 +122,14 @@ TraceFlag grpc_lb_glb_trace(false, "glb"); namespace { +constexpr char kGrpclb[] = "grpclb"; + class GrpcLb : public LoadBalancingPolicy { public: explicit GrpcLb(const Args& args); + const char* name() const override { return kGrpclb; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -1136,7 +1140,7 @@ void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pp != nullptr) { PendingPick* next = pp->next; - if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, @@ -1819,7 +1823,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { return OrphanablePtr(New(args)); } - const char* name() const override { return "grpclb"; } + const char* name() const override { return kGrpclb; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 74c17612a2..d6ff74ec7f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -43,10 +43,14 @@ namespace { // pick_first LB policy // +constexpr char kPickFirst[] = "pick_first"; + class PickFirst : public LoadBalancingPolicy { public: explicit PickFirst(const Args& args); + const char* name() const override { return kPickFirst; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -234,7 +238,7 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pick != nullptr) { PickState* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -622,7 +626,7 @@ class PickFirstFactory : public LoadBalancingPolicyFactory { return OrphanablePtr(New(args)); } - const char* name() const override { return "pick_first"; } + const char* name() const override { return kPickFirst; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 63089afbd7..3bcb33ef11 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -53,10 +53,14 @@ namespace { // round_robin LB policy // +constexpr char kRoundRobin[] = "round_robin"; + class RoundRobin : public LoadBalancingPolicy { public: explicit RoundRobin(const Args& args); + const char* name() const override { return kRoundRobin; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool 
PickLocked(PickState* pick, grpc_error** error) override; @@ -291,7 +295,7 @@ void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pick != nullptr) { PickState* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, @@ -700,7 +704,7 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory { return OrphanablePtr(New(args)); } - const char* name() const override { return "round_robin"; } + const char* name() const override { return kRoundRobin; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 3c25de2386..8787f5bcc2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -115,10 +115,14 @@ TraceFlag grpc_lb_xds_trace(false, "xds"); namespace { +constexpr char kXds[] = "xds_experimental"; + class XdsLb : public LoadBalancingPolicy { public: explicit XdsLb(const Args& args); + const char* name() const override { return kXds; } + void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; bool PickLocked(PickState* pick, grpc_error** error) override; @@ -1053,7 +1057,7 @@ void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, pending_picks_ = nullptr; while (pp != nullptr) { PendingPick* next = pp->next; - if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + if ((*pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, @@ -1651,7 +1655,7 @@ class XdsFactory : public LoadBalancingPolicyFactory { return OrphanablePtr(New(args)); } - const char* name() const override { return "xds_experimental"; } + const char* name() const override { return kXds; } }; } // namespace diff --git a/src/core/ext/filters/client_channel/request_routing.cc b/src/core/ext/filters/client_channel/request_routing.cc new file mode 100644 index 0000000000..f9a7e164e7 --- /dev/null +++ b/src/core/ext/filters/client_channel/request_routing.cc @@ -0,0 +1,936 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/request_routing.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/filters/client_channel/retry_throttle.h"
+#include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/deadline/deadline_filter.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/status_util.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/service_config.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/status_metadata.h"
+
+namespace grpc_core {
+
+//
+// RequestRouter::Request::ResolverResultWaiter
+//
+
+// Handles waiting for a resolver result.
+// Used only for the first call on an idle channel.
+class RequestRouter::Request::ResolverResultWaiter {
+ public:
+  explicit ResolverResultWaiter(Request* request)
+      : request_router_(request->request_router_),
+        request_(request),
+        tracer_enabled_(request_router_->tracer_->enabled()) {
+    if (tracer_enabled_) {
+      gpr_log(GPR_INFO,
+              "request_router=%p request=%p: deferring pick pending resolver "
+              "result",
+              request_router_, request);
+    }
+    // Add closure to be run when a resolver result is available.
+    GRPC_CLOSURE_INIT(&done_closure_, &DoneLocked, this,
+                      grpc_combiner_scheduler(request_router_->combiner_));
+    AddToWaitingList();
+    // Set cancellation closure, so that we abort if the call is cancelled.
+    GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this,
+                      grpc_combiner_scheduler(request_router_->combiner_));
+    grpc_call_combiner_set_notify_on_cancel(request->call_combiner_,
+                                            &cancel_closure_);
+  }
+
+ private:
+  // Adds done_closure_ to
+  // request_router_->waiting_for_resolver_result_closures_.
+  void AddToWaitingList() {
+    grpc_closure_list_append(
+        &request_router_->waiting_for_resolver_result_closures_,
+        &done_closure_, GRPC_ERROR_NONE);
+  }
+
+  // Invoked when a resolver result is available.
+  static void DoneLocked(void* arg, grpc_error* error) {
+    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
+    RequestRouter* request_router = self->request_router_;
+    // If CancelLocked() has already run, delete ourselves without doing
+    // anything. Note that the call stack may have already been destroyed,
+    // so it's not safe to access anything in request_.
+ if (GPR_UNLIKELY(self->finished_)) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p: call cancelled before resolver result", + request_router); + } + Delete(self); + return; + } + // Otherwise, process the resolver result. + Request* request = self->request_; + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver failed to return data", + request_router, request); + } + GRPC_CLOSURE_RUN(request->on_route_done_, GRPC_ERROR_REF(error)); + } else if (GPR_UNLIKELY(request_router->resolver_ == nullptr)) { + // Shutting down. + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, "request_router=%p request=%p: resolver disconnected", + request_router, request); + } + GRPC_CLOSURE_RUN(request->on_route_done_, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else if (GPR_UNLIKELY(request_router->lb_policy_ == nullptr)) { + // Transient resolver failure. + // If call has wait_for_ready=true, try again; otherwise, fail. + if (*request->pick_.initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver returned but no LB " + "policy; wait_for_ready=true; trying again", + request_router, request); + } + // Re-add ourselves to the waiting list. + self->AddToWaitingList(); + // Return early so that we don't set finished_ to true below. + return; + } else { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver returned but no LB " + "policy; wait_for_ready=false; failing", + request_router, request); + } + GRPC_CLOSURE_RUN( + request->on_route_done_, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } + } else { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: resolver returned, doing LB " + "pick", + request_router, request); + } + request->ProcessServiceConfigAndStartLbPickLocked(); + } + self->finished_ = true; + } + + // Invoked when the call is cancelled. + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + ResolverResultWaiter* self = static_cast(arg); + RequestRouter* request_router = self->request_router_; + // If DoneLocked() has already run, delete ourselves without doing anything. + if (self->finished_) { + Delete(self); + return; + } + Request* request = self->request_; + // If we are being cancelled, immediately invoke on_route_done_ + // to propagate the error back to the caller. + if (error != GRPC_ERROR_NONE) { + if (self->tracer_enabled_) { + gpr_log(GPR_INFO, + "request_router=%p request=%p: cancelling call waiting for " + "name resolution", + request_router, request); + } + // Note: Although we are not in the call combiner here, we are + // basically stealing the call combiner from the pending pick, so + // it's safe to run on_route_done_ here -- we are essentially + // calling it here instead of calling it in DoneLocked(). 
+ GRPC_CLOSURE_RUN(request->on_route_done_, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); + } + self->finished_ = true; + } + + RequestRouter* request_router_; + Request* request_; + const bool tracer_enabled_; + grpc_closure done_closure_; + grpc_closure cancel_closure_; + bool finished_ = false; +}; + +// +// RequestRouter::Request::AsyncPickCanceller +// + +// Handles the call combiner cancellation callback for an async LB pick. +class RequestRouter::Request::AsyncPickCanceller { + public: + explicit AsyncPickCanceller(Request* request) + : request_router_(request->request_router_), + request_(request), + tracer_enabled_(request_router_->tracer_->enabled()) { + GRPC_CALL_STACK_REF(request->owning_call_, "pick_callback_cancel"); + // Set cancellation closure, so that we abort if the call is cancelled. + GRPC_CLOSURE_INIT(&cancel_closure_, &CancelLocked, this, + grpc_combiner_scheduler(request_router_->combiner_)); + grpc_call_combiner_set_notify_on_cancel(request->call_combiner_, + &cancel_closure_); + } + + void MarkFinishedLocked() { + finished_ = true; + GRPC_CALL_STACK_UNREF(request_->owning_call_, "pick_callback_cancel"); + } + + private: + // Invoked when the call is cancelled. + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + AsyncPickCanceller* self = static_cast(arg); + Request* request = self->request_; + RequestRouter* request_router = self->request_router_; + if (!self->finished_) { + // Note: request_router->lb_policy_ may have changed since we started our + // pick, in which case we will be cancelling the pick on a policy other + // than the one we started it on. However, this will just be a no-op. 
+      if (error != GRPC_ERROR_NONE && request_router->lb_policy_ != nullptr) {
+        if (self->tracer_enabled_) {
+          gpr_log(GPR_INFO,
+                  "request_router=%p request=%p: cancelling pick from LB "
+                  "policy %p",
+                  request_router, request, request_router->lb_policy_.get());
+        }
+        request_router->lb_policy_->CancelPickLocked(&request->pick_,
+                                                     GRPC_ERROR_REF(error));
+      }
+      request->pick_canceller_ = nullptr;
+      GRPC_CALL_STACK_UNREF(request->owning_call_, "pick_callback_cancel");
+    }
+    Delete(self);
+  }
+
+  RequestRouter* request_router_;
+  Request* request_;
+  const bool tracer_enabled_;
+  grpc_closure cancel_closure_;
+  bool finished_ = false;
+};
+
+//
+// RequestRouter::Request
+//
+
+RequestRouter::Request::Request(grpc_call_stack* owning_call,
+                                grpc_call_combiner* call_combiner,
+                                grpc_polling_entity* pollent,
+                                grpc_metadata_batch* send_initial_metadata,
+                                uint32_t* send_initial_metadata_flags,
+                                ApplyServiceConfigCallback apply_service_config,
+                                void* apply_service_config_user_data,
+                                grpc_closure* on_route_done)
+    : owning_call_(owning_call),
+      call_combiner_(call_combiner),
+      pollent_(pollent),
+      apply_service_config_(apply_service_config),
+      apply_service_config_user_data_(apply_service_config_user_data),
+      on_route_done_(on_route_done) {
+  pick_.initial_metadata = send_initial_metadata;
+  pick_.initial_metadata_flags = send_initial_metadata_flags;
+}
+
+RequestRouter::Request::~Request() {
+  if (pick_.connected_subchannel != nullptr) {
+    pick_.connected_subchannel.reset();
+  }
+  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
+    if (pick_.subchannel_call_context[i].destroy != nullptr) {
+      pick_.subchannel_call_context[i].destroy(
+          pick_.subchannel_call_context[i].value);
+    }
+  }
+}
+
+// Invoked once resolver results are available.
+void RequestRouter::Request::ProcessServiceConfigAndStartLbPickLocked() {
+  // Get service config data if needed.
+  if (!apply_service_config_(apply_service_config_user_data_)) return;
+  // Start LB pick.
+  StartLbPickLocked();
+}
+
+void RequestRouter::Request::MaybeAddCallToInterestedPartiesLocked() {
+  if (!pollent_added_to_interested_parties_) {
+    pollent_added_to_interested_parties_ = true;
+    grpc_polling_entity_add_to_pollset_set(
+        pollent_, request_router_->interested_parties_);
+  }
+}
+
+void RequestRouter::Request::MaybeRemoveCallFromInterestedPartiesLocked() {
+  if (pollent_added_to_interested_parties_) {
+    pollent_added_to_interested_parties_ = false;
+    grpc_polling_entity_del_from_pollset_set(
+        pollent_, request_router_->interested_parties_);
+  }
+}
+
+// Starts a pick on the LB policy.
+void RequestRouter::Request::StartLbPickLocked() {
+  if (request_router_->tracer_->enabled()) {
+    gpr_log(GPR_INFO,
+            "request_router=%p request=%p: starting pick on lb_policy=%p",
+            request_router_, this, request_router_->lb_policy_.get());
+  }
+  GRPC_CLOSURE_INIT(&on_pick_done_, &LbPickDoneLocked, this,
+                    grpc_combiner_scheduler(request_router_->combiner_));
+  pick_.on_complete = &on_pick_done_;
+  GRPC_CALL_STACK_REF(owning_call_, "pick_callback");
+  grpc_error* error = GRPC_ERROR_NONE;
+  const bool pick_done =
+      request_router_->lb_policy_->PickLocked(&pick_, &error);
+  if (pick_done) {
+    // Pick completed synchronously.
+    if (request_router_->tracer_->enabled()) {
+      gpr_log(GPR_INFO,
+              "request_router=%p request=%p: pick completed synchronously",
+              request_router_, this);
+    }
+    GRPC_CLOSURE_RUN(on_route_done_, error);
+    GRPC_CALL_STACK_UNREF(owning_call_, "pick_callback");
+  } else {
+    // Pick will be returned asynchronously.
+    // Add the request's polling entity to the request_router's
+    // interested_parties, so that the I/O of the LB policy can be done
+    // under it. It will be removed in LbPickDoneLocked().
+    MaybeAddCallToInterestedPartiesLocked();
+    // Request notification on call cancellation.
+    // We allocate a separate object to track cancellation, since the
+    // cancellation closure might still be pending when we need to reuse
+    // the memory in which this Request object is stored for a subsequent
+    // retry attempt.
+    pick_canceller_ = New<AsyncPickCanceller>(this);
+  }
+}
+
+// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
+// Unrefs the LB policy and invokes on_route_done_.
+void RequestRouter::Request::LbPickDoneLocked(void* arg, grpc_error* error) {
+  Request* self = static_cast<Request*>(arg);
+  RequestRouter* request_router = self->request_router_;
+  if (request_router->tracer_->enabled()) {
+    gpr_log(GPR_INFO,
+            "request_router=%p request=%p: pick completed asynchronously",
+            request_router, self);
+  }
+  self->MaybeRemoveCallFromInterestedPartiesLocked();
+  if (self->pick_canceller_ != nullptr) {
+    self->pick_canceller_->MarkFinishedLocked();
+  }
+  GRPC_CLOSURE_RUN(self->on_route_done_, GRPC_ERROR_REF(error));
+  GRPC_CALL_STACK_UNREF(self->owning_call_, "pick_callback");
+}
+
+//
+// RequestRouter::LbConnectivityWatcher
+//
+
+class RequestRouter::LbConnectivityWatcher {
+ public:
+  LbConnectivityWatcher(RequestRouter* request_router,
+                        grpc_connectivity_state state,
+                        LoadBalancingPolicy* lb_policy,
+                        grpc_channel_stack* owning_stack,
+                        grpc_combiner* combiner)
+      : request_router_(request_router),
+        state_(state),
+        lb_policy_(lb_policy),
+        owning_stack_(owning_stack) {
+    GRPC_CHANNEL_STACK_REF(owning_stack_, "LbConnectivityWatcher");
+    GRPC_CLOSURE_INIT(&on_changed_, &OnLbPolicyStateChangedLocked, this,
+                      grpc_combiner_scheduler(combiner));
+    lb_policy_->NotifyOnStateChangeLocked(&state_, &on_changed_);
+  }
+
+  ~LbConnectivityWatcher() {
+    GRPC_CHANNEL_STACK_UNREF(owning_stack_, "LbConnectivityWatcher");
+  }
+
+ private:
+  static void OnLbPolicyStateChangedLocked(void* arg, grpc_error* error) {
+    LbConnectivityWatcher* self = static_cast<LbConnectivityWatcher*>(arg);
+    // If the notification is not for the current policy, we're stale,
+    // so delete ourselves.
+    if (self->lb_policy_ != self->request_router_->lb_policy_.get()) {
+      Delete(self);
+      return;
+    }
+    // Otherwise, process notification.
+    if (self->request_router_->tracer_->enabled()) {
+      gpr_log(GPR_INFO, "request_router=%p: lb_policy=%p state changed to %s",
+              self->request_router_, self->lb_policy_,
+              grpc_connectivity_state_name(self->state_));
+    }
+    self->request_router_->SetConnectivityStateLocked(
+        self->state_, GRPC_ERROR_REF(error), "lb_changed");
+    // If shutting down, terminate watch.
+    if (self->state_ == GRPC_CHANNEL_SHUTDOWN) {
+      Delete(self);
+      return;
+    }
+    // Renew watch.
+    self->lb_policy_->NotifyOnStateChangeLocked(&self->state_,
+                                                &self->on_changed_);
+  }
+
+  RequestRouter* request_router_;
+  grpc_connectivity_state state_;
+  // LB policy address. No ref held, so not safe to dereference unless
+  // it happens to match request_router->lb_policy_.
+  LoadBalancingPolicy* lb_policy_;
+  grpc_channel_stack* owning_stack_;
+  grpc_closure on_changed_;
+};
+
+//
+// RequestRouter::ReresolutionRequestHandler
+//
+
+class RequestRouter::ReresolutionRequestHandler {
+ public:
+  ReresolutionRequestHandler(RequestRouter* request_router,
+                             LoadBalancingPolicy* lb_policy,
+                             grpc_channel_stack* owning_stack,
+                             grpc_combiner* combiner)
+      : request_router_(request_router),
+        lb_policy_(lb_policy),
+        owning_stack_(owning_stack) {
+    GRPC_CHANNEL_STACK_REF(owning_stack_, "ReresolutionRequestHandler");
+    GRPC_CLOSURE_INIT(&closure_, &OnRequestReresolutionLocked, this,
+                      grpc_combiner_scheduler(combiner));
+    lb_policy_->SetReresolutionClosureLocked(&closure_);
+  }
+
+ private:
+  static void OnRequestReresolutionLocked(void* arg, grpc_error* error) {
+    ReresolutionRequestHandler* self =
+        static_cast<ReresolutionRequestHandler*>(arg);
+    RequestRouter* request_router = self->request_router_;
+    // If this invocation is for a stale LB policy, treat it as an LB shutdown
+    // signal.
+    if (self->lb_policy_ != request_router->lb_policy_.get() ||
+        error != GRPC_ERROR_NONE || request_router->resolver_ == nullptr) {
+      GRPC_CHANNEL_STACK_UNREF(request_router->owning_stack_,
+                               "ReresolutionRequestHandler");
+      Delete(self);
+      return;
+    }
+    if (request_router->tracer_->enabled()) {
+      gpr_log(GPR_INFO, "request_router=%p: started name re-resolving",
+              request_router);
+    }
+    request_router->resolver_->RequestReresolutionLocked();
+    // Give back the closure to the LB policy.
+    self->lb_policy_->SetReresolutionClosureLocked(&self->closure_);
+  }
+
+  RequestRouter* request_router_;
+  // LB policy address. No ref held, so not safe to dereference unless
+  // it happens to match request_router->lb_policy_.
+  LoadBalancingPolicy* lb_policy_;
+  grpc_channel_stack* owning_stack_;
+  grpc_closure closure_;
+};
+
+//
+// RequestRouter
+//
+
+RequestRouter::RequestRouter(
+    grpc_channel_stack* owning_stack, grpc_combiner* combiner,
+    grpc_client_channel_factory* client_channel_factory,
+    grpc_pollset_set* interested_parties, TraceFlag* tracer,
+    ProcessResolverResultCallback process_resolver_result,
+    void* process_resolver_result_user_data, const char* target_uri,
+    const grpc_channel_args* args, grpc_error** error)
+    : owning_stack_(owning_stack),
+      combiner_(combiner),
+      client_channel_factory_(client_channel_factory),
+      interested_parties_(interested_parties),
+      tracer_(tracer),
+      process_resolver_result_(process_resolver_result),
+      process_resolver_result_user_data_(process_resolver_result_user_data) {
+  GRPC_CLOSURE_INIT(&on_resolver_result_changed_,
+                    &RequestRouter::OnResolverResultChangedLocked, this,
+                    grpc_combiner_scheduler(combiner));
+  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
+                               "request_router");
+  grpc_channel_args* new_args = nullptr;
+  if (process_resolver_result == nullptr) {
+    grpc_arg arg = grpc_channel_arg_integer_create(
+        const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION), 0);
+    new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+  }
+  resolver_ = ResolverRegistry::CreateResolver(
+      target_uri, (new_args == nullptr ? args : new_args),
+      interested_parties_, combiner_);
+  grpc_channel_args_destroy(new_args);
+  if (resolver_ == nullptr) {
+    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
+  }
+}
+
+RequestRouter::~RequestRouter() {
+  if (resolver_ != nullptr) {
+    // The only way we can get here is if we never started resolving,
+    // because we take a ref to the channel stack when we start
+    // resolving and do not release it until the resolver callback is
+    // invoked after the resolver shuts down.
+    resolver_.reset();
+  }
+  if (lb_policy_ != nullptr) {
+    grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+                                     interested_parties_);
+    lb_policy_.reset();
+  }
+  if (client_channel_factory_ != nullptr) {
+    grpc_client_channel_factory_unref(client_channel_factory_);
+  }
+  grpc_connectivity_state_destroy(&state_tracker_);
+}
+
+namespace {
+
+const char* GetChannelConnectivityStateChangeString(
+    grpc_connectivity_state state) {
+  switch (state) {
+    case GRPC_CHANNEL_IDLE:
+      return "Channel state change to IDLE";
+    case GRPC_CHANNEL_CONNECTING:
+      return "Channel state change to CONNECTING";
+    case GRPC_CHANNEL_READY:
+      return "Channel state change to READY";
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
+      return "Channel state change to TRANSIENT_FAILURE";
+    case GRPC_CHANNEL_SHUTDOWN:
+      return "Channel state change to SHUTDOWN";
+  }
+  GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+}  // namespace
+
+void RequestRouter::SetConnectivityStateLocked(grpc_connectivity_state state,
+                                               grpc_error* error,
+                                               const char* reason) {
+  if (lb_policy_ != nullptr) {
+    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+      // Cancel picks with wait_for_ready=false.
+      lb_policy_->CancelMatchingPicksLocked(
+          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+          /* check= */ 0, GRPC_ERROR_REF(error));
+    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
+      // Cancel all picks.
+      lb_policy_->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
+                                            GRPC_ERROR_REF(error));
+    }
+  }
+  if (tracer_->enabled()) {
+    gpr_log(GPR_INFO, "request_router=%p: setting connectivity state to %s",
+            this, grpc_connectivity_state_name(state));
+  }
+  if (channelz_node_ != nullptr) {
+    channelz_node_->AddTraceEvent(
+        channelz::ChannelTrace::Severity::Info,
+        grpc_slice_from_static_string(
+            GetChannelConnectivityStateChangeString(state)));
+  }
+  grpc_connectivity_state_set(&state_tracker_, state, error, reason);
+}
+
+void RequestRouter::StartResolvingLocked() {
+  if (tracer_->enabled()) {
+    gpr_log(GPR_INFO, "request_router=%p: starting name resolution", this);
+  }
+  GPR_ASSERT(!started_resolving_);
+  started_resolving_ = true;
+  GRPC_CHANNEL_STACK_REF(owning_stack_, "resolver");
+  resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_);
+}
+
+// Invoked from the resolver NextLocked() callback when the resolver
+// is shutting down.
+void RequestRouter::OnResolverShutdownLocked(grpc_error* error) {
+  if (tracer_->enabled()) {
+    gpr_log(GPR_INFO, "request_router=%p: shutting down", this);
+  }
+  if (lb_policy_ != nullptr) {
+    if (tracer_->enabled()) {
+      gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this,
+              lb_policy_.get());
+    }
+    grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+                                     interested_parties_);
+    lb_policy_.reset();
+  }
+  if (resolver_ != nullptr) {
+    // This should never happen; it can only be triggered by a resolver
+    // implementation spontaneously deciding to report shutdown without
+    // being orphaned. This code is included just to be defensive.
+    if (tracer_->enabled()) {
+      gpr_log(GPR_INFO,
+              "request_router=%p: spontaneous shutdown from resolver %p", this,
+              resolver_.get());
+    }
+    resolver_.reset();
+    SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN,
+                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                                   "Resolver spontaneous shutdown", &error, 1),
+                               "resolver_spontaneous_shutdown");
+  }
+  grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
+                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                                 "Channel disconnected", &error, 1));
+  GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
+  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "resolver");
+  grpc_channel_args_destroy(resolver_result_);
+  resolver_result_ = nullptr;
+  GRPC_ERROR_UNREF(error);
+}
+
+// Creates a new LB policy, replacing any previous one.
+// If the new policy is created successfully, sets *connectivity_state and
+// *connectivity_error to its initial connectivity state; otherwise,
+// leaves them unchanged.
+void RequestRouter::CreateNewLbPolicyLocked(
+    const char* lb_policy_name, grpc_json* lb_config,
+    grpc_connectivity_state* connectivity_state,
+    grpc_error** connectivity_error, TraceStringVector* trace_strings) {
+  LoadBalancingPolicy::Args lb_policy_args;
+  lb_policy_args.combiner = combiner_;
+  lb_policy_args.client_channel_factory = client_channel_factory_;
+  lb_policy_args.args = resolver_result_;
+  lb_policy_args.lb_config = lb_config;
+  OrphanablePtr<LoadBalancingPolicy> new_lb_policy =
+      LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(lb_policy_name,
+                                                             lb_policy_args);
+  if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
+    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
+    if (channelz_node_ != nullptr) {
+      char* str;
+      gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
+      trace_strings->push_back(str);
+    }
+  } else {
+    if (tracer_->enabled()) {
+      gpr_log(GPR_INFO, "request_router=%p: created new LB policy \"%s\" (%p)",
+              this, lb_policy_name, new_lb_policy.get());
+    }
+    if (channelz_node_ != nullptr) {
+      char* str;
+      gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
+      trace_strings->push_back(str);
+    }
+    // Swap out the LB policy and update the fds in interested_parties_.
+    if (lb_policy_ != nullptr) {
+      if (tracer_->enabled()) {
+        gpr_log(GPR_INFO, "request_router=%p: shutting down lb_policy=%p", this,
+                lb_policy_.get());
+      }
+      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+                                       interested_parties_);
+      lb_policy_->HandOffPendingPicksLocked(new_lb_policy.get());
+    }
+    lb_policy_ = std::move(new_lb_policy);
+    grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(),
+                                     interested_parties_);
+    // Create re-resolution request handler for the new LB policy. It
+    // will delete itself when no longer needed.
+    New<ReresolutionRequestHandler>(this, lb_policy_.get(), owning_stack_,
+                                    combiner_);
+    // Get the new LB policy's initial connectivity state and start a
+    // connectivity watch.
+    GRPC_ERROR_UNREF(*connectivity_error);
+    *connectivity_state =
+        lb_policy_->CheckConnectivityLocked(connectivity_error);
+    if (exit_idle_when_lb_policy_arrives_) {
+      lb_policy_->ExitIdleLocked();
+      exit_idle_when_lb_policy_arrives_ = false;
+    }
+    // Create new watcher. It will delete itself when done.
+    New<LbConnectivityWatcher>(this, *connectivity_state, lb_policy_.get(),
+                               owning_stack_, combiner_);
+  }
+}
+
+void RequestRouter::MaybeAddTraceMessagesForAddressChangesLocked(
+    TraceStringVector* trace_strings) {
+  const ServerAddressList* addresses =
+      FindServerAddressListChannelArg(resolver_result_);
+  const bool resolution_contains_addresses =
+      addresses != nullptr && addresses->size() > 0;
+  if (!resolution_contains_addresses &&
+      previous_resolution_contained_addresses_) {
+    trace_strings->push_back(gpr_strdup("Address list became empty"));
+  } else if (resolution_contains_addresses &&
+             !previous_resolution_contained_addresses_) {
+    trace_strings->push_back(gpr_strdup("Address list became non-empty"));
+  }
+  previous_resolution_contained_addresses_ = resolution_contains_addresses;
+}
+
+void RequestRouter::ConcatenateAndAddChannelTraceLocked(
+    TraceStringVector* trace_strings) const {
+  if (!trace_strings->empty()) {
+    gpr_strvec v;
+    gpr_strvec_init(&v);
+    gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
+    bool is_first = true;
+    for (size_t i = 0; i < trace_strings->size(); ++i) {
+      if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
+      is_first = false;
+      gpr_strvec_add(&v, (*trace_strings)[i]);
+    }
+    char* flat;
+    size_t flat_len = 0;
+    flat = gpr_strvec_flatten(&v, &flat_len);
+    channelz_node_->AddTraceEvent(
+        grpc_core::channelz::ChannelTrace::Severity::Info,
+        grpc_slice_new(flat, flat_len, gpr_free));
+    gpr_strvec_destroy(&v);
+  }
+}
+
+// Callback invoked when a resolver result is available.
+void RequestRouter::OnResolverResultChangedLocked(void* arg,
+                                                  grpc_error* error) {
+  RequestRouter* self = static_cast<RequestRouter*>(arg);
+  if (self->tracer_->enabled()) {
+    const char* disposition =
+        self->resolver_result_ != nullptr
+            ? ""
+            : (error == GRPC_ERROR_NONE ? " (transient error)"
+                                        : " (resolver shutdown)");
+    gpr_log(GPR_INFO,
+            "request_router=%p: got resolver result: resolver_result=%p "
+            "error=%s%s",
+            self, self->resolver_result_, grpc_error_string(error),
+            disposition);
+  }
+  // Handle shutdown.
+  if (error != GRPC_ERROR_NONE || self->resolver_ == nullptr) {
+    self->OnResolverShutdownLocked(GRPC_ERROR_REF(error));
+    return;
+  }
+  // Data used to set the channel's connectivity state.
+  bool set_connectivity_state = true;
+  // We only want to trace the address resolution in the following cases:
+  // (a) Address resolution resulted in a service config change.
+  // (b) Address resolution that causes the number of backends to go from
+  //     zero to non-zero.
+  // (c) Address resolution that causes the number of backends to go from
+  //     non-zero to zero.
+  // (d) Address resolution that causes a new LB policy to be created.
+  //
+  // We track a list of strings to eventually be concatenated and traced.
+  TraceStringVector trace_strings;
+  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+  grpc_error* connectivity_error =
+      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
+  // resolver_result_ will be null in the case of a transient
+  // resolution error. In that case, we don't have any new result to
+  // process, which means that we keep using the previous result (if any).
+  if (self->resolver_result_ == nullptr) {
+    if (self->tracer_->enabled()) {
+      gpr_log(GPR_INFO, "request_router=%p: resolver transient failure", self);
+    }
+    // Don't override connectivity state if we already have an LB policy.
+    if (self->lb_policy_ != nullptr) set_connectivity_state = false;
+  } else {
+    // Parse the resolver result.
+    const char* lb_policy_name = nullptr;
+    grpc_json* lb_policy_config = nullptr;
+    const bool service_config_changed = self->process_resolver_result_(
+        self->process_resolver_result_user_data_, *self->resolver_result_,
+        &lb_policy_name, &lb_policy_config);
+    GPR_ASSERT(lb_policy_name != nullptr);
+    // Check to see if we're already using the right LB policy.
+    const bool lb_policy_name_changed =
+        self->lb_policy_ == nullptr ||
+        strcmp(self->lb_policy_->name(), lb_policy_name) != 0;
+    if (self->lb_policy_ != nullptr && !lb_policy_name_changed) {
+      // Continue using the same LB policy. Update with new addresses.
+      if (self->tracer_->enabled()) {
+        gpr_log(GPR_INFO,
+                "request_router=%p: updating existing LB policy \"%s\" (%p)",
+                self, lb_policy_name, self->lb_policy_.get());
+      }
+      self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config);
+      // No need to set the channel's connectivity state; the existing
+      // watch on the LB policy will take care of that.
+      set_connectivity_state = false;
+    } else {
+      // Instantiate new LB policy.
+      self->CreateNewLbPolicyLocked(lb_policy_name, lb_policy_config,
+                                    &connectivity_state, &connectivity_error,
+                                    &trace_strings);
+    }
+    // Add channel trace event.
+    if (self->channelz_node_ != nullptr) {
+      if (service_config_changed) {
+        // TODO(ncteisen): might be worth somehow including a snippet of the
+        // config in the trace, at the risk of bloating the trace logs.
+        trace_strings.push_back(gpr_strdup("Service config changed"));
+      }
+      self->MaybeAddTraceMessagesForAddressChangesLocked(&trace_strings);
+      self->ConcatenateAndAddChannelTraceLocked(&trace_strings);
+    }
+    // Clean up.
+    grpc_channel_args_destroy(self->resolver_result_);
+    self->resolver_result_ = nullptr;
+  }
+  // Set the channel's connectivity state if needed.
+  if (set_connectivity_state) {
+    self->SetConnectivityStateLocked(connectivity_state, connectivity_error,
+                                     "resolver_result");
+  } else {
+    GRPC_ERROR_UNREF(connectivity_error);
+  }
+  // Invoke closures that were waiting for results and renew the watch.
+  GRPC_CLOSURE_LIST_SCHED(&self->waiting_for_resolver_result_closures_);
+  self->resolver_->NextLocked(&self->resolver_result_,
+                              &self->on_resolver_result_changed_);
+}
+
+void RequestRouter::RouteCallLocked(Request* request) {
+  GPR_ASSERT(request->pick_.connected_subchannel == nullptr);
+  request->request_router_ = this;
+  if (lb_policy_ != nullptr) {
+    // We already have resolver results, so process the service config
+    // and start an LB pick.
+    request->ProcessServiceConfigAndStartLbPickLocked();
+  } else if (resolver_ == nullptr) {
+    GRPC_CLOSURE_RUN(request->on_route_done_,
+                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+  } else {
+    // We do not yet have an LB policy, so wait for a resolver result.
+    if (!started_resolving_) {
+      StartResolvingLocked();
+    }
+    // Create a new waiter, which will delete itself when done.
+    New<ResolverResultWaiter>(request);
+    // Add the request's polling entity to the request_router's
+    // interested_parties, so that the I/O of the resolver can be done
+    // under it. It will be removed in LbPickDoneLocked().
+    request->MaybeAddCallToInterestedPartiesLocked();
+  }
+}
+
+void RequestRouter::ShutdownLocked(grpc_error* error) {
+  if (resolver_ != nullptr) {
+    SetConnectivityStateLocked(GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+                               "disconnect");
+    resolver_.reset();
+    if (!started_resolving_) {
+      grpc_closure_list_fail_all(&waiting_for_resolver_result_closures_,
+                                 GRPC_ERROR_REF(error));
+      GRPC_CLOSURE_LIST_SCHED(&waiting_for_resolver_result_closures_);
+    }
+    if (lb_policy_ != nullptr) {
+      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
+                                       interested_parties_);
+      lb_policy_.reset();
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+grpc_connectivity_state RequestRouter::GetConnectivityState() {
+  return grpc_connectivity_state_check(&state_tracker_);
+}
+
+void RequestRouter::NotifyOnConnectivityStateChange(
+    grpc_connectivity_state* state, grpc_closure* closure) {
+  grpc_connectivity_state_notify_on_state_change(&state_tracker_, state,
+                                                 closure);
+}
+
+void RequestRouter::ExitIdleLocked() {
+  if (lb_policy_ != nullptr) {
+    lb_policy_->ExitIdleLocked();
+  } else {
+    exit_idle_when_lb_policy_arrives_ = true;
+    if (!started_resolving_ && resolver_ != nullptr) {
+      StartResolvingLocked();
+    }
+  }
+}
+
+void RequestRouter::ResetConnectionBackoffLocked() {
+  if (resolver_ != nullptr) {
+    resolver_->ResetBackoffLocked();
+    resolver_->RequestReresolutionLocked();
+  }
+  if (lb_policy_ != nullptr) {
+    lb_policy_->ResetBackoffLocked();
+  }
+}
+
+}  // namespace grpc_core
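ResolverResultWaiter and AsyncPickCanceller above both lean on the same idiom: the completion closure and the cancellation closure race under the combiner, and whichever runs second frees the object. The standalone sketch below distills that idiom into plain C++ with no gRPC machinery; every name in it is illustrative, not part of the gRPC API.

#include <cstdio>

// Both callbacks always run exactly once each; a serializing executor
// (standing in for the combiner) guarantees they never run concurrently.
class Waiter {
 public:
  static void Done(void* arg) {
    Waiter* self = static_cast<Waiter*>(arg);
    if (self->finished_) {  // Cancel() already ran; just clean up.
      delete self;
      return;
    }
    std::puts("resolver result delivered");
    self->finished_ = true;  // Cancel() will delete us when it runs.
  }

  static void Cancel(void* arg) {
    Waiter* self = static_cast<Waiter*>(arg);
    if (self->finished_) {  // Done() already ran; just clean up.
      delete self;
      return;
    }
    std::puts("call cancelled while waiting");
    self->finished_ = true;  // Done() will delete us when it runs.
  }

 private:
  bool finished_ = false;
};

int main() {
  void* w = new Waiter();
  Waiter::Done(w);    // First callback to run handles the event...
  Waiter::Cancel(w);  // ...and the second one frees the object.
}

The finished_ flag is the handoff point: neither callback can know whether the other is still queued, so the first to run only marks the work done and leaves deletion to the second.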
diff --git a/src/core/ext/filters/client_channel/request_routing.h b/src/core/ext/filters/client_channel/request_routing.h
new file mode 100644
index 0000000000..0c671229c8
--- /dev/null
+++ b/src/core/ext/filters/client_channel/request_routing.h
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/resolver.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/iomgr/call_combiner.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata_batch.h"
+
+namespace grpc_core {
+
+class RequestRouter {
+ public:
+  class Request {
+   public:
+    // Synchronous callback that applies the service config to a call.
+    // Returns false if the call should be failed.
+    typedef bool (*ApplyServiceConfigCallback)(void* user_data);
+
+    Request(grpc_call_stack* owning_call, grpc_call_combiner* call_combiner,
+            grpc_polling_entity* pollent,
+            grpc_metadata_batch* send_initial_metadata,
+            uint32_t* send_initial_metadata_flags,
+            ApplyServiceConfigCallback apply_service_config,
+            void* apply_service_config_user_data, grpc_closure* on_route_done);
+
+    ~Request();
+
+    // TODO(roth): It seems a bit ugly to expose this member in a
+    // non-const way. Find a better API to avoid this.
+    LoadBalancingPolicy::PickState* pick() { return &pick_; }
+
+   private:
+    friend class RequestRouter;
+
+    class ResolverResultWaiter;
+    class AsyncPickCanceller;
+
+    void ProcessServiceConfigAndStartLbPickLocked();
+    void StartLbPickLocked();
+    static void LbPickDoneLocked(void* arg, grpc_error* error);
+
+    void MaybeAddCallToInterestedPartiesLocked();
+    void MaybeRemoveCallFromInterestedPartiesLocked();
+
+    // Populated by caller.
+    grpc_call_stack* owning_call_;
+    grpc_call_combiner* call_combiner_;
+    grpc_polling_entity* pollent_;
+    ApplyServiceConfigCallback apply_service_config_;
+    void* apply_service_config_user_data_;
+    grpc_closure* on_route_done_;
+    LoadBalancingPolicy::PickState pick_;
+
+    // Internal state.
+    RequestRouter* request_router_ = nullptr;
+    bool pollent_added_to_interested_parties_ = false;
+    grpc_closure on_pick_done_;
+    AsyncPickCanceller* pick_canceller_ = nullptr;
+  };
+
+  // Synchronous callback that takes the service config JSON string and
+  // LB policy name.
+  // Returns true if the service config has changed since the last result.
+  typedef bool (*ProcessResolverResultCallback)(void* user_data,
+                                                const grpc_channel_args& args,
+                                                const char** lb_policy_name,
+                                                grpc_json** lb_policy_config);
+
+  RequestRouter(grpc_channel_stack* owning_stack, grpc_combiner* combiner,
+                grpc_client_channel_factory* client_channel_factory,
+                grpc_pollset_set* interested_parties, TraceFlag* tracer,
+                ProcessResolverResultCallback process_resolver_result,
+                void* process_resolver_result_user_data, const char* target_uri,
+                const grpc_channel_args* args, grpc_error** error);
+
+  ~RequestRouter();
+
+  void set_channelz_node(channelz::ClientChannelNode* channelz_node) {
+    channelz_node_ = channelz_node;
+  }
+
+  void RouteCallLocked(Request* request);
+
+  // TODO(roth): Add methods to cancel picks.
+
+  void ShutdownLocked(grpc_error* error);
+
+  void ExitIdleLocked();
+  void ResetConnectionBackoffLocked();
+
+  grpc_connectivity_state GetConnectivityState();
+  void NotifyOnConnectivityStateChange(grpc_connectivity_state* state,
+                                       grpc_closure* closure);
+
+  LoadBalancingPolicy* lb_policy() const { return lb_policy_.get(); }
+
+ private:
+  using TraceStringVector = grpc_core::InlinedVector<char*, 3>;
+
+  class ReresolutionRequestHandler;
+  class LbConnectivityWatcher;
+
+  void StartResolvingLocked();
+  void OnResolverShutdownLocked(grpc_error* error);
+  void CreateNewLbPolicyLocked(const char* lb_policy_name, grpc_json* lb_config,
+                               grpc_connectivity_state* connectivity_state,
+                               grpc_error** connectivity_error,
+                               TraceStringVector* trace_strings);
+  void MaybeAddTraceMessagesForAddressChangesLocked(
+      TraceStringVector* trace_strings);
+  void ConcatenateAndAddChannelTraceLocked(
+      TraceStringVector* trace_strings) const;
+  static void OnResolverResultChangedLocked(void* arg, grpc_error* error);
+
+  void SetConnectivityStateLocked(grpc_connectivity_state state,
+                                  grpc_error* error, const char* reason);
+
+  // Passed in from caller at construction time.
+  grpc_channel_stack* owning_stack_;
+  grpc_combiner* combiner_;
+  grpc_client_channel_factory* client_channel_factory_;
+  grpc_pollset_set* interested_parties_;
+  TraceFlag* tracer_;
+
+  channelz::ClientChannelNode* channelz_node_ = nullptr;
+
+  // Resolver and associated state.
+  OrphanablePtr<Resolver> resolver_;
+  ProcessResolverResultCallback process_resolver_result_;
+  void* process_resolver_result_user_data_;
+  bool started_resolving_ = false;
+  grpc_channel_args* resolver_result_ = nullptr;
+  bool previous_resolution_contained_addresses_ = false;
+  grpc_closure_list waiting_for_resolver_result_closures_;
+  grpc_closure on_resolver_result_changed_;
+
+  // LB policy and associated state.
+  OrphanablePtr<LoadBalancingPolicy> lb_policy_;
+  bool exit_idle_when_lb_policy_arrives_ = false;
+
+  grpc_connectivity_state_tracker state_tracker_;
+};
+
+}  // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H */
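Both callback typedefs in this header, ApplyServiceConfigCallback and ProcessResolverResultCallback, follow the C-style pairing of a function pointer with an opaque user_data pointer that is threaded back to the callee. Here is a minimal self-contained sketch of that contract; ChannelData, ProcessResult, and the other names are invented for illustration and are not gRPC API.

#include <cstdio>

typedef bool (*ProcessResultCallback)(void* user_data, const char* result,
                                      const char** policy_name);

struct ChannelData {
  const char* default_policy = "pick_first";
};

// A concrete callback: recovers its owner through user_data, reports the
// chosen policy through an out-parameter, and returns whether anything
// changed, mirroring how process_resolver_result_ is consumed above.
static bool ProcessResult(void* user_data, const char* result,
                          const char** policy_name) {
  ChannelData* chand = static_cast<ChannelData*>(user_data);
  *policy_name = chand->default_policy;
  std::printf("processed %s -> %s\n", result, *policy_name);
  return true;  // pretend the configuration changed
}

int main() {
  ChannelData chand;
  // The router stores the (callback, user_data) pair and invokes it on
  // every resolver result.
  ProcessResultCallback cb = &ProcessResult;
  const char* policy = nullptr;
  bool changed = cb(&chand, "resolver-result", &policy);
  std::printf("changed=%d policy=%s\n", changed, policy);
}

The out-parameter plus boolean return mirrors how OnResolverResultChangedLocked() uses process_resolver_result_: the callback always reports an LB policy name, and the return value only says whether the service config changed.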
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
index 22b06db45c..9a0122e8ec 100644
--- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc
@@ -43,16 +43,16 @@ namespace grpc_core {
 namespace internal {
 
 ProcessedResolverResult::ProcessedResolverResult(
-    const grpc_channel_args* resolver_result, bool parse_retry) {
+    const grpc_channel_args& resolver_result, bool parse_retry) {
   ProcessServiceConfig(resolver_result, parse_retry);
   // If no LB config was found above, just find the LB policy name then.
   if (lb_policy_name_ == nullptr) ProcessLbPolicyName(resolver_result);
 }
 
 void ProcessedResolverResult::ProcessServiceConfig(
-    const grpc_channel_args* resolver_result, bool parse_retry) {
+    const grpc_channel_args& resolver_result, bool parse_retry) {
   const grpc_arg* channel_arg =
-      grpc_channel_args_find(resolver_result, GRPC_ARG_SERVICE_CONFIG);
+      grpc_channel_args_find(&resolver_result, GRPC_ARG_SERVICE_CONFIG);
   const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
   if (service_config_json != nullptr) {
     service_config_json_.reset(gpr_strdup(service_config_json));
@@ -60,7 +60,7 @@ void ProcessedResolverResult::ProcessServiceConfig(
     if (service_config_ != nullptr) {
       if (parse_retry) {
         channel_arg =
-            grpc_channel_args_find(resolver_result, GRPC_ARG_SERVER_URI);
+            grpc_channel_args_find(&resolver_result, GRPC_ARG_SERVER_URI);
         const char* server_uri = grpc_channel_arg_get_string(channel_arg);
         GPR_ASSERT(server_uri != nullptr);
         grpc_uri* uri = grpc_uri_parse(server_uri, true);
@@ -78,7 +78,7 @@ void ProcessedResolverResult::ProcessServiceConfig(
 }
 
 void ProcessedResolverResult::ProcessLbPolicyName(
-    const grpc_channel_args* resolver_result) {
+    const grpc_channel_args& resolver_result) {
   // Prefer the LB policy name found in the service config. Note that this is
   // checking the deprecated loadBalancingPolicy field, rather than the new
   // loadBalancingConfig field.
@@ -96,13 +96,13 @@ void ProcessedResolverResult::ProcessLbPolicyName(
   // Otherwise, find the LB policy name set by the client API.
   if (lb_policy_name_ == nullptr) {
     const grpc_arg* channel_arg =
-        grpc_channel_args_find(resolver_result, GRPC_ARG_LB_POLICY_NAME);
+        grpc_channel_args_find(&resolver_result, GRPC_ARG_LB_POLICY_NAME);
     lb_policy_name_.reset(gpr_strdup(grpc_channel_arg_get_string(channel_arg)));
   }
   // Special case: If at least one balancer address is present, we use
   // the grpclb policy, regardless of what the resolver has returned.
   const ServerAddressList* addresses =
-      FindServerAddressListChannelArg(resolver_result);
+      FindServerAddressListChannelArg(&resolver_result);
   if (addresses != nullptr) {
     bool found_balancer_address = false;
     for (size_t i = 0; i < addresses->size(); ++i) {
diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.h b/src/core/ext/filters/client_channel/resolver_result_parsing.h
index f1fb7406bc..98a9d26c46 100644
--- a/src/core/ext/filters/client_channel/resolver_result_parsing.h
+++ b/src/core/ext/filters/client_channel/resolver_result_parsing.h
@@ -36,8 +36,7 @@ namespace internal {
 class ClientChannelMethodParams;
 
 // A table mapping from a method name to its method parameters.
-typedef grpc_core::SliceHashTable<
-    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
+typedef SliceHashTable<RefCountedPtr<ClientChannelMethodParams>>
     ClientChannelMethodParamsTable;
 
 // A container of processed fields from the resolver result. Simplifies the
@@ -47,33 +46,30 @@ class ProcessedResolverResult {
   // Processes the resolver result and populates the relative members
   // for later consumption. Tries to parse retry parameters only if parse_retry
   // is true.
-  ProcessedResolverResult(const grpc_channel_args* resolver_result,
+  ProcessedResolverResult(const grpc_channel_args& resolver_result,
                           bool parse_retry);
 
   // Getters. Any managed object's ownership is transferred.
-  grpc_core::UniquePtr<char> service_config_json() {
+  UniquePtr<char> service_config_json() {
     return std::move(service_config_json_);
   }
-  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() {
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() {
     return std::move(retry_throttle_data_);
   }
-  grpc_core::RefCountedPtr<ClientChannelMethodParamsTable>
-  method_params_table() {
+  RefCountedPtr<ClientChannelMethodParamsTable> method_params_table() {
    return std::move(method_params_table_);
  }
-  grpc_core::UniquePtr<char> lb_policy_name() {
-    return std::move(lb_policy_name_);
-  }
+  UniquePtr<char> lb_policy_name() { return std::move(lb_policy_name_); }
   grpc_json* lb_policy_config() { return lb_policy_config_; }
 
 private:
  // Finds the service config; extracts LB config and (maybe) retry throttle
  // params from it.
-  void ProcessServiceConfig(const grpc_channel_args* resolver_result,
+  void ProcessServiceConfig(const grpc_channel_args& resolver_result,
                             bool parse_retry);
  // Finds the LB policy name (when no LB config was found).
-  void ProcessLbPolicyName(const grpc_channel_args* resolver_result);
+  void ProcessLbPolicyName(const grpc_channel_args& resolver_result);
 
  // Parses the service config. Intended to be used by
  // ServiceConfig::ParseGlobalParams.
@@ -85,16 +81,16 @@ class ProcessedResolverResult {
  void ParseRetryThrottleParamsFromServiceConfig(const grpc_json* field);
 
  // Service config.
-  grpc_core::UniquePtr<char> service_config_json_;
-  grpc_core::UniquePtr<ServiceConfig> service_config_;
+  UniquePtr<char> service_config_json_;
+  UniquePtr<ServiceConfig> service_config_;
 
  // LB policy.
  grpc_json* lb_policy_config_ = nullptr;
-  grpc_core::UniquePtr<char> lb_policy_name_;
+  UniquePtr<char> lb_policy_name_;
 
  // Retry throttle data.
  char* server_name_ = nullptr;
-  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
 
  // Method params table.
-  grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
+  RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
 };
 
 // The parameters of a method.
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index c6ca970bee..6a1fd676ca 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -326,6 +326,7 @@ CORE_SOURCE_FILES = [
     'src/core/ext/filters/client_channel/parse_address.cc',
     'src/core/ext/filters/client_channel/proxy_mapper.cc',
     'src/core/ext/filters/client_channel/proxy_mapper_registry.cc',
+    'src/core/ext/filters/client_channel/request_routing.cc',
     'src/core/ext/filters/client_channel/resolver.cc',
     'src/core/ext/filters/client_channel/resolver_registry.cc',
     'src/core/ext/filters/client_channel/resolver_result_parsing.cc',
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 5011e19b03..ba2eaecafd 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -932,6 +932,8 @@ src/core/ext/filters/client_channel/proxy_mapper.cc \
 src/core/ext/filters/client_channel/proxy_mapper.h \
 src/core/ext/filters/client_channel/proxy_mapper_registry.cc \
 src/core/ext/filters/client_channel/proxy_mapper_registry.h \
+src/core/ext/filters/client_channel/request_routing.cc \
+src/core/ext/filters/client_channel/request_routing.h \
 src/core/ext/filters/client_channel/resolver.cc \
 src/core/ext/filters/client_channel/resolver.h \
 src/core/ext/filters/client_channel/resolver/README.md \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 46e5d54c85..336d499be9 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -9840,6 +9840,7 @@
       "src/core/ext/filters/client_channel/parse_address.h",
       "src/core/ext/filters/client_channel/proxy_mapper.h",
       "src/core/ext/filters/client_channel/proxy_mapper_registry.h",
+      "src/core/ext/filters/client_channel/request_routing.h",
       "src/core/ext/filters/client_channel/resolver.h",
       "src/core/ext/filters/client_channel/resolver_factory.h",
       "src/core/ext/filters/client_channel/resolver_registry.h",
@@ -9882,6 +9883,8 @@
       "src/core/ext/filters/client_channel/proxy_mapper.h",
       "src/core/ext/filters/client_channel/proxy_mapper_registry.cc",
       "src/core/ext/filters/client_channel/proxy_mapper_registry.h",
+      "src/core/ext/filters/client_channel/request_routing.cc",
+      "src/core/ext/filters/client_channel/request_routing.h",
       "src/core/ext/filters/client_channel/resolver.cc",
       "src/core/ext/filters/client_channel/resolver.h",
       "src/core/ext/filters/client_channel/resolver_factory.h",
-- cgit v1.2.3
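The resolver_result_parsing change in the commit above is a pointer-to-reference refactor: a parameter that can never be null is taken by const reference, and the address-of operator moves to the few call sites that still need a pointer (as in grpc_channel_args_find(&resolver_result, ...)). A tiny sketch of the before and after shapes, using stand-in types rather than gRPC's:

#include <cstdio>

struct Args { int n; };

// Before: callee must assume the pointer may be null (or assert).
int CountBefore(const Args* args) { return args == nullptr ? 0 : args->n; }

// After: non-null by construction; the null check disappears.
int CountAfter(const Args& args) { return args.n; }

int main() {
  Args args{3};
  std::printf("%d %d\n", CountBefore(&args), CountAfter(args));
}

Taking the reference removes the null check, and the temptation to skip it, from every callee.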
From 1ac4a01a0edd9c1c790157d436b9c76e49b2c539 Mon Sep 17 00:00:00 2001
From: Lidi Zheng
Date: Thu, 20 Dec 2018 12:13:08 -0800
Subject: Fix Complain of Higher Version Pylint

---
 src/python/grpcio_status/grpc_status/rpc_status.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'src')

diff --git a/src/python/grpcio_status/grpc_status/rpc_status.py b/src/python/grpcio_status/grpc_status/rpc_status.py
index e23a20968e..87618fa541 100644
--- a/src/python/grpcio_status/grpc_status/rpc_status.py
+++ b/src/python/grpcio_status/grpc_status/rpc_status.py
@@ -24,7 +24,7 @@ import grpc
 import google.protobuf  # pylint: disable=unused-import
 from google.rpc import status_pb2
 
-_CODE_TO_GRPC_CODE_MAPPING = dict([(x.value[0], x) for x in grpc.StatusCode])
+_CODE_TO_GRPC_CODE_MAPPING = {x.value[0]: x for x in grpc.StatusCode}
 
 _GRPC_DETAILS_METADATA_KEY = 'grpc-status-details-bin'
-- cgit v1.2.3