diff options
53 files changed, 1475 insertions, 833 deletions
@@ -559,6 +559,16 @@ grpc_cc_library( ) grpc_cc_library( + name = "orphanable", + language = "c++", + public_hdrs = ["src/core/lib/support/orphanable.h"], + deps = [ + "debug_location", + "grpc_trace", + ], +) + +grpc_cc_library( name = "ref_counted", language = "c++", public_hdrs = ["src/core/lib/support/ref_counted.h"], @@ -929,6 +939,8 @@ grpc_cc_library( deps = [ "grpc_base", "grpc_deadline_filter", + "ref_counted", + "ref_counted_ptr", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 93954b412d..97d68cc62c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -563,6 +563,7 @@ add_dependencies(buildtests_cxx memory_test) add_dependencies(buildtests_cxx metrics_client) add_dependencies(buildtests_cxx mock_test) add_dependencies(buildtests_cxx noop-benchmark) +add_dependencies(buildtests_cxx orphanable_test) add_dependencies(buildtests_cxx proto_server_reflection_test) add_dependencies(buildtests_cxx proto_utils_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -10913,6 +10914,43 @@ target_link_libraries(noop-benchmark endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(orphanable_test + test/core/support/orphanable_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(orphanable_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(orphanable_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc++ + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(proto_server_reflection_test test/cpp/end2end/proto_server_reflection_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -1160,6 +1160,7 @@ memory_test: $(BINDIR)/$(CONFIG)/memory_test metrics_client: $(BINDIR)/$(CONFIG)/metrics_client mock_test: $(BINDIR)/$(CONFIG)/mock_test noop-benchmark: $(BINDIR)/$(CONFIG)/noop-benchmark +orphanable_test: $(BINDIR)/$(CONFIG)/orphanable_test proto_server_reflection_test: $(BINDIR)/$(CONFIG)/proto_server_reflection_test proto_utils_test: $(BINDIR)/$(CONFIG)/proto_utils_test qps_interarrival_test: $(BINDIR)/$(CONFIG)/qps_interarrival_test @@ -1602,6 +1603,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/metrics_client \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/noop-benchmark \ + $(BINDIR)/$(CONFIG)/orphanable_test \ $(BINDIR)/$(CONFIG)/proto_server_reflection_test \ $(BINDIR)/$(CONFIG)/proto_utils_test \ $(BINDIR)/$(CONFIG)/qps_interarrival_test \ @@ -1733,6 +1735,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/metrics_client \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/noop-benchmark \ + $(BINDIR)/$(CONFIG)/orphanable_test \ $(BINDIR)/$(CONFIG)/proto_server_reflection_test \ $(BINDIR)/$(CONFIG)/proto_utils_test \ $(BINDIR)/$(CONFIG)/qps_interarrival_test \ @@ -2142,6 +2145,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 ) $(E) "[RUN] Testing noop-benchmark" $(Q) $(BINDIR)/$(CONFIG)/noop-benchmark || ( echo test noop-benchmark failed ; exit 1 ) + $(E) "[RUN] Testing orphanable_test" + $(Q) $(BINDIR)/$(CONFIG)/orphanable_test || ( echo test orphanable_test failed ; exit 1 ) $(E) "[RUN] Testing proto_server_reflection_test" $(Q) $(BINDIR)/$(CONFIG)/proto_server_reflection_test || ( echo test proto_server_reflection_test failed ; exit 1 ) $(E) "[RUN] Testing proto_utils_test" @@ -16141,6 +16146,49 @@ endif endif +ORPHANABLE_TEST_SRC = \ + test/core/support/orphanable_test.cc \ + +ORPHANABLE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(ORPHANABLE_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/orphanable_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/orphanable_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/orphanable_test: $(PROTOBUF_DEP) $(ORPHANABLE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(ORPHANABLE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/orphanable_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/core/support/orphanable_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_orphanable_test: $(ORPHANABLE_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(ORPHANABLE_TEST_OBJS:.o=.dep) +endif +endif + + PROTO_SERVER_REFLECTION_TEST_SRC = \ test/cpp/end2end/proto_server_reflection_test.cc \ diff --git a/build.yaml b/build.yaml index c3d54f1e8a..c9adc9b44e 100644 --- a/build.yaml +++ b/build.yaml @@ -397,6 +397,7 @@ filegroups: - src/core/lib/slice/slice_internal.h - src/core/lib/slice/slice_string_helpers.h - src/core/lib/support/debug_location.h + - src/core/lib/support/orphanable.h - src/core/lib/support/ref_counted.h - src/core/lib/support/ref_counted_ptr.h - src/core/lib/support/vector.h @@ -4403,6 +4404,20 @@ targets: deps: - benchmark defaults: benchmark +- name: orphanable_test + gtest: true + build: test + language: c++ + src: + - test/core/support/orphanable_test.cc + deps: + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr + uses: + - grpc++_test - name: proto_server_reflection_test gtest: true build: test diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 358fad3d98..df5a4112a7 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -420,6 +420,7 @@ Pod::Spec.new do |s| 'src/core/lib/slice/slice_internal.h', 'src/core/lib/slice/slice_string_helpers.h', 'src/core/lib/support/debug_location.h', + 'src/core/lib/support/orphanable.h', 'src/core/lib/support/ref_counted.h', 'src/core/lib/support/ref_counted_ptr.h', 'src/core/lib/support/vector.h', @@ -901,6 +902,7 @@ Pod::Spec.new do |s| 'src/core/lib/slice/slice_internal.h', 'src/core/lib/slice/slice_string_helpers.h', 'src/core/lib/support/debug_location.h', + 'src/core/lib/support/orphanable.h', 'src/core/lib/support/ref_counted.h', 'src/core/lib/support/ref_counted_ptr.h', 'src/core/lib/support/vector.h', @@ -1085,6 +1087,6 @@ Pod::Spec.new do |s| # TODO (mxyan): Instead of this hack, add include path "third_party" to C core's include path? s.prepare_command = <<-END_OF_COMMAND - find src/core/ -type f -exec sed -E -i '.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\; + find src/core/ -type f -exec sed -E -i'.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\; END_OF_COMMAND end diff --git a/grpc.gemspec b/grpc.gemspec index 7547bc85de..f8afaa5803 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -346,6 +346,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/slice/slice_internal.h ) s.files += %w( src/core/lib/slice/slice_string_helpers.h ) s.files += %w( src/core/lib/support/debug_location.h ) + s.files += %w( src/core/lib/support/orphanable.h ) s.files += %w( src/core/lib/support/ref_counted.h ) s.files += %w( src/core/lib/support/ref_counted_ptr.h ) s.files += %w( src/core/lib/support/vector.h ) diff --git a/package.xml b/package.xml index ff3d0797ab..8c590348b6 100644 --- a/package.xml +++ b/package.xml @@ -358,6 +358,7 @@ <file baseinstalldir="/" name="src/core/lib/slice/slice_internal.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/slice/slice_string_helpers.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/debug_location.h" role="src" /> + <file baseinstalldir="/" name="src/core/lib/support/orphanable.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/ref_counted.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/ref_counted_ptr.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/support/vector.h" role="src" /> diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 3f3334d44a..bb29f65af9 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1003,7 +1003,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - const grpc_connected_subchannel_call_args call_args = { + const grpc_core::ConnectedSubchannel::CallArgs call_args = { calld->pollent, // pollent calld->path, // path calld->call_start_time, // start_time @@ -1012,8 +1012,8 @@ static void create_subchannel_call_locked(grpc_call_element* elem, calld->pick.subchannel_call_context, // context calld->call_combiner // call_combiner }; - grpc_error* new_error = grpc_connected_subchannel_create_call( - calld->pick.connected_subchannel, &call_args, &calld->subchannel_call); + grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -1463,7 +1463,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, } GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->pick.connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked"); + calld->pick.connected_subchannel.reset(); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { if (calld->pick.subchannel_call_context[i].value != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 1176a05b78..0cc0cb59c3 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -21,6 +21,7 @@ #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" /** A load balancing policy: specified by a vtable and a struct (which @@ -54,9 +55,9 @@ typedef struct grpc_lb_policy_pick_state { grpc_linked_mdelem lb_token_mdelem_storage; /// Closure to run when pick is complete, if not completed synchronously. grpc_closure* on_complete; - /// Will be set to the selected subchannel, or NULL on failure or when + /// Will be set to the selected subchannel, or nullptr on failure or when /// the LB policy decides to drop the call. - grpc_connected_subchannel* connected_subchannel; + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; /// Will be populated with context to pass to the subchannel call, if needed. grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; /// Upon success, \a *user_data will be set to whatever opaque information @@ -152,7 +153,8 @@ void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick); -/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) +/** Perform a connected subchannel ping (see \a + grpc_core::ConnectedSubchannel::Ping) against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, grpc_closure* on_initiate, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 272b3617b2..a8c5fb9c1b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -939,7 +939,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } gpr_free(pp); } else { - pp->pick->connected_subchannel = nullptr; + pp->pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); } pp = next; @@ -976,7 +976,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { pending_pick* next = pp->next; if (pp->pick == pick) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 60385272cf..725b78d478 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -81,7 +81,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol, GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } else { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } } @@ -111,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { grpc_lb_policy_pick_state* next = pp->next; if (pp == pick) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); @@ -176,8 +176,7 @@ static int pf_pick_locked(grpc_lb_policy* pol, pick_first_lb_policy* p = (pick_first_lb_policy*)pol; // If we have a selected subchannel already, return synchronously. if (p->selected != nullptr) { - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + pick->connected_subchannel = p->selected->connected_subchannel; return 1; } // No subchannel selected yet, so handle asynchronously. @@ -217,8 +216,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { - grpc_connected_subchannel_ping(p->selected->connected_subchannel, - on_initiate, on_ack); + p->selected->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -297,8 +295,7 @@ static void pf_update_locked(grpc_lb_policy* policy, subchannel_list->num_subchannels); } if (p->selected->connected_subchannel != nullptr) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "pf_update_includes_selected"); + sd->connected_subchannel = p->selected->connected_subchannel; } p->selected = sd; if (p->subchannel_list != nullptr) { @@ -410,8 +407,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // re-resolution is introduced. But we need to investigate whether we // really want to take any action instead of waiting for the selected // subchannel reconnecting. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN || - sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, @@ -419,20 +416,19 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { p->started_picking = false; grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + // in transient failure. Rely on re-resolution to recover. + p->selected = nullptr; + grpc_lb_subchannel_data_stop_connectivity_watch(sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + sd->subchannel_list, "pf_selected_shutdown"); + grpc_lb_subchannel_data_unref_subchannel( + sd, "pf_selected_shutdown"); // Unrefs connected subchannel } else { grpc_connectivity_state_set(&p->state_tracker, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); - } - if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); - } else { - p->selected = nullptr; - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown"); } } return; @@ -450,6 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_READY: { // Case 2. Promote p->latest_pending_subchannel_list to // p->subchannel_list. + sd->connected_subchannel = + grpc_subchannel_get_connected_subchannel(sd->subchannel); if (sd->subchannel_list == p->latest_pending_subchannel_list) { GPR_ASSERT(p->subchannel_list != nullptr); grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, @@ -460,9 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Cases 1 and 2. grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); p->selected = sd; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, @@ -474,8 +469,7 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_policy_pick_state* pick; while ((pick = p->pending_picks)) { p->pending_picks = pick->next; - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + pick->connected_subchannel = p->selected->connected_subchannel; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", @@ -520,39 +514,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_subchannel_data_start_connectivity_watch(sd); break; } - case GRPC_CHANNEL_SHUTDOWN: { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown"); - // Advance to next subchannel and check its state. - grpc_lb_subchannel_data* original_sd = sd; - do { - sd->subchannel_list->checking_subchannel = - (sd->subchannel_list->checking_subchannel + 1) % - sd->subchannel_list->num_subchannels; - sd = &sd->subchannel_list - ->subchannels[sd->subchannel_list->checking_subchannel]; - } while (sd->subchannel == nullptr && sd != original_sd); - if (sd == original_sd) { - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_exhausted_subchannels"); - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, - "exhausted_subchannels+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, - GRPC_ERROR_NONE); - } - } else { - if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - // Reuses the connectivity refs from the previous watch. - grpc_lb_subchannel_data_start_connectivity_watch(sd); - } - } + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(break); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 92c7d5bd5d..dca345566a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -36,6 +36,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" @@ -127,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, (void*)p, (unsigned long)last_ready_index, (void*)p->subchannel_list->subchannels[last_ready_index].subchannel, (void*)p->subchannel_list->subchannels[last_ready_index] - .connected_subchannel); + .connected_subchannel.get()); } } @@ -162,7 +163,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol, GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } else { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } } @@ -192,7 +193,7 @@ static void rr_cancel_pick_locked(grpc_lb_policy* pol, while (pp != nullptr) { grpc_lb_policy_pick_state* next = pp->next; if (pp == pick) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); @@ -216,7 +217,7 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol, grpc_lb_policy_pick_state* next = pick->next; if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - pick->connected_subchannel = nullptr; + pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); @@ -262,8 +263,7 @@ static int rr_pick_locked(grpc_lb_policy* pol, /* readily available, report right away */ grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[next_ready_index]; - pick->connected_subchannel = - GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); + pick->connected_subchannel = sd->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = sd->user_data; } @@ -272,8 +272,8 @@ static int rr_pick_locked(grpc_lb_policy* pol, GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "index %" PRIuPTR ")", - p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list, - next_ready_index); + p, sd->subchannel, pick->connected_subchannel.get(), + sd->subchannel_list, next_ready_index); } /* only advance the last picked pointer if the selection was used */ update_last_ready_subchannel_index_locked(p, next_ready_index); @@ -291,15 +291,14 @@ static int rr_pick_locked(grpc_lb_policy* pol, static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; + GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(subchannel_list->num_transient_failures > 0); --subchannel_list->num_transient_failures; - } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - GPR_ASSERT(subchannel_list->num_shutdown > 0); - --subchannel_list->num_shutdown; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; @@ -309,8 +308,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++subchannel_list->num_transient_failures; - } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - ++subchannel_list->num_shutdown; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { ++subchannel_list->num_idle; } @@ -410,6 +407,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // either the current or latest pending subchannel lists. GPR_ASSERT(sd->subchannel_list == p->subchannel_list || sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. @@ -417,18 +415,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { // Update state counters and new overall state. update_state_counters_locked(sd); update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); - // If the sd's new state is SHUTDOWN, unref the subchannel. - if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "rr_connectivity_shutdown"); - } else { // sd not in SHUTDOWN - if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* + // subchannel, if any. + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + sd->connected_subchannel.reset(); + break; + } + case GRPC_CHANNEL_READY: { if (sd->connected_subchannel == nullptr) { - sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "connected"); + sd->connected_subchannel = + grpc_subchannel_get_connected_subchannel(sd->subchannel); } if (sd->subchannel_list != p->subchannel_list) { // promote sd->subchannel_list to p->subchannel_list. @@ -471,8 +468,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { grpc_lb_policy_pick_state* pick; while ((pick = p->pending_picks)) { p->pending_picks = pick->next; - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - selected->connected_subchannel, "rr_picked"); + pick->connected_subchannel = selected->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = selected->user_data; } @@ -485,10 +481,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } + break; } - // Renew notification. - grpc_lb_subchannel_data_start_connectivity_watch(sd); + case GRPC_CHANNEL_SHUTDOWN: + GPR_UNREACHABLE_CODE(); + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE:; // fallthrough } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(sd); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -512,10 +513,9 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (next_ready_index < p->subchannel_list->num_subchannels) { grpc_lb_subchannel_data* selected = &p->subchannel_list->subchannels[next_ready_index]; - grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( - selected->connected_subchannel, "rr_ping"); - grpc_connected_subchannel_ping(target, on_initiate, on_ack); - GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target = + selected->connected_subchannel; + target->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 5ce1298afc..fa2ffcc796 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -42,10 +42,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd, } GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason); sd->subchannel = nullptr; - if (sd->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason); - sd->connected_subchannel = nullptr; - } + sd->connected_subchannel.reset(); if (sd->user_data != nullptr) { GPR_ASSERT(sd->user_data_vtable != nullptr); sd->user_data_vtable->destroy(sd->user_data); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0f8cea9347..f146c724e0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -22,6 +22,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" // TODO(roth): This code is intended to be shared between pick_first and @@ -43,7 +44,7 @@ typedef struct { grpc_lb_subchannel_list* subchannel_list; /** subchannel itself */ grpc_subchannel* subchannel; - grpc_connected_subchannel* connected_subchannel; + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; /** Is a connectivity notification pending? */ bool connectivity_notification_pending; /** notification that connectivity has changed on subchannel */ diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index a604c55c58..5b8a14b9e7 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/debug_location.h" #include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" @@ -55,10 +56,6 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \ - &(subchannel)->connected_subchannel))) - namespace { struct state_watcher { grpc_closure closure; @@ -98,7 +95,7 @@ struct grpc_subchannel { grpc_connect_out_args connecting_result; /** callback for connection finishing */ - grpc_closure connected; + grpc_closure on_connected; /** callback for our alarm */ grpc_closure on_alarm; @@ -107,12 +104,13 @@ struct grpc_subchannel { being setup */ grpc_pollset_set* pollset_set; - /** active connection, or null; of type grpc_connected_subchannel */ - gpr_atm connected_subchannel; - /** mutex protecting remaining elements */ gpr_mu mu; + /** active connection, or null; of type grpc_core::ConnectedSubchannel + */ + grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; + /** have we seen a disconnection? */ bool disconnected; /** are we connecting */ @@ -136,16 +134,15 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - grpc_connected_subchannel* connection; + grpc_core::ConnectedSubchannel* connection; grpc_closure* schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call*)(callstack)) - 1) -static void subchannel_connected(void* subchannel, grpc_error* error); +static void on_subchannel_connected(void* subchannel, grpc_error* error); #ifndef NDEBUG #define REF_REASON reason @@ -163,20 +160,9 @@ static void subchannel_connected(void* subchannel, grpc_error* error); */ static void connection_destroy(void* arg, grpc_error* error) { - grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg; - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); - gpr_free(c); -} - -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); - return c; -} - -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + grpc_channel_stack* stk = (grpc_channel_stack*)arg; + grpc_channel_stack_destroy(stk); + gpr_free(stk); } /* @@ -243,18 +229,13 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref( } static void disconnect(grpc_subchannel* c) { - grpc_connected_subchannel* con; grpc_subchannel_index_unregister(c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); - con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection"); - gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef); - } + c->connected_subchannel.reset(); gpr_mu_unlock(&c->mu); } @@ -374,7 +355,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, if (new_args != nullptr) grpc_channel_args_destroy(new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c, + GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -397,7 +378,7 @@ static void continue_connect_locked(grpc_subchannel* c) { grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected); + &c->on_connected); } grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, @@ -458,7 +439,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { return; } - if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) { + if (c->connected_subchannel != nullptr) { /* Already connected: don't restart */ return; } @@ -481,9 +462,10 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { const grpc_millis time_til_next = c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { - gpr_log(GPR_INFO, "Retry immediately"); + gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); } else { - gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); + gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c, + time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); @@ -527,75 +509,56 @@ void grpc_subchannel_notify_on_state_change( } } -void grpc_connected_subchannel_process_transport_op( - grpc_connected_subchannel* con, grpc_transport_op* op) { - grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op); -} - -static void subchannel_on_child_state_changed(void* p, grpc_error* error) { - state_watcher* sw = (state_watcher*)p; - grpc_subchannel* c = sw->subchannel; +static void on_connected_subchannel_connectivity_changed(void* p, + grpc_error* error) { + state_watcher* connected_subchannel_watcher = (state_watcher*)p; + grpc_subchannel* c = connected_subchannel_watcher->subchannel; gpr_mu* mu = &c->mu; gpr_mu_lock(mu); - /* if we failed just leave this closure */ - if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* any errors on a subchannel ==> we're done, create a new one */ - sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - } - grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state, - GRPC_ERROR_REF(error), "reflect_child"); - if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr, - &sw->connectivity_state, &sw->closure); - GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - sw = nullptr; + switch (connected_subchannel_watcher->connectivity_state) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: { + if (!c->disconnected && c->connected_subchannel != nullptr) { + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(GPR_INFO, + "Connected subchannel %p of subchannel %p has gone into %s. " + "Attempting to reconnect.", + c->connected_subchannel.get(), c, + grpc_connectivity_state_name( + connected_subchannel_watcher->connectivity_state)); + } + c->connected_subchannel.reset(); + grpc_connectivity_state_set(&c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "reflect_child"); + c->backoff_begun = false; + c->backoff->Reset(); + maybe_start_connecting_locked(c); + } else { + connected_subchannel_watcher->connectivity_state = + GRPC_CHANNEL_SHUTDOWN; + } + break; + } + default: { + grpc_connectivity_state_set( + &c->state_tracker, connected_subchannel_watcher->connectivity_state, + GRPC_ERROR_REF(error), "reflect_child"); + GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); + c->connected_subchannel->NotifyOnStateChange( + nullptr, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); + connected_subchannel_watcher = nullptr; + } } - gpr_mu_unlock(mu); GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher"); - gpr_free(sw); -} - -static void connected_subchannel_state_op(grpc_connected_subchannel* con, - grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, - grpc_closure* closure) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); -} - -void grpc_connected_subchannel_notify_on_state_change( - grpc_connected_subchannel* con, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* closure) { - connected_subchannel_state_op(con, interested_parties, state, closure); -} - -void grpc_connected_subchannel_ping(grpc_connected_subchannel* con, - grpc_closure* on_initiate, - grpc_closure* on_ack) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(elem, op); + gpr_free(connected_subchannel_watcher); } static bool publish_transport_locked(grpc_subchannel* c) { - grpc_connected_subchannel* con; - grpc_channel_stack* stk; - state_watcher* sw_subchannel; - /* construct channel stack */ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( @@ -607,8 +570,9 @@ static bool publish_transport_locked(grpc_subchannel* c) { grpc_channel_stack_builder_destroy(builder); return false; } + grpc_channel_stack* stk; grpc_error* error = grpc_channel_stack_builder_finish( - builder, 0, 1, connection_destroy, nullptr, (void**)&con); + builder, 0, 1, connection_destroy, nullptr, (void**)&stk); if (error != GRPC_ERROR_NONE) { grpc_transport_destroy(c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", @@ -616,38 +580,37 @@ static bool publish_transport_locked(grpc_subchannel* c) { GRPC_ERROR_UNREF(error); return false; } - stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel)); - sw_subchannel->subchannel = c; - sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed, - sw_subchannel, grpc_schedule_on_exec_ctx); + state_watcher* connected_subchannel_watcher = + (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher)); + connected_subchannel_watcher->subchannel = c; + connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY; + GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure, + on_connected_subchannel_connectivity_changed, + connected_subchannel_watcher, grpc_schedule_on_exec_ctx); if (c->disconnected) { - gpr_free(sw_subchannel); + gpr_free(connected_subchannel_watcher); grpc_channel_stack_destroy(stk); - gpr_free(con); + gpr_free(stk); return false; } /* publish */ - /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. - I'd have expected the rel_cas below to be enough, but - seemingly it's not. - Re-evaluate if we really need this. */ - gpr_atm_full_barrier(); - GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); + c->connected_subchannel.reset( + grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", + c->connected_subchannel.get(), c); /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - grpc_connected_subchannel_notify_on_state_change( - con, c->pollset_set, &sw_subchannel->connectivity_state, - &sw_subchannel->closure); + c->connected_subchannel->NotifyOnStateChange( + c->pollset_set, &connected_subchannel_watcher->connectivity_state, + &connected_subchannel_watcher->closure); /* signal completion */ grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY, @@ -655,11 +618,11 @@ static bool publish_transport_locked(grpc_subchannel* c) { return true; } -static void subchannel_connected(void* arg, grpc_error* error) { +static void on_subchannel_connected(void* arg, grpc_error* error) { grpc_subchannel* c = (grpc_subchannel*)arg; grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; - GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); + GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); gpr_mu_lock(&c->mu); c->connecting = false; if (c->connecting_result.transport != nullptr && @@ -694,10 +657,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) { grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != nullptr); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_connected_subchannel* connection = c->connection; + grpc_core::ConnectedSubchannel* connection = c->connection; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, c->schedule_closure_after_destroy); - GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call"); + connection->Unref(DEBUG_LOCATION, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -728,9 +691,12 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call, GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( - grpc_subchannel* c) { - return GET_CONNECTED_SUBCHANNEL(c, acq); +grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { + gpr_mu_lock(&c->mu); + auto copy = c->connected_subchannel; + gpr_mu_unlock(&c->mu); + return copy; } const grpc_subchannel_key* grpc_subchannel_get_key( @@ -738,36 +704,6 @@ const grpc_subchannel_key* grpc_subchannel_get_key( return subchannel->key; } -grpc_error* grpc_connected_subchannel_create_call( - grpc_connected_subchannel* con, - const grpc_connected_subchannel_call_args* args, - grpc_subchannel_call** call) { - grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = (grpc_subchannel_call*)gpr_arena_alloc( - args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args->context, /* context */ - args->path, /* path */ - args->start_time, /* start_time */ - args->deadline, /* deadline */ - args->arena, /* arena */ - args->call_combiner /* call_combiner */ - }; - grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy, - *call, &call_args); - if (error != GRPC_ERROR_NONE) { - const char* error_string = grpc_error_string(error); - gpr_log(GPR_ERROR, "error: %s", error_string); - return error; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent); - return GRPC_ERROR_NONE; -} - grpc_call_stack* grpc_subchannel_call_get_call_stack( grpc_subchannel_call* subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); @@ -803,3 +739,64 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } + +namespace grpc_core { +ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) + : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount), + channel_stack_(channel_stack) {} + +ConnectedSubchannel::~ConnectedSubchannel() { + GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); +} + +void ConnectedSubchannel::NotifyOnStateChange( + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +void ConnectedSubchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, + grpc_subchannel_call** call) { + *call = (grpc_subchannel_call*)gpr_arena_alloc( + args.arena, + sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); + Ref(DEBUG_LOCATION, "subchannel_call"); + (*call)->connection = this; + const grpc_call_element_args call_args = { + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner /* call_combiner */ + }; + grpc_error* error = grpc_call_stack_init( + channel_stack_, 1, subchannel_call_destroy, *call, &call_args); + if (error != GRPC_ERROR_NONE) { + const char* error_string = grpc_error_string(error); + gpr_log(GPR_ERROR, "error: %s", error_string); + return error; + } + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); + return GRPC_ERROR_NONE; +} +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 9d34fff07a..3bcc5c2432 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -23,6 +23,8 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/support/arena.h" +#include "src/core/lib/support/ref_counted.h" +#include "src/core/lib/support/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -32,7 +34,6 @@ /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; -typedef struct grpc_connected_subchannel grpc_connected_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; typedef struct grpc_subchannel_key grpc_subchannel_key; @@ -48,10 +49,6 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) -#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ - grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) \ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ @@ -65,14 +62,39 @@ typedef struct grpc_subchannel_key grpc_subchannel_key; #define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \ - grpc_connected_subchannel_unref((p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif +namespace grpc_core { +class ConnectedSubchannel : public grpc_core::RefCountedWithTracing { + public: + struct CallArgs { + grpc_polling_entity* pollent; + grpc_slice path; + gpr_timespec start_time; + grpc_millis deadline; + gpr_arena* arena; + grpc_call_context_element* context; + grpc_call_combiner* call_combiner; + }; + + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + ~ConnectedSubchannel(); + + grpc_channel_stack* channel_stack() { return channel_stack_; } + void NotifyOnStateChange(grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, + grpc_closure* closure); + void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); + grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + + private: + grpc_channel_stack* channel_stack_; +}; +} // namespace grpc_core + grpc_subchannel* grpc_subchannel_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel* grpc_subchannel_ref_from_weak_ref( @@ -83,35 +105,11 @@ grpc_subchannel* grpc_subchannel_weak_ref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref( grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -grpc_connected_subchannel* grpc_connected_subchannel_ref( - grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_unref( - grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref( grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a subchannel call */ -typedef struct { - grpc_polling_entity* pollent; - grpc_slice path; - gpr_timespec start_time; - grpc_millis deadline; - gpr_arena* arena; - grpc_call_context_element* context; - grpc_call_combiner* call_combiner; -} grpc_connected_subchannel_call_args; - -grpc_error* grpc_connected_subchannel_create_call( - grpc_connected_subchannel* connected_subchannel, - const grpc_connected_subchannel_call_args* args, - grpc_subchannel_call** subchannel_call); - -/** process a transport level op */ -void grpc_connected_subchannel_process_transport_op( - grpc_connected_subchannel* subchannel, grpc_transport_op* op); - /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel* channel, grpc_error** error); @@ -121,17 +119,12 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( void grpc_subchannel_notify_on_state_change( grpc_subchannel* channel, grpc_pollset_set* interested_parties, grpc_connectivity_state* state, grpc_closure* notify); -void grpc_connected_subchannel_notify_on_state_change( - grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify); -void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel, - grpc_closure* on_initiate, - grpc_closure* on_ack); - -/** retrieve the grpc_connected_subchannel - or NULL if called before - the subchannel becomes connected */ -grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( - grpc_subchannel* subchannel); + +/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected + * (which may happen before it initially connects or during transient failures) + * */ +grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); /** return the subchannel index key for \a subchannel */ const grpc_subchannel_key* grpc_subchannel_get_key( diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 1ab7e516de..aa14d5931a 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -20,12 +20,14 @@ #include <grpc/support/log.h> -/* This polling engine is only relevant on linux kernels supporting epoll() */ +/* This polling engine is only relevant on linux kernels supporting epoll + epoll_create() or epoll_create1() */ #ifdef GRPC_LINUX_EPOLL #include "src/core/lib/iomgr/ev_epoll1_linux.h" #include <assert.h> #include <errno.h> +#include <fcntl.h> #include <limits.h> #include <poll.h> #include <pthread.h> @@ -84,11 +86,28 @@ typedef struct epoll_set { /* The global singleton epoll set */ static epoll_set g_epoll_set; +static int epoll_create_and_cloexec() { +#ifdef GRPC_LINUX_EPOLL_CREATE1 + int fd = epoll_create1(EPOLL_CLOEXEC); + if (fd < 0) { + gpr_log(GPR_ERROR, "epoll_create1 unavailable"); + } +#else + int fd = epoll_create(MAX_EPOLL_EVENTS); + if (fd < 0) { + gpr_log(GPR_ERROR, "epoll_create unavailable"); + } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) { + gpr_log(GPR_ERROR, "fcntl following epoll_create failed"); + return -1; + } +#endif + return fd; +} + /* Must be called *only* once */ static bool epoll_set_init() { - g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC); + g_epoll_set.epfd = epoll_create_and_cloexec(); if (g_epoll_set.epfd < 0) { - gpr_log(GPR_ERROR, "epoll unavailable"); return false; } diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 5f5f45a7a5..e4a2d67e4b 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -21,7 +21,7 @@ #include <grpc/support/log.h> /* This polling engine is only relevant on linux kernels supporting epoll() */ -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL_CREATE1 #include "src/core/lib/iomgr/ev_epollex_linux.h" @@ -1442,15 +1442,15 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( return &vtable; } -#else /* defined(GRPC_LINUX_EPOLL) */ +#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ #if defined(GRPC_POSIX_SOCKET) #include "src/core/lib/iomgr/ev_epollex_linux.h" -/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return - * NULL */ +/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means + epoll_create1 is not available. Return NULL */ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ -#endif /* !defined(GRPC_LINUX_EPOLL) */ +#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 8072a6cbed..3544d4f3a4 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -22,7 +22,7 @@ #include <grpc/support/log.h> /* This polling engine is only relevant on linux kernels supporting epoll() */ -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL_CREATE1 #include "src/core/lib/iomgr/ev_epollsig_linux.h" @@ -1725,11 +1725,11 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( return &vtable; } -#else /* defined(GRPC_LINUX_EPOLL) */ +#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ #if defined(GRPC_POSIX_SOCKET) #include "src/core/lib/iomgr/ev_epollsig_linux.h" -/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return - * NULL */ +/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means + epoll_create1 is not available. Return NULL */ const grpc_event_engine_vtable* grpc_init_epollsig_linux( bool explicit_request) { return nullptr; @@ -1737,4 +1737,4 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( #endif /* defined(GRPC_POSIX_SOCKET) */ void grpc_use_signal(int signum) {} -#endif /* !defined(GRPC_LINUX_EPOLL) */ +#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.h b/src/core/lib/iomgr/ev_epollsig_linux.h index 5b8aba9d9f..48178d3713 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.h +++ b/src/core/lib/iomgr/ev_epollsig_linux.h @@ -24,10 +24,10 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(bool explicit_request); -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL_CREATE1 void* grpc_fd_get_polling_island(grpc_fd* fd); void* grpc_pollset_get_polling_island(grpc_pollset* ps); bool grpc_are_polling_islands_equal(void* p, void* q); -#endif /* defined(GRPC_LINUX_EPOLL) */ +#endif /* defined(GRPC_LINUX_EPOLL_CREATE1) */ #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLSIG_LINUX_H */ diff --git a/src/core/lib/iomgr/is_epollexclusive_available.cc b/src/core/lib/iomgr/is_epollexclusive_available.cc index e5803532e7..08f9cf2b69 100644 --- a/src/core/lib/iomgr/is_epollexclusive_available.cc +++ b/src/core/lib/iomgr/is_epollexclusive_available.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/is_epollexclusive_available.h" -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL_CREATE1 #include <grpc/support/log.h> diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 9fae8c0052..25090898ed 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -37,6 +37,7 @@ #define GRPC_POSIX_SOCKETUTILS 1 #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 +#define GRPC_LINUX_EPOLL 1 #elif defined(GPR_WINDOWS) #define GRPC_TIMER_USE_GENERIC 1 #define GRPC_WINSOCK_SOCKET 1 @@ -67,8 +68,11 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #ifdef __GLIBC_PREREQ -#if __GLIBC_PREREQ(2, 9) +#if __GLIBC_PREREQ(2, 4) #define GRPC_LINUX_EPOLL 1 +#endif +#if __GLIBC_PREREQ(2, 9) +#define GRPC_LINUX_EPOLL_CREATE1 1 #define GRPC_LINUX_EVENTFD 1 #endif #if __GLIBC_PREREQ(2, 10) @@ -77,6 +81,7 @@ #endif #ifndef __GLIBC__ #define GRPC_LINUX_EPOLL 1 +#define GRPC_LINUX_EPOLL_CREATE1 1 #define GRPC_LINUX_EVENTFD 1 #define GRPC_MSG_IOVLEN_TYPE int #endif diff --git a/src/core/lib/support/abstract.h b/src/core/lib/support/abstract.h index 5498769a7d..1dffa30128 100644 --- a/src/core/lib/support/abstract.h +++ b/src/core/lib/support/abstract.h @@ -26,4 +26,9 @@ #define GRPC_ABSTRACT_BASE_CLASS \ static void operator delete(void* p) { abort(); } +// gRPC currently can't depend on libstdc++, so we can't use "= 0" for +// pure virtual methods. Instead, we use this macro. +#define GRPC_ABSTRACT \ + { GPR_ASSERT(false); } + #endif /* GRPC_CORE_LIB_SUPPORT_ABSTRACT_H */ diff --git a/src/core/lib/support/orphanable.h b/src/core/lib/support/orphanable.h new file mode 100644 index 0000000000..2f537573fd --- /dev/null +++ b/src/core/lib/support/orphanable.h @@ -0,0 +1,171 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_ORPHANABLE_H +#define GRPC_CORE_LIB_SUPPORT_ORPHANABLE_H + +#include <grpc/support/log.h> +#include <grpc/support/sync.h> + +#include <memory> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/support/abstract.h" +#include "src/core/lib/support/debug_location.h" +#include "src/core/lib/support/memory.h" + +namespace grpc_core { + +// A base class for orphanable objects, which have one external owner +// but are not necessarily destroyed immediately when the external owner +// gives up ownership. Instead, the owner calls the object's Orphan() +// method, and the object then takes responsibility for its own cleanup +// and destruction. +class Orphanable { + public: + // Gives up ownership of the object. The implementation must arrange + // to eventually destroy the object without further interaction from the + // caller. + virtual void Orphan() GRPC_ABSTRACT; + + // Not copyable or movable. + Orphanable(const Orphanable&) = delete; + Orphanable& operator=(const Orphanable&) = delete; + + GRPC_ABSTRACT_BASE_CLASS + + protected: + Orphanable() {} + virtual ~Orphanable() {} +}; + +template <typename T> +class OrphanableDelete { + public: + void operator()(T* p) { p->Orphan(); } +}; + +template <typename T, typename Deleter = OrphanableDelete<T>> +using OrphanablePtr = std::unique_ptr<T, Deleter>; + +template <typename T, typename... Args> +inline OrphanablePtr<T> MakeOrphanable(Args&&... args) { + return OrphanablePtr<T>(New<T>(std::forward<Args>(args)...)); +} + +// A type of Orphanable with internal ref-counting. +class InternallyRefCounted : public Orphanable { + public: + // Not copyable nor movable. + InternallyRefCounted(const InternallyRefCounted&) = delete; + InternallyRefCounted& operator=(const InternallyRefCounted&) = delete; + + GRPC_ABSTRACT_BASE_CLASS + + protected: + InternallyRefCounted() { gpr_ref_init(&refs_, 1); } + virtual ~InternallyRefCounted() {} + + void Ref() { gpr_ref(&refs_); } + + void Unref() { + if (gpr_unref(&refs_)) { + Delete(this); + } + } + + // Allow Delete() to access destructor. + template <typename T> + friend void Delete(T*); + + private: + gpr_refcount refs_; +}; + +// An alternative version of the InternallyRefCounted base class that +// supports tracing. This is intended to be used in cases where the +// object will be handled both by idiomatic C++ code using smart +// pointers and legacy code that is manually calling Ref() and Unref(). +// Once all of our code is converted to idiomatic C++, we may be able to +// eliminate this class. +class InternallyRefCountedWithTracing : public Orphanable { + public: + // Not copyable nor movable. + InternallyRefCountedWithTracing(const InternallyRefCountedWithTracing&) = + delete; + InternallyRefCountedWithTracing& operator=( + const InternallyRefCountedWithTracing&) = delete; + + GRPC_ABSTRACT_BASE_CLASS + + protected: + // Allow Delete() to access destructor. + template <typename T> + friend void Delete(T*); + + InternallyRefCountedWithTracing() + : InternallyRefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {} + + explicit InternallyRefCountedWithTracing(TraceFlag* trace_flag) + : trace_flag_(trace_flag) { + gpr_ref_init(&refs_, 1); + } + +#ifdef NDEBUG + explicit InternallyRefCountedWithTracing(DebugOnlyTraceFlag* trace_flag) + : InternallyRefCountedWithTracing() {} +#endif + + virtual ~InternallyRefCountedWithTracing() {} + + void Ref() { gpr_ref(&refs_); } + + void Ref(const DebugLocation& location, const char* reason) { + if (location.Log() && trace_flag_ != nullptr && trace_flag_->enabled()) { + gpr_atm old_refs = gpr_atm_no_barrier_load(&refs_.count); + gpr_log(GPR_DEBUG, "%s:%p %s:%d ref %" PRIdPTR " -> %" PRIdPTR " %s", + trace_flag_->name(), this, location.file(), location.line(), + old_refs, old_refs + 1, reason); + } + Ref(); + } + + void Unref() { + if (gpr_unref(&refs_)) { + Delete(this); + } + } + + void Unref(const DebugLocation& location, const char* reason) { + if (location.Log() && trace_flag_ != nullptr && trace_flag_->enabled()) { + gpr_atm old_refs = gpr_atm_no_barrier_load(&refs_.count); + gpr_log(GPR_DEBUG, "%s:%p %s:%d unref %" PRIdPTR " -> %" PRIdPTR " %s", + trace_flag_->name(), this, location.file(), location.line(), + old_refs, old_refs - 1, reason); + } + Unref(); + } + + private: + TraceFlag* trace_flag_ = nullptr; + gpr_refcount refs_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_SUPPORT_ORPHANABLE_H */ diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h index 4c662f9119..48c11f7bbf 100644 --- a/src/core/lib/support/ref_counted.h +++ b/src/core/lib/support/ref_counted.h @@ -23,6 +23,7 @@ #include <grpc/support/sync.h> #include "src/core/lib/debug/trace.h" +#include "src/core/lib/support/abstract.h" #include "src/core/lib/support/debug_location.h" #include "src/core/lib/support/memory.h" @@ -45,6 +46,8 @@ class RefCounted { RefCounted(const RefCounted&) = delete; RefCounted& operator=(const RefCounted&) = delete; + GRPC_ABSTRACT_BASE_CLASS + protected: // Allow Delete() to access destructor. template <typename T> @@ -98,18 +101,26 @@ class RefCountedWithTracing { RefCountedWithTracing(const RefCountedWithTracing&) = delete; RefCountedWithTracing& operator=(const RefCountedWithTracing&) = delete; + GRPC_ABSTRACT_BASE_CLASS + protected: // Allow Delete() to access destructor. template <typename T> friend void Delete(T*); - RefCountedWithTracing() : RefCountedWithTracing(nullptr) {} + RefCountedWithTracing() + : RefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {} explicit RefCountedWithTracing(TraceFlag* trace_flag) : trace_flag_(trace_flag) { gpr_ref_init(&refs_, 1); } +#ifdef NDEBUG + explicit RefCountedWithTracing(DebugOnlyTraceFlag* trace_flag) + : RefCountedWithTracing() {} +#endif + virtual ~RefCountedWithTracing() {} private: diff --git a/src/core/lib/support/ref_counted_ptr.h b/src/core/lib/support/ref_counted_ptr.h index dc2385e369..76ff0bba66 100644 --- a/src/core/lib/support/ref_counted_ptr.h +++ b/src/core/lib/support/ref_counted_ptr.h @@ -76,6 +76,15 @@ class RefCountedPtr { T& operator*() const { return *value_; } T* operator->() const { return value_; } + bool operator==(const RefCountedPtr& other) const { + return value_ == other.value_; + } + bool operator==(const T* other) const { return value_ == other; } + bool operator!=(const RefCountedPtr& other) const { + return value_ != other.value_; + } + bool operator!=(const T* other) const { return value_ != other; } + private: T* value_ = nullptr; }; diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec index c61afc1a8f..6e406b0dc9 100644 --- a/src/objective-c/BoringSSL.podspec +++ b/src/objective-c/BoringSSL.podspec @@ -136,12 +136,12 @@ Pod::Spec.new do |s| # Replace "const BIGNUM *I" in rsa.h with a lowercase i, as the former fails when including # OpenSSL in a Swift bridging header (complex.h defines "I", and it's as if the compiler # included it in every bridged header). - sed -E -i '.back' 's/\\*I,/*i,/g' include/openssl/rsa.h + sed -E -i'.back' 's/\\*I,/*i,/g' include/openssl/rsa.h # Replace `#include "../crypto/internal.h"` in e_tls.c with `#include "../internal.h"`. The # former assumes crypto/ is in the headers search path, which is hard to enforce when using # dynamic frameworks. The latters always works, being relative to the current file. - sed -E -i '.back' 's/crypto\\///g' crypto/cipher/e_tls.c + sed -E -i'.back' 's/crypto\\///g' crypto/cipher/e_tls.c # Add a module map and an umbrella header cat > include/openssl/umbrella.h <<EOF @@ -197,11 +197,11 @@ Pod::Spec.new do |s| # https://github.com/libgit2/libgit2/commit/1ddada422caf8e72ba97dca2568d2bf879fed5f2 and libvpx # in https://chromium.googlesource.com/webm/libvpx/+/1bec0c5a7e885ec792f6bb658eb3f34ad8f37b15 # work around it by removing the include. We need four of its macros, so we expand them here. - sed -E -i '.back' '/<inttypes.h>/d' include/openssl/bn.h - sed -E -i '.back' 's/PRIu32/"u"/g' include/openssl/bn.h - sed -E -i '.back' 's/PRIx32/"x"/g' include/openssl/bn.h - sed -E -i '.back' 's/PRIu64/"llu"/g' include/openssl/bn.h - sed -E -i '.back' 's/PRIx64/"llx"/g' include/openssl/bn.h + sed -E -i'.back' '/<inttypes.h>/d' include/openssl/bn.h + sed -E -i'.back' 's/PRIu32/"u"/g' include/openssl/bn.h + sed -E -i'.back' 's/PRIx32/"x"/g' include/openssl/bn.h + sed -E -i'.back' 's/PRIu64/"llu"/g' include/openssl/bn.h + sed -E -i'.back' 's/PRIx64/"llx"/g' include/openssl/bn.h # This is a bit ridiculous, but requiring people to install Go in order to build is slightly # more ridiculous IMO. To save you from scrolling, this is the last part of the podspec. diff --git a/src/objective-c/README.md b/src/objective-c/README.md index e76ee173ea..40aba0317b 100644 --- a/src/objective-c/README.md +++ b/src/objective-c/README.md @@ -1,5 +1,12 @@ [![Cocoapods](https://img.shields.io/cocoapods/v/gRPC.svg)](https://cocoapods.org/pods/gRPC) # gRPC for Objective-C +gRPC Objective C library provides Objective C API for users to make gRPC calls on iOS or OS X +platforms. Currently, the minimum supported iOS version is 7.0 and OS X version is 10.9 (Mavericks). + +While gRPC doesn't require the use of an IDL to describe the API of services, using one simplifies +usage and adds some interoperability guarantees. Here we use [Protocol Buffers][], and provide a +plugin for the Protobuf Compiler (_protoc_) to generate client libraries to communicate with gRPC +services. - [Write your API declaration in proto format](#write-protos) - [Integrate a proto library in your project](#cocoapods) @@ -10,11 +17,6 @@ - [Install protoc and the gRPC plugin without using Homebrew](#no-homebrew) - [Integrate the generated gRPC library without using Cocoapods](#no-cocoapods) -While gRPC doesn't require the use of an IDL to describe the API of services, using one simplifies -usage and adds some interoperability guarantees. Here we use [Protocol Buffers][], and provide a -plugin for the Protobuf Compiler (_protoc_) to generate client libraries to communicate with gRPC -services. - <a name="write-protos"></a> ## Write your API declaration in proto format diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index db410d307b..79793a710e 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -35,152 +35,152 @@ class FutureCancelledError(Exception): class Future(six.with_metaclass(abc.ABCMeta)): """A representation of a computation in another control flow. - Computations represented by a Future may be yet to be begun, may be ongoing, - or may have already completed. - """ + Computations represented by a Future may be yet to be begun, + may be ongoing, or may have already completed. + """ @abc.abstractmethod def cancel(self): """Attempts to cancel the computation. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation was canceled. - Returns False under all other circumstances, for example: - 1. computation has begun and could not be canceled. - 2. computation has finished - 3. computation is scheduled for execution and it is impossible to - determine its state without blocking. - """ + Returns: + bool: + Returns True if the computation was canceled. + Returns False under all other circumstances, for example: + 1. computation has begun and could not be canceled. + 2. computation has finished + 3. computation is scheduled for execution and it is impossible + to determine its state without blocking. + """ raise NotImplementedError() @abc.abstractmethod def cancelled(self): """Describes whether the computation was cancelled. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation was cancelled before its result became - available. - False under all other circumstances, for example: - 1. computation was not cancelled. - 2. computation's result is available. - """ + Returns: + bool: + Returns True if the computation was cancelled before its result became + available. + False under all other circumstances, for example: + 1. computation was not cancelled. + 2. computation's result is available. + """ raise NotImplementedError() @abc.abstractmethod def running(self): """Describes whether the computation is taking place. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation is scheduled for execution or currently - executing. - Returns False if the computation already executed or was cancelled. - """ + Returns: + bool: + Returns True if the computation is scheduled for execution or + currently executing. + Returns False if the computation already executed or was cancelled. + """ raise NotImplementedError() @abc.abstractmethod def done(self): """Describes whether the computation has taken place. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation already executed or was cancelled. - Returns False if the computation is scheduled for execution or currently - executing. - This is exactly opposite of the running() method's result. - """ + Returns: + bool: + Returns True if the computation already executed or was cancelled. + Returns False if the computation is scheduled for execution or + currently executing. + This is exactly opposite of the running() method's result. + """ raise NotImplementedError() @abc.abstractmethod def result(self, timeout=None): """Returns the result of the computation or raises its exception. - This method may return immediately or may block. + This method may return immediately or may block. - Args: - timeout: The length of time in seconds to wait for the computation to - finish or be cancelled. If None, the call will block until the - computations's termination. + Args: + timeout: The length of time in seconds to wait for the computation to + finish or be cancelled. If None, the call will block until the + computations's termination. - Returns: - The return value of the computation. + Returns: + The return value of the computation. - Raises: - FutureTimeoutError: If a timeout value is passed and the computation does - not terminate within the allotted time. - FutureCancelledError: If the computation was cancelled. - Exception: If the computation raised an exception, this call will raise - the same exception. - """ + Raises: + FutureTimeoutError: If a timeout value is passed and the computation + does not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + Exception: If the computation raised an exception, this call will + raise the same exception. + """ raise NotImplementedError() @abc.abstractmethod def exception(self, timeout=None): """Return the exception raised by the computation. - This method may return immediately or may block. + This method may return immediately or may block. - Args: - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled. If None, the call will block until the - computations's termination. + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled. If None, the call will block until the + computations's termination. - Returns: - The exception raised by the computation, or None if the computation did - not raise an exception. + Returns: + The exception raised by the computation, or None if the computation + did not raise an exception. - Raises: - FutureTimeoutError: If a timeout value is passed and the computation does - not terminate within the allotted time. - FutureCancelledError: If the computation was cancelled. - """ + Raises: + FutureTimeoutError: If a timeout value is passed and the computation + does not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + """ raise NotImplementedError() @abc.abstractmethod def traceback(self, timeout=None): """Access the traceback of the exception raised by the computation. - This method may return immediately or may block. + This method may return immediately or may block. - Args: - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled. If None, the call will block until the - computations's termination. + Args: + timeout: The length of time in seconds to wait for the computation + to terminate or be cancelled. If None, the call will block until + the computation's termination. - Returns: - The traceback of the exception raised by the computation, or None if the - computation did not raise an exception. + Returns: + The traceback of the exception raised by the computation, or None + if the computation did not raise an exception. - Raises: - FutureTimeoutError: If a timeout value is passed and the computation does - not terminate within the allotted time. - FutureCancelledError: If the computation was cancelled. - """ + Raises: + FutureTimeoutError: If a timeout value is passed and the computation + does not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + """ raise NotImplementedError() @abc.abstractmethod def add_done_callback(self, fn): """Adds a function to be called at completion of the computation. - The callback will be passed this Future object describing the outcome of - the computation. + The callback will be passed this Future object describing the outcome + of the computation. - If the computation has already completed, the callback will be called - immediately. + If the computation has already completed, the callback will be called + immediately. - Args: - fn: A callable taking this Future object as its single parameter. - """ + Args: + fn: A callable taking this Future object as its single parameter. + """ raise NotImplementedError() @@ -191,14 +191,14 @@ class Future(six.with_metaclass(abc.ABCMeta)): class ChannelConnectivity(enum.Enum): """Mirrors grpc_connectivity_state in the gRPC Core. - Attributes: - IDLE: The channel is idle. - CONNECTING: The channel is connecting. - READY: The channel is ready to conduct RPCs. - TRANSIENT_FAILURE: The channel has seen a failure from which it expects to - recover. - SHUTDOWN: The channel has seen a failure from which it cannot recover. - """ + Attributes: + IDLE: The channel is idle. + CONNECTING: The channel is connecting. + READY: The channel is ready to conduct RPCs. + TRANSIENT_FAILURE: The channel has seen a failure from which it expects + to recover. + SHUTDOWN: The channel has seen a failure from which it cannot recover. + """ IDLE = (_cygrpc.ConnectivityState.idle, 'idle') CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting') READY = (_cygrpc.ConnectivityState.ready, 'ready') @@ -250,44 +250,44 @@ class RpcContext(six.with_metaclass(abc.ABCMeta)): def is_active(self): """Describes whether the RPC is active or has terminated. - Returns: - bool: - True if RPC is active, False otherwise. - """ + Returns: + bool: + True if RPC is active, False otherwise. + """ raise NotImplementedError() @abc.abstractmethod def time_remaining(self): """Describes the length of allowed time remaining for the RPC. - Returns: - A nonnegative float indicating the length of allowed time in seconds - remaining for the RPC to complete before it is considered to have timed - out, or None if no deadline was specified for the RPC. - """ + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have + timed out, or None if no deadline was specified for the RPC. + """ raise NotImplementedError() @abc.abstractmethod def cancel(self): """Cancels the RPC. - Idempotent and has no effect if the RPC has already terminated. - """ + Idempotent and has no effect if the RPC has already terminated. + """ raise NotImplementedError() @abc.abstractmethod def add_callback(self, callback): """Registers a callback to be called on RPC termination. - Args: - callback: A no-parameter callable to be called on RPC termination. + Args: + callback: A no-parameter callable to be called on RPC termination. - Returns: - bool: - True if the callback was added and will be called later; False if the - callback was not added and will not be called (because the RPC - already terminated or some other reason). - """ + Returns: + bool: + True if the callback was added and will be called later; False if + the callback was not added and will not be called (because the RPC + already terminated or some other reason). + """ raise NotImplementedError() @@ -301,44 +301,44 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)): def initial_metadata(self): """Accesses the initial metadata sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The initial :term:`metadata`. - """ + Returns: + The initial :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def trailing_metadata(self): """Accesses the trailing metadata sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The trailing :term:`metadata`. - """ + Returns: + The trailing :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def code(self): """Accesses the status code sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The StatusCode value for the RPC. - """ + Returns: + The StatusCode value for the RPC. + """ raise NotImplementedError() @abc.abstractmethod def details(self): """Accesses the details sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The details string of the RPC. - """ + Returns: + The details string of the RPC. + """ raise NotImplementedError() @@ -578,9 +578,9 @@ class AuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)): class ServerCredentials(object): """An encapsulation of the data required to open a secure port on a Server. - This class has no supported interface - it exists to define the type of its - instances and its instances exist to be passed to other functions. - """ + This class has no supported interface - it exists to define the type of its + instances and its instances exist to be passed to other functions. + """ def __init__(self, credentials): self._credentials = credentials @@ -611,61 +611,65 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): def __call__(self, request, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC. + Returns: + The response value for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also be a Call for the RPC affording the RPC's - metadata, status code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ raise NotImplementedError() @abc.abstractmethod def with_call(self, request, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional durating of time in seconds to allow for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional durating of time in seconds to allow for + the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC and a Call value for the RPC. + Returns: + The response value for the RPC and a Call value for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also be a Call for the RPC affording the RPC's - metadata, status code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ raise NotImplementedError() @abc.abstractmethod def future(self, request, timeout=None, metadata=None, credentials=None): """Asynchronously invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and a Future. In the event of - RPC completion, the return Call-Future's result value will be the - response message of the RPC. Should the event terminate with non-OK - status, the returned Call-Future's exception value will be an RpcError. - """ + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's result + value will be the response message of the RPC. + Should the event terminate with non-OK status, + the returned Call-Future's exception value will be an RpcError. + """ raise NotImplementedError() @@ -676,19 +680,20 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): def __call__(self, request, timeout=None, metadata=None, credentials=None): """Invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: An optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: An optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and an iterator of response - values. Drawing response values from the returned Call-iterator may - raise RpcError indicating termination of the RPC with non-OK status. - """ + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of the + RPC with non-OK status. + """ raise NotImplementedError() @@ -703,22 +708,23 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Synchronously invokes the underlying RPC. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for + the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC. + Returns: + The response value for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also implement grpc.Call, affording methods - such as metadata, code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also implement grpc.Call, affording methods + such as metadata, code, and details. + """ raise NotImplementedError() @abc.abstractmethod @@ -729,22 +735,23 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Synchronously invokes the underlying RPC on the client. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for + the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC and a Call object for the RPC. + Returns: + The response value for the RPC and a Call object for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also be a Call for the RPC affording the RPC's - metadata, status code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ raise NotImplementedError() @abc.abstractmethod @@ -755,20 +762,21 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Asynchronously invokes the underlying RPC on the client. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and a Future. In the event of - RPC completion, the return Call-Future's result value will be the - response message of the RPC. Should the event terminate with non-OK - status, the returned Call-Future's exception value will be an RpcError. - """ + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's result value + will be the response message of the RPC. Should the event terminate + with non-OK status, the returned Call-Future's exception value will + be an RpcError. + """ raise NotImplementedError() @@ -783,19 +791,20 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Invokes the underlying RPC on the client. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - if not specified the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If not specified, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and an iterator of response - values. Drawing response values from the returned Call-iterator may - raise RpcError indicating termination of the RPC with non-OK status. - """ + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of the + RPC with non-OK status. + """ raise NotImplementedError() @@ -809,31 +818,31 @@ class Channel(six.with_metaclass(abc.ABCMeta)): def subscribe(self, callback, try_to_connect=False): """Subscribe to this Channel's connectivity state machine. - A Channel may be in any of the states described by ChannelConnectivity. - This method allows application to monitor the state transitions. - The typical use case is to debug or gain better visibility into gRPC - runtime's state. + A Channel may be in any of the states described by ChannelConnectivity. + This method allows application to monitor the state transitions. + The typical use case is to debug or gain better visibility into gRPC + runtime's state. - Args: - callback: A callable to be invoked with ChannelConnectivity argument. - ChannelConnectivity describes current state of the channel. - The callable will be invoked immediately upon subscription and again for - every change to ChannelConnectivity until it is unsubscribed or this - Channel object goes out of scope. - try_to_connect: A boolean indicating whether or not this Channel should - attempt to connect immediately. If set to False, gRPC runtime decides - when to connect. - """ + Args: + callback: A callable to be invoked with ChannelConnectivity argument. + ChannelConnectivity describes current state of the channel. + The callable will be invoked immediately upon subscription + and again for every change to ChannelConnectivity until it + is unsubscribed or this Channel object goes out of scope. + try_to_connect: A boolean indicating whether or not this Channel + should attempt to connect immediately. If set to False, gRPC + runtime decides when to connect. + """ raise NotImplementedError() @abc.abstractmethod def unsubscribe(self, callback): """Unsubscribes a subscribed callback from this Channel's connectivity. - Args: - callback: A callable previously registered with this Channel from having - been passed to its "subscribe" method. - """ + Args: + callback: A callable previously registered with this Channel from + having been passed to its "subscribe" method. + """ raise NotImplementedError() @abc.abstractmethod @@ -843,16 +852,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a UnaryUnaryMultiCallable for a unary-unary method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. - Returns: - A UnaryUnaryMultiCallable value for the named unary-unary method. - """ + Returns: + A UnaryUnaryMultiCallable value for the named unary-unary method. + """ raise NotImplementedError() @abc.abstractmethod @@ -862,16 +872,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a UnaryStreamMultiCallable for a unary-stream method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None is + passed. - Returns: - A UnaryStreamMultiCallable value for the name unary-stream method. - """ + Returns: + A UnaryStreamMultiCallable value for the name unary-stream method. + """ raise NotImplementedError() @abc.abstractmethod @@ -881,16 +892,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a StreamUnaryMultiCallable for a stream-unary method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None is + passed. - Returns: - A StreamUnaryMultiCallable value for the named stream-unary method. - """ + Returns: + A StreamUnaryMultiCallable value for the named stream-unary method. + """ raise NotImplementedError() @abc.abstractmethod @@ -900,16 +912,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a StreamStreamMultiCallable for a stream-stream method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. - Returns: - A StreamStreamMultiCallable value for the named stream-stream method. - """ + Returns: + A StreamStreamMultiCallable value for the named stream-stream method. + """ raise NotImplementedError() @@ -923,79 +936,79 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): def invocation_metadata(self): """Accesses the metadata from the sent by the client. - Returns: - The invocation :term:`metadata`. - """ + Returns: + The invocation :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def peer(self): """Identifies the peer that invoked the RPC being serviced. - Returns: - A string identifying the peer that invoked the RPC being serviced. - The string format is determined by gRPC runtime. - """ + Returns: + A string identifying the peer that invoked the RPC being serviced. + The string format is determined by gRPC runtime. + """ raise NotImplementedError() @abc.abstractmethod def peer_identities(self): """Gets one or more peer identity(s). - Equivalent to - servicer_context.auth_context().get( - servicer_context.peer_identity_key()) + Equivalent to + servicer_context.auth_context().get( + servicer_context.peer_identity_key()) - Returns: - An iterable of the identities, or None if the call is not authenticated. - Each identity is returned as a raw bytes type. - """ + Returns: + An iterable of the identities, or None if the call is not + authenticated. Each identity is returned as a raw bytes type. + """ raise NotImplementedError() @abc.abstractmethod def peer_identity_key(self): """The auth property used to identify the peer. - For example, "x509_common_name" or "x509_subject_alternative_name" are - used to identify an SSL peer. + For example, "x509_common_name" or "x509_subject_alternative_name" are + used to identify an SSL peer. - Returns: - The auth property (string) that indicates the - peer identity, or None if the call is not authenticated. - """ + Returns: + The auth property (string) that indicates the + peer identity, or None if the call is not authenticated. + """ raise NotImplementedError() @abc.abstractmethod def auth_context(self): """Gets the auth context for the call. - Returns: - A map of strings to an iterable of bytes for each auth property. - """ + Returns: + A map of strings to an iterable of bytes for each auth property. + """ raise NotImplementedError() @abc.abstractmethod def send_initial_metadata(self, initial_metadata): """Sends the initial metadata value to the client. - This method need not be called by implementations if they have no - metadata to add to what the gRPC runtime will transmit. + This method need not be called by implementations if they have no + metadata to add to what the gRPC runtime will transmit. - Args: - initial_metadata: The initial :term:`metadata`. - """ + Args: + initial_metadata: The initial :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def set_trailing_metadata(self, trailing_metadata): """Sends the trailing metadata for the RPC. - This method need not be called by implementations if they have no - metadata to add to what the gRPC runtime will transmit. + This method need not be called by implementations if they have no + metadata to add to what the gRPC runtime will transmit. - Args: - trailing_metadata: The trailing :term:`metadata`. - """ + Args: + trailing_metadata: The trailing :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod @@ -1049,44 +1062,45 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): class RpcMethodHandler(six.with_metaclass(abc.ABCMeta)): """An implementation of a single RPC method. - Attributes: - request_streaming: Whether the RPC supports exactly one request message or - any arbitrary number of request messages. - response_streaming: Whether the RPC supports exactly one response message or - any arbitrary number of response messages. - request_deserializer: A callable behavior that accepts a byte string and - returns an object suitable to be passed to this object's business logic, - or None to indicate that this object's business logic should be passed the - raw request bytes. - response_serializer: A callable behavior that accepts an object produced by - this object's business logic and returns a byte string, or None to - indicate that the byte strings produced by this object's business logic - should be transmitted on the wire as they are. - unary_unary: This object's application-specific business logic as a callable - value that takes a request value and a ServicerContext object and returns - a response value. Only non-None if both request_streaming and - response_streaming are False. - unary_stream: This object's application-specific business logic as a - callable value that takes a request value and a ServicerContext object and - returns an iterator of response values. Only non-None if request_streaming - is False and response_streaming is True. - stream_unary: This object's application-specific business logic as a - callable value that takes an iterator of request values and a - ServicerContext object and returns a response value. Only non-None if - request_streaming is True and response_streaming is False. - stream_stream: This object's application-specific business logic as a - callable value that takes an iterator of request values and a - ServicerContext object and returns an iterator of response values. Only - non-None if request_streaming and response_streaming are both True. - """ + Attributes: + request_streaming: Whether the RPC supports exactly one request message + or any arbitrary number of request messages. + response_streaming: Whether the RPC supports exactly one response message + or any arbitrary number of response messages. + request_deserializer: A callable behavior that accepts a byte string and + returns an object suitable to be passed to this object's business + logic, or None to indicate that this object's business logic should be + passed the raw request bytes. + response_serializer: A callable behavior that accepts an object produced + by this object's business logic and returns a byte string, or None to + indicate that the byte strings produced by this object's business logic + should be transmitted on the wire as they are. + unary_unary: This object's application-specific business logic as a + callable value that takes a request value and a ServicerContext object + and returns a response value. Only non-None if both request_streaming + and response_streaming are False. + unary_stream: This object's application-specific business logic as a + callable value that takes a request value and a ServicerContext object + and returns an iterator of response values. Only non-None if + request_streaming is False and response_streaming is True. + stream_unary: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns a response value. Only non-None if + request_streaming is True and response_streaming is False. + stream_stream: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns an iterator of response values. + Only non-None if request_streaming and response_streaming are both + True. + """ class HandlerCallDetails(six.with_metaclass(abc.ABCMeta)): """Describes an RPC that has just arrived for service. - Attributes: - method: The method name of the RPC. - invocation_metadata: The :term:`metadata` sent by the client. - """ + Attributes: + method: The method name of the RPC. + invocation_metadata: The :term:`metadata` sent by the client. + """ class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)): @@ -1096,33 +1110,33 @@ class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)): def service(self, handler_call_details): """Returns the handler for servicing the RPC. - Args: - handler_call_details: A HandlerCallDetails describing the RPC. + Args: + handler_call_details: A HandlerCallDetails describing the RPC. - Returns: - An RpcMethodHandler with which the RPC may be serviced if the - implementation chooses to service this RPC, or None otherwise. - """ + Returns: + An RpcMethodHandler with which the RPC may be serviced if the + implementation chooses to service this RPC, or None otherwise. + """ raise NotImplementedError() class ServiceRpcHandler(six.with_metaclass(abc.ABCMeta, GenericRpcHandler)): """An implementation of RPC methods belonging to a service. - A service handles RPC methods with structured names of the form - '/Service.Name/Service.Method', where 'Service.Name' is the value - returned by service_name(), and 'Service.Method' is the method - name. A service can have multiple method names, but only a single - service name. - """ + A service handles RPC methods with structured names of the form + '/Service.Name/Service.Method', where 'Service.Name' is the value + returned by service_name(), and 'Service.Method' is the method + name. A service can have multiple method names, but only a single + service name. + """ @abc.abstractmethod def service_name(self): """Returns this service's name. - Returns: - The service name. - """ + Returns: + The service name. + """ raise NotImplementedError() @@ -1164,83 +1178,84 @@ class Server(six.with_metaclass(abc.ABCMeta)): def add_generic_rpc_handlers(self, generic_rpc_handlers): """Registers GenericRpcHandlers with this Server. - This method is only safe to call before the server is started. + This method is only safe to call before the server is started. - Args: - generic_rpc_handlers: An iterable of GenericRpcHandlers that will be used - to service RPCs. - """ + Args: + generic_rpc_handlers: An iterable of GenericRpcHandlers that will be + used to service RPCs. + """ raise NotImplementedError() @abc.abstractmethod def add_insecure_port(self, address): """Opens an insecure port for accepting RPCs. - This method may only be called before starting the server. + This method may only be called before starting the server. - Args: - address: The address for which to open a port. - if the port is 0, or not specified in the address, then gRPC runtime - will choose a port. + Args: + address: The address for which to open a port. + if the port is 0, or not specified in the address, then gRPC runtime + will choose a port. - Returns: - integer: - An integer port on which server will accept RPC requests. - """ + Returns: + integer: + An integer port on which server will accept RPC requests. + """ raise NotImplementedError() @abc.abstractmethod def add_secure_port(self, address, server_credentials): """Opens a secure port for accepting RPCs. - This method may only be called before starting the server. + This method may only be called before starting the server. - Args: - address: The address for which to open a port. - if the port is 0, or not specified in the address, then gRPC runtime - will choose a port. - server_credentials: A ServerCredentials object. + Args: + address: The address for which to open a port. + if the port is 0, or not specified in the address, then gRPC + runtime will choose a port. + server_credentials: A ServerCredentials object. - Returns: - integer: - An integer port on which server will accept RPC requests. - """ + Returns: + integer: + An integer port on which server will accept RPC requests. + """ raise NotImplementedError() @abc.abstractmethod def start(self): """Starts this Server. - This method may only be called once. (i.e. it is not idempotent). - """ + This method may only be called once. (i.e. it is not idempotent). + """ raise NotImplementedError() @abc.abstractmethod def stop(self, grace): """Stops this Server. - This method immediately stop service of new RPCs in all cases. - If a grace period is specified, this method returns immediately - and all RPCs active at the end of the grace period are aborted. + This method immediately stop service of new RPCs in all cases. + If a grace period is specified, this method returns immediately + and all RPCs active at the end of the grace period are aborted. - If a grace period is not specified, then all existing RPCs are - teriminated immediately and the this method blocks until the last - RPC handler terminates. + If a grace period is not specified, then all existing RPCs are + teriminated immediately and the this method blocks until the last + RPC handler terminates. - This method is idempotent and may be called at any time. Passing a smaller - grace value in subsequentcall will have the effect of stopping the Server - sooner. Passing a larger grace value in subsequent call *will not* have the - effect of stopping the server later (i.e. the most restrictive grace - value is used). + This method is idempotent and may be called at any time. + Passing a smaller grace value in subsequent call will have + the effect of stopping the Server sooner. Passing a larger + grace value in subsequent call *will not* have the effect of + stopping the server later (i.e. the most restrictive grace + value is used). - Args: - grace: A duration of time in seconds or None. + Args: + grace: A duration of time in seconds or None. - Returns: - A threading.Event that will be set when this Server has completely - stopped, i.e. when running RPCs either complete or are aborted and - all handlers have terminated. - """ + Returns: + A threading.Event that will be set when this Server has completely + stopped, i.e. when running RPCs either complete or are aborted and + all handlers have terminated. + """ raise NotImplementedError() @@ -1252,15 +1267,15 @@ def unary_unary_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a unary-unary RPC method. - Args: - behavior: The implementation of an RPC that accepts one request and returns - one response. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts one request + and returns one response. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, False, request_deserializer, response_serializer, behavior, None, @@ -1272,15 +1287,15 @@ def unary_stream_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a unary-stream RPC method. - Args: - behavior: The implementation of an RPC that accepts one request and returns - an iterator of response values. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts one request + and returns an iterator of response values. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, True, request_deserializer, response_serializer, None, behavior, @@ -1292,15 +1307,15 @@ def stream_unary_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a stream-unary RPC method. - Args: - behavior: The implementation of an RPC that accepts an iterator of request - values and returns a single response value. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts an iterator of + request values and returns a single response value. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, False, request_deserializer, response_serializer, None, None, @@ -1312,15 +1327,15 @@ def stream_stream_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a stream-stream RPC method. - Args: - behavior: The implementation of an RPC that accepts an iterator of request - values and returns an iterator of response values. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts an iterator of + request values and returns an iterator of response values. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, True, request_deserializer, response_serializer, None, None, None, @@ -1330,15 +1345,16 @@ def stream_stream_rpc_method_handler(behavior, def method_handlers_generic_handler(service, method_handlers): """Creates a GenericRpcHandler from RpcMethodHandlers. - Args: - service: The name of the service that is implemented by the method_handlers. - method_handlers: A dictionary that maps method names to corresponding - RpcMethodHandler. + Args: + service: The name of the service that is implemented by the + method_handlers. + method_handlers: A dictionary that maps method names to corresponding + RpcMethodHandler. - Returns: - A GenericRpcHandler. This is typically added to the grpc.Server object - with add_generic_rpc_handlers() before starting the server. - """ + Returns: + A GenericRpcHandler. This is typically added to the grpc.Server object + with add_generic_rpc_handlers() before starting the server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.DictionaryGenericHandler(service, method_handlers) @@ -1435,20 +1451,20 @@ def ssl_server_credentials(private_key_certificate_chain_pairs, require_client_auth=False): """Creates a ServerCredentials for use with an SSL-enabled Server. - Args: - private_key_certificate_chain_pairs: A list of pairs of the form - [PEM-encoded private key, PEM-encoded certificate chain]. - root_certificates: An optional byte string of PEM-encoded client root - certificates that the server will use to verify client authentication. - If omitted, require_client_auth must also be False. - require_client_auth: A boolean indicating whether or not to require - clients to be authenticated. May only be True if root_certificates - is not None. - - Returns: - A ServerCredentials for use with an SSL-enabled Server. Typically, this - object is an argument to add_secure_port() method during server setup. - """ + Args: + private_key_certificate_chain_pairs: A list of pairs of the form + [PEM-encoded private key, PEM-encoded certificate chain]. + root_certificates: An optional byte string of PEM-encoded client root + certificates that the server will use to verify client authentication. + If omitted, require_client_auth must also be False. + require_client_auth: A boolean indicating whether or not to require + clients to be authenticated. May only be True if root_certificates + is not None. + + Returns: + A ServerCredentials for use with an SSL-enabled Server. Typically, this + object is an argument to add_secure_port() method during server setup. + """ if len(private_key_certificate_chain_pairs) == 0: raise ValueError( 'At least one private key-certificate chain pair is required!') @@ -1522,16 +1538,16 @@ def dynamic_ssl_server_credentials(initial_certificate_configuration, def channel_ready_future(channel): """Creates a Future that tracks when a Channel is ready. - Cancelling the Future does not affect the channel's state machine. - It merely decouples the Future from channel state machine. + Cancelling the Future does not affect the channel's state machine. + It merely decouples the Future from channel state machine. - Args: - channel: A Channel object. + Args: + channel: A Channel object. - Returns: - A Future object that matures when the channel connectivity is - ChannelConnectivity.READY. - """ + Returns: + A Future object that matures when the channel connectivity is + ChannelConnectivity.READY. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.channel_ready_future(channel) @@ -1539,14 +1555,14 @@ def channel_ready_future(channel): def insecure_channel(target, options=None): """Creates an insecure Channel to a server. - Args: - target: The server address - options: An optional list of key-value pairs (channel args in gRPC runtime) - to configure the channel. + Args: + target: The server address + options: An optional list of key-value pairs (channel args + in gRPC Core runtime) to configure the channel. - Returns: - A Channel object. - """ + Returns: + A Channel object. + """ from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, None) @@ -1554,15 +1570,15 @@ def insecure_channel(target, options=None): def secure_channel(target, credentials, options=None): """Creates a secure Channel to a server. - Args: - target: The server address. - credentials: A ChannelCredentials instance. - options: An optional list of key-value pairs (channel args in gRPC runtime) - to configure the channel. + Args: + target: The server address. + credentials: A ChannelCredentials instance. + options: An optional list of key-value pairs (channel args + in gRPC Core runtime) to configure the channel. - Returns: - A Channel object. - """ + Returns: + A Channel object. + """ from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, credentials._credentials) diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template index 9785d150e4..da404e2fef 100644 --- a/templates/gRPC-Core.podspec.template +++ b/templates/gRPC-Core.podspec.template @@ -202,6 +202,6 @@ # TODO (mxyan): Instead of this hack, add include path "third_party" to C core's include path? s.prepare_command = <<-END_OF_COMMAND - find src/core/ -type f -exec sed -E -i '.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\; + find src/core/ -type f -exec sed -E -i'.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\; END_OF_COMMAND end diff --git a/templates/tools/dockerfile/clang_format.include b/templates/tools/dockerfile/clang5.include index 79d0ff286f..11ff442787 100644 --- a/templates/tools/dockerfile/clang_format.include +++ b/templates/tools/dockerfile/clang5.include @@ -3,3 +3,5 @@ RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.0 RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format ENV CLANG_FORMAT=clang-format +RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy +ENV CLANG_TIDY=clang-tidy diff --git a/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template b/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template index 1ab667c95d..4f24a025c6 100644 --- a/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template +++ b/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template @@ -16,8 +16,8 @@ FROM debian:jessie - <%include file="../clang_format.include"/> + <%include file="../clang5.include"/> ADD clang_format_all_the_things.sh / CMD ["echo 'Run with tools/distrib/clang_format_code.sh'"] -
\ No newline at end of file + diff --git a/templates/tools/dockerfile/grpc_clang_tidy/Dockerfile.template b/templates/tools/dockerfile/grpc_clang_tidy/Dockerfile.template new file mode 100644 index 0000000000..f5bceaa5f3 --- /dev/null +++ b/templates/tools/dockerfile/grpc_clang_tidy/Dockerfile.template @@ -0,0 +1,24 @@ +%YAML 1.2 +--- | + # Copyright 2015 gRPC authors. + # + # Licensed under the Apache License, Version 2.0 (the "License"); + # you may not use this file except in compliance with the License. + # You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + + FROM debian:jessie + + <%include file="../clang5.include"/> + <%include file="../python_deps.include"/> + ADD clang_tidy_all_the_things.sh / + CMD ["echo 'Run with tools/distrib/clang_tidy_code.sh'"] + + diff --git a/templates/tools/dockerfile/test/sanity/Dockerfile.template b/templates/tools/dockerfile/test/sanity/Dockerfile.template index c98f7d4176..7453e6c460 100644 --- a/templates/tools/dockerfile/test/sanity/Dockerfile.template +++ b/templates/tools/dockerfile/test/sanity/Dockerfile.template @@ -53,7 +53,7 @@ RUN chmod +x ./bazel-0.4.4-installer-linux-x86_64.sh RUN ./bazel-0.4.4-installer-linux-x86_64.sh - <%include file="../../clang_format.include"/> + <%include file="../../clang5.include"/> <%include file="../../run_tests_addons.include"/> # Define the default command. diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc index e767e01f21..262470300e 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.cc +++ b/test/core/iomgr/ev_epollsig_linux_test.cc @@ -18,7 +18,7 @@ #include "src/core/lib/iomgr/port.h" /* This test only relevant on linux systems where epoll() is available */ -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL_CREATE1 #include "src/core/lib/iomgr/ev_epollsig_linux.h" #include "src/core/lib/iomgr/ev_posix.h" @@ -319,6 +319,6 @@ int main(int argc, char** argv) { grpc_shutdown(); return 0; } -#else /* defined(GRPC_LINUX_EPOLL) */ +#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ int main(int argc, char** argv) { return 0; } -#endif /* !defined(GRPC_LINUX_EPOLL) */ +#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */ diff --git a/test/core/iomgr/pollset_set_test.cc b/test/core/iomgr/pollset_set_test.cc index f27079134b..7d2f59bed4 100644 --- a/test/core/iomgr/pollset_set_test.cc +++ b/test/core/iomgr/pollset_set_test.cc @@ -18,7 +18,7 @@ #include "src/core/lib/iomgr/port.h" /* This test only relevant on linux systems where epoll is available */ -#ifdef GRPC_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL_CREATE1 #include <errno.h> #include <string.h> @@ -443,6 +443,6 @@ int main(int argc, char** argv) { grpc_shutdown(); return 0; } -#else /* defined(GRPC_LINUX_EPOLL) */ +#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ int main(int argc, char** argv) { return 0; } -#endif /* !defined(GRPC_LINUX_EPOLL) */ +#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */ diff --git a/test/core/support/BUILD b/test/core/support/BUILD index 4372b49b54..c8fa046da1 100644 --- a/test/core/support/BUILD +++ b/test/core/support/BUILD @@ -215,6 +215,19 @@ grpc_cc_test( ) grpc_cc_test( + name = "orphanable_test", + srcs = ["orphanable_test.cc"], + language = "C++", + deps = [ + "//:orphanable", + "//test/core/util:gpr_test_util", + ], + external_deps = [ + "gtest", + ], +) + +grpc_cc_test( name = "ref_counted_test", srcs = ["ref_counted_test.cc"], language = "C++", diff --git a/test/core/support/orphanable_test.cc b/test/core/support/orphanable_test.cc new file mode 100644 index 0000000000..e07017ab1e --- /dev/null +++ b/test/core/support/orphanable_test.cc @@ -0,0 +1,114 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/orphanable.h" + +#include <gtest/gtest.h> + +#include "src/core/lib/support/memory.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace testing { +namespace { + +class Foo : public Orphanable { + public: + Foo() : Foo(0) {} + explicit Foo(int value) : value_(value) {} + void Orphan() override { Delete(this); } + int value() const { return value_; } + + private: + int value_; +}; + +TEST(Orphanable, Basic) { + Foo* foo = New<Foo>(); + foo->Orphan(); +} + +TEST(OrphanablePtr, Basic) { + OrphanablePtr<Foo> foo(New<Foo>()); + EXPECT_EQ(0, foo->value()); +} + +TEST(MakeOrphanable, DefaultConstructor) { + auto foo = MakeOrphanable<Foo>(); + EXPECT_EQ(0, foo->value()); +} + +TEST(MakeOrphanable, WithParameters) { + auto foo = MakeOrphanable<Foo>(5); + EXPECT_EQ(5, foo->value()); +} + +class Bar : public InternallyRefCounted { + public: + Bar() : Bar(0) {} + explicit Bar(int value) : value_(value) {} + void Orphan() override { Unref(); } + int value() const { return value_; } + + void StartWork() { Ref(); } + void FinishWork() { Unref(); } + + private: + int value_; +}; + +TEST(OrphanablePtr, InternallyRefCounted) { + auto bar = MakeOrphanable<Bar>(); + bar->StartWork(); + bar->FinishWork(); +} + +// Note: We use DebugOnlyTraceFlag instead of TraceFlag to ensure that +// things build properly in both debug and non-debug cases. +DebugOnlyTraceFlag baz_tracer(true, "baz"); + +class Baz : public InternallyRefCountedWithTracing { + public: + Baz() : Baz(0) {} + explicit Baz(int value) + : InternallyRefCountedWithTracing(&baz_tracer), value_(value) {} + void Orphan() override { Unref(); } + int value() const { return value_; } + + void StartWork() { Ref(DEBUG_LOCATION, "work"); } + void FinishWork() { Unref(DEBUG_LOCATION, "work"); } + + private: + int value_; +}; + +TEST(OrphanablePtr, InternallyRefCountedWithTracing) { + auto baz = MakeOrphanable<Baz>(); + baz->StartWork(); + baz->FinishWork(); +} + +} // namespace +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/support/ref_counted_ptr_test.cc b/test/core/support/ref_counted_ptr_test.cc index 1830edc4e5..ce4975d347 100644 --- a/test/core/support/ref_counted_ptr_test.cc +++ b/test/core/support/ref_counted_ptr_test.cc @@ -138,6 +138,19 @@ TEST(RefCountedPtr, DerefernceOperators) { foo_ref.value(); } +TEST(RefCountedPtr, EqualityOperators) { + RefCountedPtr<Foo> foo(New<Foo>()); + RefCountedPtr<Foo> bar = foo; + RefCountedPtr<Foo> empty; + // Test equality between RefCountedPtrs. + EXPECT_EQ(foo, bar); + EXPECT_NE(foo, empty); + // Test equality with bare pointers. + EXPECT_EQ(foo, foo.get()); + EXPECT_EQ(empty, nullptr); + EXPECT_NE(foo, nullptr); +} + TEST(MakeRefCounted, NoArgs) { RefCountedPtr<Foo> foo = MakeRefCounted<Foo>(); EXPECT_EQ(0, foo->value()); diff --git a/test/core/support/ref_counted_test.cc b/test/core/support/ref_counted_test.cc index be9b6ff7c2..0629e3ff5f 100644 --- a/test/core/support/ref_counted_test.cc +++ b/test/core/support/ref_counted_test.cc @@ -44,7 +44,9 @@ TEST(RefCounted, ExtraRef) { foo->Unref(); } -TraceFlag foo_tracer(true, "foo"); +// Note: We use DebugOnlyTraceFlag instead of TraceFlag to ensure that +// things build properly in both debug and non-debug cases. +DebugOnlyTraceFlag foo_tracer(true, "foo"); class FooWithTracing : public RefCountedWithTracing { public: diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index c6e9577f0c..5a7e52e9e9 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -673,6 +673,42 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { GPR_ASSERT(gpr_time_cmp(deadline, now) > 0); } +TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { + const int kNumServers = 3; + StartServers(kNumServers); + const auto ports = GetServersPorts(); + ResetStub(ports, "round_robin"); + SetNextResolution(ports); + for (size_t i = 0; i < kNumServers; ++i) WaitForServer(i); + for (size_t i = 0; i < servers_.size(); ++i) { + CheckRpcSendOk(); + EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; + } + // One request should have gone to each server. + for (size_t i = 0; i < servers_.size(); ++i) { + EXPECT_EQ(1, servers_[i]->service_.request_count()); + } + const auto pre_death = servers_[0]->service_.request_count(); + // Kill the first server. + servers_[0]->Shutdown(true); + // Client request still succeed. May need retrying if RR had returned a pick + // before noticing the change in the server's connectivity. + while (!SendRpc()) + ; // Retry until success. + // Send a bunch of RPCs that should succeed. + for (int i = 0; i < 10 * kNumServers; ++i) CheckRpcSendOk(); + const auto post_death = servers_[0]->service_.request_count(); + // No requests have gone to the deceased server. + EXPECT_EQ(pre_death, post_death); + // Bring the first server back up. + servers_[0].reset(new ServerData(server_host_, ports[0])); + // Requests should start arriving at the first server either right away (if + // the server managed to start before the RR policy retried the subchannel) or + // after the subchannel retry delay otherwise (RR's subchannel retried before + // the server was fully back up). + WaitForServer(0); +} + } // namespace } // namespace testing } // namespace grpc diff --git a/tools/distrib/clang_tidy_code.sh b/tools/distrib/clang_tidy_code.sh new file mode 100755 index 0000000000..5da86aa277 --- /dev/null +++ b/tools/distrib/clang_tidy_code.sh @@ -0,0 +1,33 @@ +#!/bin/bash +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +echo "NOTE: to automagically apply fixes, invoke with --fix" + +set -ex + +# change to root directory +cd $(dirname $0)/../.. +REPO_ROOT=$(pwd) + +if [ "$CLANG_TIDY_SKIP_DOCKER" == "" ] +then + # build clang-tidy docker image + docker build -t grpc_clang_tidy tools/dockerfile/grpc_clang_tidy + + # run clang-tidy against the checked out codebase + docker run -e TEST=$TEST -e CHANGED_FILES="$CHANGED_FILES" -e CLANG_TIDY_ROOT="/local-code" --rm=true -v "${REPO_ROOT}":/local-code -t grpc_clang_tidy /clang_tidy_all_the_things.sh "$@" +else + CLANG_TIDY_ROOT="${REPO_ROOT}" tools/dockerfile/grpc_clang_tidy/clang_tidy_all_the_things.sh "$@" +fi diff --git a/tools/distrib/run_clang_tidy.py b/tools/distrib/run_clang_tidy.py index 72d7956b68..bc61d4e79a 100755 --- a/tools/distrib/run_clang_tidy.py +++ b/tools/distrib/run_clang_tidy.py @@ -69,4 +69,5 @@ for filename in args.files: shortname=filename, )) #verbose_success=True)) -jobset.run(jobs, maxjobs=args.jobs) +num_fails, res_set = jobset.run(jobs, maxjobs=args.jobs) +sys.exit(num_fails) diff --git a/tools/dockerfile/grpc_clang_format/Dockerfile b/tools/dockerfile/grpc_clang_format/Dockerfile index b3abaef465..8801315bc9 100644 --- a/tools/dockerfile/grpc_clang_format/Dockerfile +++ b/tools/dockerfile/grpc_clang_format/Dockerfile @@ -19,6 +19,8 @@ RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.0 RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format ENV CLANG_FORMAT=clang-format +RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy +ENV CLANG_TIDY=clang-tidy ADD clang_format_all_the_things.sh / CMD ["echo 'Run with tools/distrib/clang_format_code.sh'"] diff --git a/tools/dockerfile/grpc_clang_tidy/Dockerfile b/tools/dockerfile/grpc_clang_tidy/Dockerfile new file mode 100644 index 0000000000..9d9d70185b --- /dev/null +++ b/tools/dockerfile/grpc_clang_tidy/Dockerfile @@ -0,0 +1,41 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM debian:jessie + +RUN apt-get update && apt-get -y install wget xz-utils +RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz +RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz +RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format +ENV CLANG_FORMAT=clang-format +RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy +ENV CLANG_TIDY=clang-tidy + +#==================== +# Python dependencies + +# Install dependencies + +RUN apt-get update && apt-get install -y \ + python-all-dev \ + python3-all-dev \ + python-pip + +# Install Python packages from PyPI +RUN pip install --upgrade pip==9.0.1 +RUN pip install virtualenv +RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.0.post1 six==1.10.0 twisted==17.5.0 + +ADD clang_tidy_all_the_things.sh / +CMD ["echo 'Run with tools/distrib/clang_tidy_code.sh'"] diff --git a/tools/run_tests/sanity/check_clang_tidy.sh b/tools/dockerfile/grpc_clang_tidy/clang_tidy_all_the_things.sh index 6c4caa1ee6..1a82dd52b7 100755 --- a/tools/run_tests/sanity/check_clang_tidy.sh +++ b/tools/dockerfile/grpc_clang_tidy/clang_tidy_all_the_things.sh @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -set -e +set -ex -make buildtests \ - -j "$(python -c 'import multiprocessing; print multiprocessing.cpu_count()')" -find src/core src/cpp test/core test/cpp -print0 -name '*.h' -or -name '*.cc' \ +# clang format command +CLANG_TIDY=${CLANG_TIDY:-clang-tidy-5.0} + +cd ${CLANG_TIDY_ROOT} + +find src/core src/cpp test/core test/cpp -name '*.h' -or -name '*.cc' -print0 \ | xargs -0 tools/distrib/run_clang_tidy.py "$@" diff --git a/tools/dockerfile/test/sanity/Dockerfile b/tools/dockerfile/test/sanity/Dockerfile index 6e5a133a69..7a8e1c09b1 100644 --- a/tools/dockerfile/test/sanity/Dockerfile +++ b/tools/dockerfile/test/sanity/Dockerfile @@ -110,6 +110,8 @@ RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.0 RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format ENV CLANG_FORMAT=clang-format +RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy +ENV CLANG_TIDY=clang-tidy # Prepare ccache RUN ln -s /usr/bin/ccache /usr/local/bin/gcc diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index d9184f49a2..85bbeed088 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1038,6 +1038,7 @@ src/core/lib/support/manual_constructor.h \ src/core/lib/support/memory.h \ src/core/lib/support/mpscq.h \ src/core/lib/support/murmur_hash.h \ +src/core/lib/support/orphanable.h \ src/core/lib/support/ref_counted.h \ src/core/lib/support/ref_counted_ptr.h \ src/core/lib/support/spinlock.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 3d3c6711d0..4bf0fc74d1 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1304,6 +1304,7 @@ src/core/lib/support/mpscq.cc \ src/core/lib/support/mpscq.h \ src/core/lib/support/murmur_hash.cc \ src/core/lib/support/murmur_hash.h \ +src/core/lib/support/orphanable.h \ src/core/lib/support/ref_counted.h \ src/core/lib/support/ref_counted_ptr.h \ src/core/lib/support/spinlock.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index c934155ecc..df45489d5e 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -3713,6 +3713,25 @@ "gpr_test_util", "grpc", "grpc++", + "grpc++_test", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "orphanable_test", + "src": [ + "test/core/support/orphanable_test.cc" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc++", "grpc++_proto_reflection_desc_db", "grpc++_reflection", "grpc++_test_util", @@ -8286,6 +8305,7 @@ "src/core/lib/slice/slice_internal.h", "src/core/lib/slice/slice_string_helpers.h", "src/core/lib/support/debug_location.h", + "src/core/lib/support/orphanable.h", "src/core/lib/support/ref_counted.h", "src/core/lib/support/ref_counted_ptr.h", "src/core/lib/support/vector.h", @@ -8426,6 +8446,7 @@ "src/core/lib/slice/slice_internal.h", "src/core/lib/slice/slice_string_helpers.h", "src/core/lib/support/debug_location.h", + "src/core/lib/support/orphanable.h", "src/core/lib/support/ref_counted.h", "src/core/lib/support/ref_counted_ptr.h", "src/core/lib/support/vector.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 6f36dff820..dc2b39eb21 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4092,6 +4092,30 @@ "flaky": false, "gtest": true, "language": "c++", + "name": "orphanable_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", "name": "proto_server_reflection_test", "platforms": [ "linux", diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index c8e917f117..6b27d6f875 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -1127,6 +1127,7 @@ class Sanity(object): environ = {'TEST': 'true'} if _is_use_docker_child(): environ['CLANG_FORMAT_SKIP_DOCKER'] = 'true' + environ['CLANG_TIDY_SKIP_DOCKER'] = 'true' return [ self.config.job_spec( cmd['script'].split(), diff --git a/tools/run_tests/sanity/sanity_tests.yaml b/tools/run_tests/sanity/sanity_tests.yaml index dab991a7b1..efdb4d84b5 100644 --- a/tools/run_tests/sanity/sanity_tests.yaml +++ b/tools/run_tests/sanity/sanity_tests.yaml @@ -14,11 +14,11 @@ cpu_cost: 3 - script: tools/distrib/check_copyright.py - script: tools/distrib/clang_format_code.sh +- script: tools/distrib/clang_tidy_code.sh - script: tools/distrib/check_trailing_newlines.sh - script: tools/distrib/check_nanopb_output.sh - script: tools/distrib/check_include_guards.py - script: tools/distrib/pylint_code.sh - script: tools/distrib/yapf_code.sh - script: tools/distrib/python/check_grpcio_tools.py -- script: tools/run_tests/sanity/check_clang_tidy.sh cpu_cost: 1000 |