aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD12
-rw-r--r--CMakeLists.txt38
-rw-r--r--Makefile48
-rw-r--r--build.yaml15
-rw-r--r--gRPC-Core.podspec4
-rw-r--r--grpc.gemspec1
-rw-r--r--package.xml1
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc75
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc64
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc5
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h3
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc297
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h79
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc25
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc10
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc10
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.h4
-rw-r--r--src/core/lib/iomgr/is_epollexclusive_available.cc2
-rw-r--r--src/core/lib/iomgr/port.h7
-rw-r--r--src/core/lib/support/abstract.h5
-rw-r--r--src/core/lib/support/orphanable.h171
-rw-r--r--src/core/lib/support/ref_counted.h13
-rw-r--r--src/core/lib/support/ref_counted_ptr.h9
-rw-r--r--src/objective-c/BoringSSL.podspec14
-rw-r--r--src/objective-c/README.md12
-rw-r--r--src/python/grpcio/grpc/__init__.py996
-rw-r--r--templates/gRPC-Core.podspec.template2
-rw-r--r--templates/tools/dockerfile/clang5.include (renamed from templates/tools/dockerfile/clang_format.include)2
-rw-r--r--templates/tools/dockerfile/grpc_clang_format/Dockerfile.template4
-rw-r--r--templates/tools/dockerfile/grpc_clang_tidy/Dockerfile.template24
-rw-r--r--templates/tools/dockerfile/test/sanity/Dockerfile.template2
-rw-r--r--test/core/iomgr/ev_epollsig_linux_test.cc6
-rw-r--r--test/core/iomgr/pollset_set_test.cc6
-rw-r--r--test/core/support/BUILD13
-rw-r--r--test/core/support/orphanable_test.cc114
-rw-r--r--test/core/support/ref_counted_ptr_test.cc13
-rw-r--r--test/core/support/ref_counted_test.cc4
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc36
-rwxr-xr-xtools/distrib/clang_tidy_code.sh33
-rwxr-xr-xtools/distrib/run_clang_tidy.py3
-rw-r--r--tools/dockerfile/grpc_clang_format/Dockerfile2
-rw-r--r--tools/dockerfile/grpc_clang_tidy/Dockerfile41
-rwxr-xr-xtools/dockerfile/grpc_clang_tidy/clang_tidy_all_the_things.sh (renamed from tools/run_tests/sanity/check_clang_tidy.sh)11
-rw-r--r--tools/dockerfile/test/sanity/Dockerfile2
-rw-r--r--tools/doxygen/Doxyfile.c++.internal1
-rw-r--r--tools/doxygen/Doxyfile.core.internal1
-rw-r--r--tools/run_tests/generated/sources_and_headers.json21
-rw-r--r--tools/run_tests/generated/tests.json24
-rwxr-xr-xtools/run_tests/run_tests.py1
-rw-r--r--tools/run_tests/sanity/sanity_tests.yaml2
53 files changed, 1475 insertions, 833 deletions
diff --git a/BUILD b/BUILD
index ddcb0cd0d7..186d664395 100644
--- a/BUILD
+++ b/BUILD
@@ -559,6 +559,16 @@ grpc_cc_library(
)
grpc_cc_library(
+ name = "orphanable",
+ language = "c++",
+ public_hdrs = ["src/core/lib/support/orphanable.h"],
+ deps = [
+ "debug_location",
+ "grpc_trace",
+ ],
+)
+
+grpc_cc_library(
name = "ref_counted",
language = "c++",
public_hdrs = ["src/core/lib/support/ref_counted.h"],
@@ -929,6 +939,8 @@ grpc_cc_library(
deps = [
"grpc_base",
"grpc_deadline_filter",
+ "ref_counted",
+ "ref_counted_ptr",
],
)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 93954b412d..97d68cc62c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -563,6 +563,7 @@ add_dependencies(buildtests_cxx memory_test)
add_dependencies(buildtests_cxx metrics_client)
add_dependencies(buildtests_cxx mock_test)
add_dependencies(buildtests_cxx noop-benchmark)
+add_dependencies(buildtests_cxx orphanable_test)
add_dependencies(buildtests_cxx proto_server_reflection_test)
add_dependencies(buildtests_cxx proto_utils_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -10913,6 +10914,43 @@ target_link_libraries(noop-benchmark
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(orphanable_test
+ test/core/support/orphanable_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(orphanable_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+ PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+ PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+ PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+ PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+ PRIVATE third_party/googletest/googletest/include
+ PRIVATE third_party/googletest/googletest
+ PRIVATE third_party/googletest/googlemock/include
+ PRIVATE third_party/googletest/googlemock
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(orphanable_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc++
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(proto_server_reflection_test
test/cpp/end2end/proto_server_reflection_test.cc
third_party/googletest/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index 8a38cc8da0..3265cea362 100644
--- a/Makefile
+++ b/Makefile
@@ -1160,6 +1160,7 @@ memory_test: $(BINDIR)/$(CONFIG)/memory_test
metrics_client: $(BINDIR)/$(CONFIG)/metrics_client
mock_test: $(BINDIR)/$(CONFIG)/mock_test
noop-benchmark: $(BINDIR)/$(CONFIG)/noop-benchmark
+orphanable_test: $(BINDIR)/$(CONFIG)/orphanable_test
proto_server_reflection_test: $(BINDIR)/$(CONFIG)/proto_server_reflection_test
proto_utils_test: $(BINDIR)/$(CONFIG)/proto_utils_test
qps_interarrival_test: $(BINDIR)/$(CONFIG)/qps_interarrival_test
@@ -1602,6 +1603,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/metrics_client \
$(BINDIR)/$(CONFIG)/mock_test \
$(BINDIR)/$(CONFIG)/noop-benchmark \
+ $(BINDIR)/$(CONFIG)/orphanable_test \
$(BINDIR)/$(CONFIG)/proto_server_reflection_test \
$(BINDIR)/$(CONFIG)/proto_utils_test \
$(BINDIR)/$(CONFIG)/qps_interarrival_test \
@@ -1733,6 +1735,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/metrics_client \
$(BINDIR)/$(CONFIG)/mock_test \
$(BINDIR)/$(CONFIG)/noop-benchmark \
+ $(BINDIR)/$(CONFIG)/orphanable_test \
$(BINDIR)/$(CONFIG)/proto_server_reflection_test \
$(BINDIR)/$(CONFIG)/proto_utils_test \
$(BINDIR)/$(CONFIG)/qps_interarrival_test \
@@ -2142,6 +2145,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 )
$(E) "[RUN] Testing noop-benchmark"
$(Q) $(BINDIR)/$(CONFIG)/noop-benchmark || ( echo test noop-benchmark failed ; exit 1 )
+ $(E) "[RUN] Testing orphanable_test"
+ $(Q) $(BINDIR)/$(CONFIG)/orphanable_test || ( echo test orphanable_test failed ; exit 1 )
$(E) "[RUN] Testing proto_server_reflection_test"
$(Q) $(BINDIR)/$(CONFIG)/proto_server_reflection_test || ( echo test proto_server_reflection_test failed ; exit 1 )
$(E) "[RUN] Testing proto_utils_test"
@@ -16141,6 +16146,49 @@ endif
endif
+ORPHANABLE_TEST_SRC = \
+ test/core/support/orphanable_test.cc \
+
+ORPHANABLE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(ORPHANABLE_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/orphanable_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/orphanable_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/orphanable_test: $(PROTOBUF_DEP) $(ORPHANABLE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(ORPHANABLE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/orphanable_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/support/orphanable_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_orphanable_test: $(ORPHANABLE_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(ORPHANABLE_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
PROTO_SERVER_REFLECTION_TEST_SRC = \
test/cpp/end2end/proto_server_reflection_test.cc \
diff --git a/build.yaml b/build.yaml
index c3d54f1e8a..c9adc9b44e 100644
--- a/build.yaml
+++ b/build.yaml
@@ -397,6 +397,7 @@ filegroups:
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/support/debug_location.h
+ - src/core/lib/support/orphanable.h
- src/core/lib/support/ref_counted.h
- src/core/lib/support/ref_counted_ptr.h
- src/core/lib/support/vector.h
@@ -4403,6 +4404,20 @@ targets:
deps:
- benchmark
defaults: benchmark
+- name: orphanable_test
+ gtest: true
+ build: test
+ language: c++
+ src:
+ - test/core/support/orphanable_test.cc
+ deps:
+ - grpc_test_util
+ - grpc++
+ - grpc
+ - gpr_test_util
+ - gpr
+ uses:
+ - grpc++_test
- name: proto_server_reflection_test
gtest: true
build: test
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 358fad3d98..df5a4112a7 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -420,6 +420,7 @@ Pod::Spec.new do |s|
'src/core/lib/slice/slice_internal.h',
'src/core/lib/slice/slice_string_helpers.h',
'src/core/lib/support/debug_location.h',
+ 'src/core/lib/support/orphanable.h',
'src/core/lib/support/ref_counted.h',
'src/core/lib/support/ref_counted_ptr.h',
'src/core/lib/support/vector.h',
@@ -901,6 +902,7 @@ Pod::Spec.new do |s|
'src/core/lib/slice/slice_internal.h',
'src/core/lib/slice/slice_string_helpers.h',
'src/core/lib/support/debug_location.h',
+ 'src/core/lib/support/orphanable.h',
'src/core/lib/support/ref_counted.h',
'src/core/lib/support/ref_counted_ptr.h',
'src/core/lib/support/vector.h',
@@ -1085,6 +1087,6 @@ Pod::Spec.new do |s|
# TODO (mxyan): Instead of this hack, add include path "third_party" to C core's include path?
s.prepare_command = <<-END_OF_COMMAND
- find src/core/ -type f -exec sed -E -i '.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\;
+ find src/core/ -type f -exec sed -E -i'.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\;
END_OF_COMMAND
end
diff --git a/grpc.gemspec b/grpc.gemspec
index 7547bc85de..f8afaa5803 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -346,6 +346,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/slice/slice_internal.h )
s.files += %w( src/core/lib/slice/slice_string_helpers.h )
s.files += %w( src/core/lib/support/debug_location.h )
+ s.files += %w( src/core/lib/support/orphanable.h )
s.files += %w( src/core/lib/support/ref_counted.h )
s.files += %w( src/core/lib/support/ref_counted_ptr.h )
s.files += %w( src/core/lib/support/vector.h )
diff --git a/package.xml b/package.xml
index ff3d0797ab..8c590348b6 100644
--- a/package.xml
+++ b/package.xml
@@ -358,6 +358,7 @@
<file baseinstalldir="/" name="src/core/lib/slice/slice_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/slice/slice_string_helpers.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/debug_location.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/support/orphanable.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/ref_counted.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/ref_counted_ptr.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/support/vector.h" role="src" />
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 3f3334d44a..bb29f65af9 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -1003,7 +1003,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
grpc_error* error) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
- const grpc_connected_subchannel_call_args call_args = {
+ const grpc_core::ConnectedSubchannel::CallArgs call_args = {
calld->pollent, // pollent
calld->path, // path
calld->call_start_time, // start_time
@@ -1012,8 +1012,8 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
calld->pick.subchannel_call_context, // context
calld->call_combiner // call_combiner
};
- grpc_error* new_error = grpc_connected_subchannel_create_call(
- calld->pick.connected_subchannel, &call_args, &calld->subchannel_call);
+ grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
+ call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
@@ -1463,7 +1463,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
}
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->pick.connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked");
+ calld->pick.connected_subchannel.reset();
}
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
if (calld->pick.subchannel_call_context[i].value != nullptr) {
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 1176a05b78..0cc0cb59c3 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -21,6 +21,7 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -54,9 +55,9 @@ typedef struct grpc_lb_policy_pick_state {
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
grpc_closure* on_complete;
- /// Will be set to the selected subchannel, or NULL on failure or when
+ /// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if needed.
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
/// Upon success, \a *user_data will be set to whatever opaque information
@@ -152,7 +153,8 @@ void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick);
-/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+/** Perform a connected subchannel ping (see \a
+ grpc_core::ConnectedSubchannel::Ping)
against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 272b3617b2..a8c5fb9c1b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -939,7 +939,7 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
}
gpr_free(pp);
} else {
- pp->pick->connected_subchannel = nullptr;
+ pp->pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
}
pp = next;
@@ -976,7 +976,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) {
pending_pick* next = pp->next;
if (pp->pick == pick) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 60385272cf..725b78d478 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -81,7 +81,7 @@ static void pf_shutdown_locked(grpc_lb_policy* pol,
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
}
@@ -111,7 +111,7 @@ static void pf_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next;
if (pp == pick) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
@@ -176,8 +176,7 @@ static int pf_pick_locked(grpc_lb_policy* pol,
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
// If we have a selected subchannel already, return synchronously.
if (p->selected != nullptr) {
- pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "picked");
+ pick->connected_subchannel = p->selected->connected_subchannel;
return 1;
}
// No subchannel selected yet, so handle asynchronously.
@@ -217,8 +216,7 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(p->selected->connected_subchannel,
- on_initiate, on_ack);
+ p->selected->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -297,8 +295,7 @@ static void pf_update_locked(grpc_lb_policy* policy,
subchannel_list->num_subchannels);
}
if (p->selected->connected_subchannel != nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "pf_update_includes_selected");
+ sd->connected_subchannel = p->selected->connected_subchannel;
}
p->selected = sd;
if (p->subchannel_list != nullptr) {
@@ -410,8 +407,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
- sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
@@ -419,20 +416,19 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
+ // in transient failure. Rely on re-resolution to recover.
+ p->selected = nullptr;
+ grpc_lb_subchannel_data_stop_connectivity_watch(sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ sd->subchannel_list, "pf_selected_shutdown");
+ grpc_lb_subchannel_data_unref_subchannel(
+ sd, "pf_selected_shutdown"); // Unrefs connected subchannel
} else {
grpc_connectivity_state_set(&p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
- }
- if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
- } else {
- p->selected = nullptr;
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_selected_shutdown");
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_selected_shutdown");
}
}
return;
@@ -450,6 +446,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
@@ -460,9 +458,6 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
p->selected = sd;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
@@ -474,8 +469,7 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) {
p->pending_picks = pick->next;
- pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- p->selected->connected_subchannel, "picked");
+ pick->connected_subchannel = p->selected->connected_subchannel;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
@@ -520,39 +514,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data_start_connectivity_watch(sd);
break;
}
- case GRPC_CHANNEL_SHUTDOWN: {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "pf_candidate_shutdown");
- // Advance to next subchannel and check its state.
- grpc_lb_subchannel_data* original_sd = sd;
- do {
- sd->subchannel_list->checking_subchannel =
- (sd->subchannel_list->checking_subchannel + 1) %
- sd->subchannel_list->num_subchannels;
- sd = &sd->subchannel_list
- ->subchannels[sd->subchannel_list->checking_subchannel];
- } while (sd->subchannel == nullptr && sd != original_sd);
- if (sd == original_sd) {
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_exhausted_subchannels");
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE,
- "exhausted_subchannels+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_NONE);
- }
- } else {
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- // Reuses the connectivity refs from the previous watch.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
- }
- }
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE(break);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 92c7d5bd5d..dca345566a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -36,6 +36,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -127,7 +128,7 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
(void*)p, (unsigned long)last_ready_index,
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel,
(void*)p->subchannel_list->subchannels[last_ready_index]
- .connected_subchannel);
+ .connected_subchannel.get());
}
}
@@ -162,7 +163,7 @@ static void rr_shutdown_locked(grpc_lb_policy* pol,
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
}
@@ -192,7 +193,7 @@ static void rr_cancel_pick_locked(grpc_lb_policy* pol,
while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next;
if (pp == pick) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
@@ -216,7 +217,7 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol,
grpc_lb_policy_pick_state* next = pick->next;
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- pick->connected_subchannel = nullptr;
+ pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
@@ -262,8 +263,7 @@ static int rr_pick_locked(grpc_lb_policy* pol,
/* readily available, report right away */
grpc_lb_subchannel_data* sd =
&p->subchannel_list->subchannels[next_ready_index];
- pick->connected_subchannel =
- GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
+ pick->connected_subchannel = sd->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = sd->user_data;
}
@@ -272,8 +272,8 @@ static int rr_pick_locked(grpc_lb_policy* pol,
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %" PRIuPTR ")",
- p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list,
- next_ready_index);
+ p, sd->subchannel, pick->connected_subchannel.get(),
+ sd->subchannel_list, next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
update_last_ready_subchannel_index_locked(p, next_ready_index);
@@ -291,15 +291,14 @@ static int rr_pick_locked(grpc_lb_policy* pol,
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
+ GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(subchannel_list->num_transient_failures > 0);
--subchannel_list->num_transient_failures;
- } else if (sd->prev_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- GPR_ASSERT(subchannel_list->num_shutdown > 0);
- --subchannel_list->num_shutdown;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
@@ -309,8 +308,6 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++subchannel_list->num_transient_failures;
- } else if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- ++subchannel_list->num_shutdown;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++subchannel_list->num_idle;
}
@@ -410,6 +407,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
@@ -417,18 +415,17 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// Update state counters and new overall state.
update_state_counters_locked(sd);
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
- // If the sd's new state is SHUTDOWN, unref the subchannel.
- if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_data_unref_subchannel(sd, "rr_connectivity_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "rr_connectivity_shutdown");
- } else { // sd not in SHUTDOWN
- if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ // If the sd's new state is TRANSIENT_FAILURE, unref the *connected*
+ // subchannel, if any.
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ sd->connected_subchannel.reset();
+ break;
+ }
+ case GRPC_CHANNEL_READY: {
if (sd->connected_subchannel == nullptr) {
- sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "connected");
+ sd->connected_subchannel =
+ grpc_subchannel_get_connected_subchannel(sd->subchannel);
}
if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list.
@@ -471,8 +468,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) {
p->pending_picks = pick->next;
- pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_picked");
+ pick->connected_subchannel = selected->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = selected->user_data;
}
@@ -485,10 +481,15 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
+ break;
}
- // Renew notification.
- grpc_lb_subchannel_data_start_connectivity_watch(sd);
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_UNREACHABLE_CODE();
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:; // fallthrough
}
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -512,10 +513,9 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (next_ready_index < p->subchannel_list->num_subchannels) {
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
- grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
- selected->connected_subchannel, "rr_ping");
- grpc_connected_subchannel_ping(target, on_initiate, on_ack);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target =
+ selected->connected_subchannel;
+ target->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index 5ce1298afc..fa2ffcc796 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -42,10 +42,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
}
GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
sd->subchannel = nullptr;
- if (sd->connected_subchannel != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(sd->connected_subchannel, reason);
- sd->connected_subchannel = nullptr;
- }
+ sd->connected_subchannel.reset();
if (sd->user_data != nullptr) {
GPR_ASSERT(sd->user_data_vtable != nullptr);
sd->user_data_vtable->destroy(sd->user_data);
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 0f8cea9347..f146c724e0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -22,6 +22,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
// TODO(roth): This code is intended to be shared between pick_first and
@@ -43,7 +44,7 @@ typedef struct {
grpc_lb_subchannel_list* subchannel_list;
/** subchannel itself */
grpc_subchannel* subchannel;
- grpc_connected_subchannel* connected_subchannel;
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
/** Is a connectivity notification pending? */
bool connectivity_notification_pending;
/** notification that connectivity has changed on subchannel */
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index a604c55c58..5b8a14b9e7 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -41,6 +41,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/debug_location.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
@@ -55,10 +56,6 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
- ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \
- &(subchannel)->connected_subchannel)))
-
namespace {
struct state_watcher {
grpc_closure closure;
@@ -98,7 +95,7 @@ struct grpc_subchannel {
grpc_connect_out_args connecting_result;
/** callback for connection finishing */
- grpc_closure connected;
+ grpc_closure on_connected;
/** callback for our alarm */
grpc_closure on_alarm;
@@ -107,12 +104,13 @@ struct grpc_subchannel {
being setup */
grpc_pollset_set* pollset_set;
- /** active connection, or null; of type grpc_connected_subchannel */
- gpr_atm connected_subchannel;
-
/** mutex protecting remaining elements */
gpr_mu mu;
+ /** active connection, or null; of type grpc_core::ConnectedSubchannel
+ */
+ grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
+
/** have we seen a disconnection? */
bool disconnected;
/** are we connecting */
@@ -136,16 +134,15 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- grpc_connected_subchannel* connection;
+ grpc_core::ConnectedSubchannel* connection;
grpc_closure* schedule_closure_after_destroy;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1))
-#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call*)(callstack)) - 1)
-static void subchannel_connected(void* subchannel, grpc_error* error);
+static void on_subchannel_connected(void* subchannel, grpc_error* error);
#ifndef NDEBUG
#define REF_REASON reason
@@ -163,20 +160,9 @@ static void subchannel_connected(void* subchannel, grpc_error* error);
*/
static void connection_destroy(void* arg, grpc_error* error) {
- grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg;
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
- gpr_free(c);
-}
-
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
- return c;
-}
-
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
+ grpc_channel_stack* stk = (grpc_channel_stack*)arg;
+ grpc_channel_stack_destroy(stk);
+ gpr_free(stk);
}
/*
@@ -243,18 +229,13 @@ grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
}
static void disconnect(grpc_subchannel* c) {
- grpc_connected_subchannel* con;
grpc_subchannel_index_unregister(c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
c->disconnected = true;
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
- con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
- if (con != nullptr) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(con, "connection");
- gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
- }
+ c->connected_subchannel.reset();
gpr_mu_unlock(&c->mu);
}
@@ -374,7 +355,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
if (new_args != nullptr) grpc_channel_args_destroy(new_args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
- GRPC_CLOSURE_INIT(&c->connected, subchannel_connected, c,
+ GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
@@ -397,7 +378,7 @@ static void continue_connect_locked(grpc_subchannel* c) {
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
- &c->connected);
+ &c->on_connected);
}
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
@@ -458,7 +439,7 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
return;
}
- if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != nullptr) {
+ if (c->connected_subchannel != nullptr) {
/* Already connected: don't restart */
return;
}
@@ -481,9 +462,10 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
const grpc_millis time_til_next =
c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
if (time_til_next <= 0) {
- gpr_log(GPR_INFO, "Retry immediately");
+ gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c);
} else {
- gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next);
+ gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRIdPTR " milliseconds", c,
+ time_til_next);
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
@@ -527,75 +509,56 @@ void grpc_subchannel_notify_on_state_change(
}
}
-void grpc_connected_subchannel_process_transport_op(
- grpc_connected_subchannel* con, grpc_transport_op* op) {
- grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(top_elem, op);
-}
-
-static void subchannel_on_child_state_changed(void* p, grpc_error* error) {
- state_watcher* sw = (state_watcher*)p;
- grpc_subchannel* c = sw->subchannel;
+static void on_connected_subchannel_connectivity_changed(void* p,
+ grpc_error* error) {
+ state_watcher* connected_subchannel_watcher = (state_watcher*)p;
+ grpc_subchannel* c = connected_subchannel_watcher->subchannel;
gpr_mu* mu = &c->mu;
gpr_mu_lock(mu);
- /* if we failed just leave this closure */
- if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* any errors on a subchannel ==> we're done, create a new one */
- sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- }
- grpc_connectivity_state_set(&c->state_tracker, sw->connectivity_state,
- GRPC_ERROR_REF(error), "reflect_child");
- if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_connected_subchannel_notify_on_state_change(
- GET_CONNECTED_SUBCHANNEL(c, no_barrier), nullptr,
- &sw->connectivity_state, &sw->closure);
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- sw = nullptr;
+ switch (connected_subchannel_watcher->connectivity_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN: {
+ if (!c->disconnected && c->connected_subchannel != nullptr) {
+ if (grpc_trace_stream_refcount.enabled()) {
+ gpr_log(GPR_INFO,
+ "Connected subchannel %p of subchannel %p has gone into %s. "
+ "Attempting to reconnect.",
+ c->connected_subchannel.get(), c,
+ grpc_connectivity_state_name(
+ connected_subchannel_watcher->connectivity_state));
+ }
+ c->connected_subchannel.reset();
+ grpc_connectivity_state_set(&c->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "reflect_child");
+ c->backoff_begun = false;
+ c->backoff->Reset();
+ maybe_start_connecting_locked(c);
+ } else {
+ connected_subchannel_watcher->connectivity_state =
+ GRPC_CHANNEL_SHUTDOWN;
+ }
+ break;
+ }
+ default: {
+ grpc_connectivity_state_set(
+ &c->state_tracker, connected_subchannel_watcher->connectivity_state,
+ GRPC_ERROR_REF(error), "reflect_child");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ c->connected_subchannel->NotifyOnStateChange(
+ nullptr, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
+ connected_subchannel_watcher = nullptr;
+ }
}
-
gpr_mu_unlock(mu);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
- gpr_free(sw);
-}
-
-static void connected_subchannel_state_op(grpc_connected_subchannel* con,
- grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state,
- grpc_closure* closure) {
- grpc_transport_op* op = grpc_make_transport_op(nullptr);
- grpc_channel_element* elem;
- op->connectivity_state = state;
- op->on_connectivity_state_change = closure;
- op->bind_pollset_set = interested_parties;
- elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(elem, op);
-}
-
-void grpc_connected_subchannel_notify_on_state_change(
- grpc_connected_subchannel* con, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* closure) {
- connected_subchannel_state_op(con, interested_parties, state, closure);
-}
-
-void grpc_connected_subchannel_ping(grpc_connected_subchannel* con,
- grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- grpc_transport_op* op = grpc_make_transport_op(nullptr);
- grpc_channel_element* elem;
- op->send_ping.on_initiate = on_initiate;
- op->send_ping.on_ack = on_ack;
- elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(elem, op);
+ gpr_free(connected_subchannel_watcher);
}
static bool publish_transport_locked(grpc_subchannel* c) {
- grpc_connected_subchannel* con;
- grpc_channel_stack* stk;
- state_watcher* sw_subchannel;
-
/* construct channel stack */
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(
@@ -607,8 +570,9 @@ static bool publish_transport_locked(grpc_subchannel* c) {
grpc_channel_stack_builder_destroy(builder);
return false;
}
+ grpc_channel_stack* stk;
grpc_error* error = grpc_channel_stack_builder_finish(
- builder, 0, 1, connection_destroy, nullptr, (void**)&con);
+ builder, 0, 1, connection_destroy, nullptr, (void**)&stk);
if (error != GRPC_ERROR_NONE) {
grpc_transport_destroy(c->connecting_result.transport);
gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
@@ -616,38 +580,37 @@ static bool publish_transport_locked(grpc_subchannel* c) {
GRPC_ERROR_UNREF(error);
return false;
}
- stk = CHANNEL_STACK_FROM_CONNECTION(con);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
- sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel));
- sw_subchannel->subchannel = c;
- sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
- GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed,
- sw_subchannel, grpc_schedule_on_exec_ctx);
+ state_watcher* connected_subchannel_watcher =
+ (state_watcher*)gpr_zalloc(sizeof(*connected_subchannel_watcher));
+ connected_subchannel_watcher->subchannel = c;
+ connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY;
+ GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure,
+ on_connected_subchannel_connectivity_changed,
+ connected_subchannel_watcher, grpc_schedule_on_exec_ctx);
if (c->disconnected) {
- gpr_free(sw_subchannel);
+ gpr_free(connected_subchannel_watcher);
grpc_channel_stack_destroy(stk);
- gpr_free(con);
+ gpr_free(stk);
return false;
}
/* publish */
- /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
- I'd have expected the rel_cas below to be enough, but
- seemingly it's not.
- Re-evaluate if we really need this. */
- gpr_atm_full_barrier();
- GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ c->connected_subchannel.reset(
+ grpc_core::New<grpc_core::ConnectedSubchannel>(stk));
+ gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
+ c->connected_subchannel.get(), c);
/* setup subchannel watching connected subchannel for changes; subchannel
ref for connecting is donated to the state watcher */
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
- grpc_connected_subchannel_notify_on_state_change(
- con, c->pollset_set, &sw_subchannel->connectivity_state,
- &sw_subchannel->closure);
+ c->connected_subchannel->NotifyOnStateChange(
+ c->pollset_set, &connected_subchannel_watcher->connectivity_state,
+ &connected_subchannel_watcher->closure);
/* signal completion */
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY,
@@ -655,11 +618,11 @@ static bool publish_transport_locked(grpc_subchannel* c) {
return true;
}
-static void subchannel_connected(void* arg, grpc_error* error) {
+static void on_subchannel_connected(void* arg, grpc_error* error) {
grpc_subchannel* c = (grpc_subchannel*)arg;
grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;
- GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
+ GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected");
gpr_mu_lock(&c->mu);
c->connecting = false;
if (c->connecting_result.transport != nullptr &&
@@ -694,10 +657,10 @@ static void subchannel_call_destroy(void* call, grpc_error* error) {
grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != nullptr);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
- grpc_connected_subchannel* connection = c->connection;
+ grpc_core::ConnectedSubchannel* connection = c->connection;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
c->schedule_closure_after_destroy);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(connection, "subchannel_call");
+ connection->Unref(DEBUG_LOCATION, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
@@ -728,9 +691,12 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
- grpc_subchannel* c) {
- return GET_CONNECTED_SUBCHANNEL(c, acq);
+grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
+grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) {
+ gpr_mu_lock(&c->mu);
+ auto copy = c->connected_subchannel;
+ gpr_mu_unlock(&c->mu);
+ return copy;
}
const grpc_subchannel_key* grpc_subchannel_get_key(
@@ -738,36 +704,6 @@ const grpc_subchannel_key* grpc_subchannel_get_key(
return subchannel->key;
}
-grpc_error* grpc_connected_subchannel_create_call(
- grpc_connected_subchannel* con,
- const grpc_connected_subchannel_call_args* args,
- grpc_subchannel_call** call) {
- grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- *call = (grpc_subchannel_call*)gpr_arena_alloc(
- args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
- grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
- (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- const grpc_call_element_args call_args = {
- callstk, /* call_stack */
- nullptr, /* server_transport_data */
- args->context, /* context */
- args->path, /* path */
- args->start_time, /* start_time */
- args->deadline, /* deadline */
- args->arena, /* arena */
- args->call_combiner /* call_combiner */
- };
- grpc_error* error = grpc_call_stack_init(chanstk, 1, subchannel_call_destroy,
- *call, &call_args);
- if (error != GRPC_ERROR_NONE) {
- const char* error_string = grpc_error_string(error);
- gpr_log(GPR_ERROR, "error: %s", error_string);
- return error;
- }
- grpc_call_stack_set_pollset_or_pollset_set(callstk, args->pollent);
- return GRPC_ERROR_NONE;
-}
-
grpc_call_stack* grpc_subchannel_call_get_call_stack(
grpc_subchannel_call* subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
@@ -803,3 +739,64 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
(char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
}
+
+namespace grpc_core {
+ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
+ : grpc_core::RefCountedWithTracing(&grpc_trace_stream_refcount),
+ channel_stack_(channel_stack) {}
+
+ConnectedSubchannel::~ConnectedSubchannel() {
+ GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
+}
+
+void ConnectedSubchannel::NotifyOnStateChange(
+ grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
+ grpc_closure* closure) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ grpc_channel_element* elem;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
+ elem = grpc_channel_stack_element(channel_stack_, 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
+void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ grpc_channel_element* elem;
+ op->send_ping.on_initiate = on_initiate;
+ op->send_ping.on_ack = on_ack;
+ elem = grpc_channel_stack_element(channel_stack_, 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
+grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
+ grpc_subchannel_call** call) {
+ *call = (grpc_subchannel_call*)gpr_arena_alloc(
+ args.arena,
+ sizeof(grpc_subchannel_call) + channel_stack_->call_stack_size);
+ grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ Ref(DEBUG_LOCATION, "subchannel_call");
+ (*call)->connection = this;
+ const grpc_call_element_args call_args = {
+ callstk, /* call_stack */
+ nullptr, /* server_transport_data */
+ args.context, /* context */
+ args.path, /* path */
+ args.start_time, /* start_time */
+ args.deadline, /* deadline */
+ args.arena, /* arena */
+ args.call_combiner /* call_combiner */
+ };
+ grpc_error* error = grpc_call_stack_init(
+ channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
+ if (error != GRPC_ERROR_NONE) {
+ const char* error_string = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error: %s", error_string);
+ return error;
+ }
+ grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
+ return GRPC_ERROR_NONE;
+}
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 9d34fff07a..3bcc5c2432 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -23,6 +23,8 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
+#include "src/core/lib/support/ref_counted.h"
+#include "src/core/lib/support/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
@@ -32,7 +34,6 @@
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
-typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
typedef struct grpc_subchannel_key grpc_subchannel_key;
@@ -48,10 +49,6 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \
grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r))
-#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
- grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
@@ -65,14 +62,39 @@ typedef struct grpc_subchannel_key grpc_subchannel_key;
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
-#define GRPC_CONNECTED_SUBCHANNEL_UNREF(p, r) \
- grpc_connected_subchannel_unref((p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
+namespace grpc_core {
+class ConnectedSubchannel : public grpc_core::RefCountedWithTracing {
+ public:
+ struct CallArgs {
+ grpc_polling_entity* pollent;
+ grpc_slice path;
+ gpr_timespec start_time;
+ grpc_millis deadline;
+ gpr_arena* arena;
+ grpc_call_context_element* context;
+ grpc_call_combiner* call_combiner;
+ };
+
+ explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
+ ~ConnectedSubchannel();
+
+ grpc_channel_stack* channel_stack() { return channel_stack_; }
+ void NotifyOnStateChange(grpc_pollset_set* interested_parties,
+ grpc_connectivity_state* state,
+ grpc_closure* closure);
+ void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
+ grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
+
+ private:
+ grpc_channel_stack* channel_stack_;
+};
+} // namespace grpc_core
+
grpc_subchannel* grpc_subchannel_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
@@ -83,35 +105,11 @@ grpc_subchannel* grpc_subchannel_weak_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-grpc_connected_subchannel* grpc_connected_subchannel_ref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_connected_subchannel_unref(
- grpc_connected_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a subchannel call */
-typedef struct {
- grpc_polling_entity* pollent;
- grpc_slice path;
- gpr_timespec start_time;
- grpc_millis deadline;
- gpr_arena* arena;
- grpc_call_context_element* context;
- grpc_call_combiner* call_combiner;
-} grpc_connected_subchannel_call_args;
-
-grpc_error* grpc_connected_subchannel_create_call(
- grpc_connected_subchannel* connected_subchannel,
- const grpc_connected_subchannel_call_args* args,
- grpc_subchannel_call** subchannel_call);
-
-/** process a transport level op */
-void grpc_connected_subchannel_process_transport_op(
- grpc_connected_subchannel* subchannel, grpc_transport_op* op);
-
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel* channel, grpc_error** error);
@@ -121,17 +119,12 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
void grpc_subchannel_notify_on_state_change(
grpc_subchannel* channel, grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* notify);
-void grpc_connected_subchannel_notify_on_state_change(
- grpc_connected_subchannel* channel, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* notify);
-void grpc_connected_subchannel_ping(grpc_connected_subchannel* channel,
- grpc_closure* on_initiate,
- grpc_closure* on_ack);
-
-/** retrieve the grpc_connected_subchannel - or NULL if called before
- the subchannel becomes connected */
-grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
- grpc_subchannel* subchannel);
+
+/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected
+ * (which may happen before it initially connects or during transient failures)
+ * */
+grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
+grpc_subchannel_get_connected_subchannel(grpc_subchannel* c);
/** return the subchannel index key for \a subchannel */
const grpc_subchannel_key* grpc_subchannel_get_key(
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 1ab7e516de..aa14d5931a 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -20,12 +20,14 @@
#include <grpc/support/log.h>
-/* This polling engine is only relevant on linux kernels supporting epoll() */
+/* This polling engine is only relevant on linux kernels supporting epoll
+ epoll_create() or epoll_create1() */
#ifdef GRPC_LINUX_EPOLL
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
#include <assert.h>
#include <errno.h>
+#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
@@ -84,11 +86,28 @@ typedef struct epoll_set {
/* The global singleton epoll set */
static epoll_set g_epoll_set;
+static int epoll_create_and_cloexec() {
+#ifdef GRPC_LINUX_EPOLL_CREATE1
+ int fd = epoll_create1(EPOLL_CLOEXEC);
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "epoll_create1 unavailable");
+ }
+#else
+ int fd = epoll_create(MAX_EPOLL_EVENTS);
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "epoll_create unavailable");
+ } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
+ gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
+ return -1;
+ }
+#endif
+ return fd;
+}
+
/* Must be called *only* once */
static bool epoll_set_init() {
- g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+ g_epoll_set.epfd = epoll_create_and_cloexec();
if (g_epoll_set.epfd < 0) {
- gpr_log(GPR_ERROR, "epoll unavailable");
return false;
}
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 5f5f45a7a5..e4a2d67e4b 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -21,7 +21,7 @@
#include <grpc/support/log.h>
/* This polling engine is only relevant on linux kernels supporting epoll() */
-#ifdef GRPC_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL_CREATE1
#include "src/core/lib/iomgr/ev_epollex_linux.h"
@@ -1442,15 +1442,15 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
return &vtable;
}
-#else /* defined(GRPC_LINUX_EPOLL) */
+#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_epollex_linux.h"
-/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
- * NULL */
+/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
+ epoll_create1 is not available. Return NULL */
const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
-#endif /* !defined(GRPC_LINUX_EPOLL) */
+#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 8072a6cbed..3544d4f3a4 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -22,7 +22,7 @@
#include <grpc/support/log.h>
/* This polling engine is only relevant on linux kernels supporting epoll() */
-#ifdef GRPC_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL_CREATE1
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
@@ -1725,11 +1725,11 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
return &vtable;
}
-#else /* defined(GRPC_LINUX_EPOLL) */
+#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
-/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
- * NULL */
+/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
+ epoll_create1 is not available. Return NULL */
const grpc_event_engine_vtable* grpc_init_epollsig_linux(
bool explicit_request) {
return nullptr;
@@ -1737,4 +1737,4 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
#endif /* defined(GRPC_POSIX_SOCKET) */
void grpc_use_signal(int signum) {}
-#endif /* !defined(GRPC_LINUX_EPOLL) */
+#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.h b/src/core/lib/iomgr/ev_epollsig_linux.h
index 5b8aba9d9f..48178d3713 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.h
+++ b/src/core/lib/iomgr/ev_epollsig_linux.h
@@ -24,10 +24,10 @@
const grpc_event_engine_vtable* grpc_init_epollsig_linux(bool explicit_request);
-#ifdef GRPC_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL_CREATE1
void* grpc_fd_get_polling_island(grpc_fd* fd);
void* grpc_pollset_get_polling_island(grpc_pollset* ps);
bool grpc_are_polling_islands_equal(void* p, void* q);
-#endif /* defined(GRPC_LINUX_EPOLL) */
+#endif /* defined(GRPC_LINUX_EPOLL_CREATE1) */
#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLSIG_LINUX_H */
diff --git a/src/core/lib/iomgr/is_epollexclusive_available.cc b/src/core/lib/iomgr/is_epollexclusive_available.cc
index e5803532e7..08f9cf2b69 100644
--- a/src/core/lib/iomgr/is_epollexclusive_available.cc
+++ b/src/core/lib/iomgr/is_epollexclusive_available.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
-#ifdef GRPC_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL_CREATE1
#include <grpc/support/log.h>
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 9fae8c0052..25090898ed 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -37,6 +37,7 @@
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
+#define GRPC_LINUX_EPOLL 1
#elif defined(GPR_WINDOWS)
#define GRPC_TIMER_USE_GENERIC 1
#define GRPC_WINSOCK_SOCKET 1
@@ -67,8 +68,11 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#ifdef __GLIBC_PREREQ
-#if __GLIBC_PREREQ(2, 9)
+#if __GLIBC_PREREQ(2, 4)
#define GRPC_LINUX_EPOLL 1
+#endif
+#if __GLIBC_PREREQ(2, 9)
+#define GRPC_LINUX_EPOLL_CREATE1 1
#define GRPC_LINUX_EVENTFD 1
#endif
#if __GLIBC_PREREQ(2, 10)
@@ -77,6 +81,7 @@
#endif
#ifndef __GLIBC__
#define GRPC_LINUX_EPOLL 1
+#define GRPC_LINUX_EPOLL_CREATE1 1
#define GRPC_LINUX_EVENTFD 1
#define GRPC_MSG_IOVLEN_TYPE int
#endif
diff --git a/src/core/lib/support/abstract.h b/src/core/lib/support/abstract.h
index 5498769a7d..1dffa30128 100644
--- a/src/core/lib/support/abstract.h
+++ b/src/core/lib/support/abstract.h
@@ -26,4 +26,9 @@
#define GRPC_ABSTRACT_BASE_CLASS \
static void operator delete(void* p) { abort(); }
+// gRPC currently can't depend on libstdc++, so we can't use "= 0" for
+// pure virtual methods. Instead, we use this macro.
+#define GRPC_ABSTRACT \
+ { GPR_ASSERT(false); }
+
#endif /* GRPC_CORE_LIB_SUPPORT_ABSTRACT_H */
diff --git a/src/core/lib/support/orphanable.h b/src/core/lib/support/orphanable.h
new file mode 100644
index 0000000000..2f537573fd
--- /dev/null
+++ b/src/core/lib/support/orphanable.h
@@ -0,0 +1,171 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_ORPHANABLE_H
+#define GRPC_CORE_LIB_SUPPORT_ORPHANABLE_H
+
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include <memory>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/support/abstract.h"
+#include "src/core/lib/support/debug_location.h"
+#include "src/core/lib/support/memory.h"
+
+namespace grpc_core {
+
+// A base class for orphanable objects, which have one external owner
+// but are not necessarily destroyed immediately when the external owner
+// gives up ownership. Instead, the owner calls the object's Orphan()
+// method, and the object then takes responsibility for its own cleanup
+// and destruction.
+class Orphanable {
+ public:
+ // Gives up ownership of the object. The implementation must arrange
+ // to eventually destroy the object without further interaction from the
+ // caller.
+ virtual void Orphan() GRPC_ABSTRACT;
+
+ // Not copyable or movable.
+ Orphanable(const Orphanable&) = delete;
+ Orphanable& operator=(const Orphanable&) = delete;
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ Orphanable() {}
+ virtual ~Orphanable() {}
+};
+
+template <typename T>
+class OrphanableDelete {
+ public:
+ void operator()(T* p) { p->Orphan(); }
+};
+
+template <typename T, typename Deleter = OrphanableDelete<T>>
+using OrphanablePtr = std::unique_ptr<T, Deleter>;
+
+template <typename T, typename... Args>
+inline OrphanablePtr<T> MakeOrphanable(Args&&... args) {
+ return OrphanablePtr<T>(New<T>(std::forward<Args>(args)...));
+}
+
+// A type of Orphanable with internal ref-counting.
+class InternallyRefCounted : public Orphanable {
+ public:
+ // Not copyable nor movable.
+ InternallyRefCounted(const InternallyRefCounted&) = delete;
+ InternallyRefCounted& operator=(const InternallyRefCounted&) = delete;
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ InternallyRefCounted() { gpr_ref_init(&refs_, 1); }
+ virtual ~InternallyRefCounted() {}
+
+ void Ref() { gpr_ref(&refs_); }
+
+ void Unref() {
+ if (gpr_unref(&refs_)) {
+ Delete(this);
+ }
+ }
+
+ // Allow Delete() to access destructor.
+ template <typename T>
+ friend void Delete(T*);
+
+ private:
+ gpr_refcount refs_;
+};
+
+// An alternative version of the InternallyRefCounted base class that
+// supports tracing. This is intended to be used in cases where the
+// object will be handled both by idiomatic C++ code using smart
+// pointers and legacy code that is manually calling Ref() and Unref().
+// Once all of our code is converted to idiomatic C++, we may be able to
+// eliminate this class.
+class InternallyRefCountedWithTracing : public Orphanable {
+ public:
+ // Not copyable nor movable.
+ InternallyRefCountedWithTracing(const InternallyRefCountedWithTracing&) =
+ delete;
+ InternallyRefCountedWithTracing& operator=(
+ const InternallyRefCountedWithTracing&) = delete;
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ // Allow Delete() to access destructor.
+ template <typename T>
+ friend void Delete(T*);
+
+ InternallyRefCountedWithTracing()
+ : InternallyRefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {}
+
+ explicit InternallyRefCountedWithTracing(TraceFlag* trace_flag)
+ : trace_flag_(trace_flag) {
+ gpr_ref_init(&refs_, 1);
+ }
+
+#ifdef NDEBUG
+ explicit InternallyRefCountedWithTracing(DebugOnlyTraceFlag* trace_flag)
+ : InternallyRefCountedWithTracing() {}
+#endif
+
+ virtual ~InternallyRefCountedWithTracing() {}
+
+ void Ref() { gpr_ref(&refs_); }
+
+ void Ref(const DebugLocation& location, const char* reason) {
+ if (location.Log() && trace_flag_ != nullptr && trace_flag_->enabled()) {
+ gpr_atm old_refs = gpr_atm_no_barrier_load(&refs_.count);
+ gpr_log(GPR_DEBUG, "%s:%p %s:%d ref %" PRIdPTR " -> %" PRIdPTR " %s",
+ trace_flag_->name(), this, location.file(), location.line(),
+ old_refs, old_refs + 1, reason);
+ }
+ Ref();
+ }
+
+ void Unref() {
+ if (gpr_unref(&refs_)) {
+ Delete(this);
+ }
+ }
+
+ void Unref(const DebugLocation& location, const char* reason) {
+ if (location.Log() && trace_flag_ != nullptr && trace_flag_->enabled()) {
+ gpr_atm old_refs = gpr_atm_no_barrier_load(&refs_.count);
+ gpr_log(GPR_DEBUG, "%s:%p %s:%d unref %" PRIdPTR " -> %" PRIdPTR " %s",
+ trace_flag_->name(), this, location.file(), location.line(),
+ old_refs, old_refs - 1, reason);
+ }
+ Unref();
+ }
+
+ private:
+ TraceFlag* trace_flag_ = nullptr;
+ gpr_refcount refs_;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_SUPPORT_ORPHANABLE_H */
diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h
index 4c662f9119..48c11f7bbf 100644
--- a/src/core/lib/support/ref_counted.h
+++ b/src/core/lib/support/ref_counted.h
@@ -23,6 +23,7 @@
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/support/abstract.h"
#include "src/core/lib/support/debug_location.h"
#include "src/core/lib/support/memory.h"
@@ -45,6 +46,8 @@ class RefCounted {
RefCounted(const RefCounted&) = delete;
RefCounted& operator=(const RefCounted&) = delete;
+ GRPC_ABSTRACT_BASE_CLASS
+
protected:
// Allow Delete() to access destructor.
template <typename T>
@@ -98,18 +101,26 @@ class RefCountedWithTracing {
RefCountedWithTracing(const RefCountedWithTracing&) = delete;
RefCountedWithTracing& operator=(const RefCountedWithTracing&) = delete;
+ GRPC_ABSTRACT_BASE_CLASS
+
protected:
// Allow Delete() to access destructor.
template <typename T>
friend void Delete(T*);
- RefCountedWithTracing() : RefCountedWithTracing(nullptr) {}
+ RefCountedWithTracing()
+ : RefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {}
explicit RefCountedWithTracing(TraceFlag* trace_flag)
: trace_flag_(trace_flag) {
gpr_ref_init(&refs_, 1);
}
+#ifdef NDEBUG
+ explicit RefCountedWithTracing(DebugOnlyTraceFlag* trace_flag)
+ : RefCountedWithTracing() {}
+#endif
+
virtual ~RefCountedWithTracing() {}
private:
diff --git a/src/core/lib/support/ref_counted_ptr.h b/src/core/lib/support/ref_counted_ptr.h
index dc2385e369..76ff0bba66 100644
--- a/src/core/lib/support/ref_counted_ptr.h
+++ b/src/core/lib/support/ref_counted_ptr.h
@@ -76,6 +76,15 @@ class RefCountedPtr {
T& operator*() const { return *value_; }
T* operator->() const { return value_; }
+ bool operator==(const RefCountedPtr& other) const {
+ return value_ == other.value_;
+ }
+ bool operator==(const T* other) const { return value_ == other; }
+ bool operator!=(const RefCountedPtr& other) const {
+ return value_ != other.value_;
+ }
+ bool operator!=(const T* other) const { return value_ != other; }
+
private:
T* value_ = nullptr;
};
diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec
index c61afc1a8f..6e406b0dc9 100644
--- a/src/objective-c/BoringSSL.podspec
+++ b/src/objective-c/BoringSSL.podspec
@@ -136,12 +136,12 @@ Pod::Spec.new do |s|
# Replace "const BIGNUM *I" in rsa.h with a lowercase i, as the former fails when including
# OpenSSL in a Swift bridging header (complex.h defines "I", and it's as if the compiler
# included it in every bridged header).
- sed -E -i '.back' 's/\\*I,/*i,/g' include/openssl/rsa.h
+ sed -E -i'.back' 's/\\*I,/*i,/g' include/openssl/rsa.h
# Replace `#include "../crypto/internal.h"` in e_tls.c with `#include "../internal.h"`. The
# former assumes crypto/ is in the headers search path, which is hard to enforce when using
# dynamic frameworks. The latters always works, being relative to the current file.
- sed -E -i '.back' 's/crypto\\///g' crypto/cipher/e_tls.c
+ sed -E -i'.back' 's/crypto\\///g' crypto/cipher/e_tls.c
# Add a module map and an umbrella header
cat > include/openssl/umbrella.h <<EOF
@@ -197,11 +197,11 @@ Pod::Spec.new do |s|
# https://github.com/libgit2/libgit2/commit/1ddada422caf8e72ba97dca2568d2bf879fed5f2 and libvpx
# in https://chromium.googlesource.com/webm/libvpx/+/1bec0c5a7e885ec792f6bb658eb3f34ad8f37b15
# work around it by removing the include. We need four of its macros, so we expand them here.
- sed -E -i '.back' '/<inttypes.h>/d' include/openssl/bn.h
- sed -E -i '.back' 's/PRIu32/"u"/g' include/openssl/bn.h
- sed -E -i '.back' 's/PRIx32/"x"/g' include/openssl/bn.h
- sed -E -i '.back' 's/PRIu64/"llu"/g' include/openssl/bn.h
- sed -E -i '.back' 's/PRIx64/"llx"/g' include/openssl/bn.h
+ sed -E -i'.back' '/<inttypes.h>/d' include/openssl/bn.h
+ sed -E -i'.back' 's/PRIu32/"u"/g' include/openssl/bn.h
+ sed -E -i'.back' 's/PRIx32/"x"/g' include/openssl/bn.h
+ sed -E -i'.back' 's/PRIu64/"llu"/g' include/openssl/bn.h
+ sed -E -i'.back' 's/PRIx64/"llx"/g' include/openssl/bn.h
# This is a bit ridiculous, but requiring people to install Go in order to build is slightly
# more ridiculous IMO. To save you from scrolling, this is the last part of the podspec.
diff --git a/src/objective-c/README.md b/src/objective-c/README.md
index e76ee173ea..40aba0317b 100644
--- a/src/objective-c/README.md
+++ b/src/objective-c/README.md
@@ -1,5 +1,12 @@
[![Cocoapods](https://img.shields.io/cocoapods/v/gRPC.svg)](https://cocoapods.org/pods/gRPC)
# gRPC for Objective-C
+gRPC Objective C library provides Objective C API for users to make gRPC calls on iOS or OS X
+platforms. Currently, the minimum supported iOS version is 7.0 and OS X version is 10.9 (Mavericks).
+
+While gRPC doesn't require the use of an IDL to describe the API of services, using one simplifies
+usage and adds some interoperability guarantees. Here we use [Protocol Buffers][], and provide a
+plugin for the Protobuf Compiler (_protoc_) to generate client libraries to communicate with gRPC
+services.
- [Write your API declaration in proto format](#write-protos)
- [Integrate a proto library in your project](#cocoapods)
@@ -10,11 +17,6 @@
- [Install protoc and the gRPC plugin without using Homebrew](#no-homebrew)
- [Integrate the generated gRPC library without using Cocoapods](#no-cocoapods)
-While gRPC doesn't require the use of an IDL to describe the API of services, using one simplifies
-usage and adds some interoperability guarantees. Here we use [Protocol Buffers][], and provide a
-plugin for the Protobuf Compiler (_protoc_) to generate client libraries to communicate with gRPC
-services.
-
<a name="write-protos"></a>
## Write your API declaration in proto format
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index db410d307b..79793a710e 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -35,152 +35,152 @@ class FutureCancelledError(Exception):
class Future(six.with_metaclass(abc.ABCMeta)):
"""A representation of a computation in another control flow.
- Computations represented by a Future may be yet to be begun, may be ongoing,
- or may have already completed.
- """
+ Computations represented by a Future may be yet to be begun,
+ may be ongoing, or may have already completed.
+ """
@abc.abstractmethod
def cancel(self):
"""Attempts to cancel the computation.
- This method does not block.
+ This method does not block.
- Returns:
- bool:
- Returns True if the computation was canceled.
- Returns False under all other circumstances, for example:
- 1. computation has begun and could not be canceled.
- 2. computation has finished
- 3. computation is scheduled for execution and it is impossible to
- determine its state without blocking.
- """
+ Returns:
+ bool:
+ Returns True if the computation was canceled.
+ Returns False under all other circumstances, for example:
+ 1. computation has begun and could not be canceled.
+ 2. computation has finished
+ 3. computation is scheduled for execution and it is impossible
+ to determine its state without blocking.
+ """
raise NotImplementedError()
@abc.abstractmethod
def cancelled(self):
"""Describes whether the computation was cancelled.
- This method does not block.
+ This method does not block.
- Returns:
- bool:
- Returns True if the computation was cancelled before its result became
- available.
- False under all other circumstances, for example:
- 1. computation was not cancelled.
- 2. computation's result is available.
- """
+ Returns:
+ bool:
+ Returns True if the computation was cancelled before its result became
+ available.
+ False under all other circumstances, for example:
+ 1. computation was not cancelled.
+ 2. computation's result is available.
+ """
raise NotImplementedError()
@abc.abstractmethod
def running(self):
"""Describes whether the computation is taking place.
- This method does not block.
+ This method does not block.
- Returns:
- bool:
- Returns True if the computation is scheduled for execution or currently
- executing.
- Returns False if the computation already executed or was cancelled.
- """
+ Returns:
+ bool:
+ Returns True if the computation is scheduled for execution or
+ currently executing.
+ Returns False if the computation already executed or was cancelled.
+ """
raise NotImplementedError()
@abc.abstractmethod
def done(self):
"""Describes whether the computation has taken place.
- This method does not block.
+ This method does not block.
- Returns:
- bool:
- Returns True if the computation already executed or was cancelled.
- Returns False if the computation is scheduled for execution or currently
- executing.
- This is exactly opposite of the running() method's result.
- """
+ Returns:
+ bool:
+ Returns True if the computation already executed or was cancelled.
+ Returns False if the computation is scheduled for execution or
+ currently executing.
+ This is exactly opposite of the running() method's result.
+ """
raise NotImplementedError()
@abc.abstractmethod
def result(self, timeout=None):
"""Returns the result of the computation or raises its exception.
- This method may return immediately or may block.
+ This method may return immediately or may block.
- Args:
- timeout: The length of time in seconds to wait for the computation to
- finish or be cancelled. If None, the call will block until the
- computations's termination.
+ Args:
+ timeout: The length of time in seconds to wait for the computation to
+ finish or be cancelled. If None, the call will block until the
+ computations's termination.
- Returns:
- The return value of the computation.
+ Returns:
+ The return value of the computation.
- Raises:
- FutureTimeoutError: If a timeout value is passed and the computation does
- not terminate within the allotted time.
- FutureCancelledError: If the computation was cancelled.
- Exception: If the computation raised an exception, this call will raise
- the same exception.
- """
+ Raises:
+ FutureTimeoutError: If a timeout value is passed and the computation
+ does not terminate within the allotted time.
+ FutureCancelledError: If the computation was cancelled.
+ Exception: If the computation raised an exception, this call will
+ raise the same exception.
+ """
raise NotImplementedError()
@abc.abstractmethod
def exception(self, timeout=None):
"""Return the exception raised by the computation.
- This method may return immediately or may block.
+ This method may return immediately or may block.
- Args:
- timeout: The length of time in seconds to wait for the computation to
- terminate or be cancelled. If None, the call will block until the
- computations's termination.
+ Args:
+ timeout: The length of time in seconds to wait for the computation to
+ terminate or be cancelled. If None, the call will block until the
+ computations's termination.
- Returns:
- The exception raised by the computation, or None if the computation did
- not raise an exception.
+ Returns:
+ The exception raised by the computation, or None if the computation
+ did not raise an exception.
- Raises:
- FutureTimeoutError: If a timeout value is passed and the computation does
- not terminate within the allotted time.
- FutureCancelledError: If the computation was cancelled.
- """
+ Raises:
+ FutureTimeoutError: If a timeout value is passed and the computation
+ does not terminate within the allotted time.
+ FutureCancelledError: If the computation was cancelled.
+ """
raise NotImplementedError()
@abc.abstractmethod
def traceback(self, timeout=None):
"""Access the traceback of the exception raised by the computation.
- This method may return immediately or may block.
+ This method may return immediately or may block.
- Args:
- timeout: The length of time in seconds to wait for the computation to
- terminate or be cancelled. If None, the call will block until the
- computations's termination.
+ Args:
+ timeout: The length of time in seconds to wait for the computation
+ to terminate or be cancelled. If None, the call will block until
+ the computation's termination.
- Returns:
- The traceback of the exception raised by the computation, or None if the
- computation did not raise an exception.
+ Returns:
+ The traceback of the exception raised by the computation, or None
+ if the computation did not raise an exception.
- Raises:
- FutureTimeoutError: If a timeout value is passed and the computation does
- not terminate within the allotted time.
- FutureCancelledError: If the computation was cancelled.
- """
+ Raises:
+ FutureTimeoutError: If a timeout value is passed and the computation
+ does not terminate within the allotted time.
+ FutureCancelledError: If the computation was cancelled.
+ """
raise NotImplementedError()
@abc.abstractmethod
def add_done_callback(self, fn):
"""Adds a function to be called at completion of the computation.
- The callback will be passed this Future object describing the outcome of
- the computation.
+ The callback will be passed this Future object describing the outcome
+ of the computation.
- If the computation has already completed, the callback will be called
- immediately.
+ If the computation has already completed, the callback will be called
+ immediately.
- Args:
- fn: A callable taking this Future object as its single parameter.
- """
+ Args:
+ fn: A callable taking this Future object as its single parameter.
+ """
raise NotImplementedError()
@@ -191,14 +191,14 @@ class Future(six.with_metaclass(abc.ABCMeta)):
class ChannelConnectivity(enum.Enum):
"""Mirrors grpc_connectivity_state in the gRPC Core.
- Attributes:
- IDLE: The channel is idle.
- CONNECTING: The channel is connecting.
- READY: The channel is ready to conduct RPCs.
- TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
- recover.
- SHUTDOWN: The channel has seen a failure from which it cannot recover.
- """
+ Attributes:
+ IDLE: The channel is idle.
+ CONNECTING: The channel is connecting.
+ READY: The channel is ready to conduct RPCs.
+ TRANSIENT_FAILURE: The channel has seen a failure from which it expects
+ to recover.
+ SHUTDOWN: The channel has seen a failure from which it cannot recover.
+ """
IDLE = (_cygrpc.ConnectivityState.idle, 'idle')
CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting')
READY = (_cygrpc.ConnectivityState.ready, 'ready')
@@ -250,44 +250,44 @@ class RpcContext(six.with_metaclass(abc.ABCMeta)):
def is_active(self):
"""Describes whether the RPC is active or has terminated.
- Returns:
- bool:
- True if RPC is active, False otherwise.
- """
+ Returns:
+ bool:
+ True if RPC is active, False otherwise.
+ """
raise NotImplementedError()
@abc.abstractmethod
def time_remaining(self):
"""Describes the length of allowed time remaining for the RPC.
- Returns:
- A nonnegative float indicating the length of allowed time in seconds
- remaining for the RPC to complete before it is considered to have timed
- out, or None if no deadline was specified for the RPC.
- """
+ Returns:
+ A nonnegative float indicating the length of allowed time in seconds
+ remaining for the RPC to complete before it is considered to have
+ timed out, or None if no deadline was specified for the RPC.
+ """
raise NotImplementedError()
@abc.abstractmethod
def cancel(self):
"""Cancels the RPC.
- Idempotent and has no effect if the RPC has already terminated.
- """
+ Idempotent and has no effect if the RPC has already terminated.
+ """
raise NotImplementedError()
@abc.abstractmethod
def add_callback(self, callback):
"""Registers a callback to be called on RPC termination.
- Args:
- callback: A no-parameter callable to be called on RPC termination.
+ Args:
+ callback: A no-parameter callable to be called on RPC termination.
- Returns:
- bool:
- True if the callback was added and will be called later; False if the
- callback was not added and will not be called (because the RPC
- already terminated or some other reason).
- """
+ Returns:
+ bool:
+ True if the callback was added and will be called later; False if
+ the callback was not added and will not be called (because the RPC
+ already terminated or some other reason).
+ """
raise NotImplementedError()
@@ -301,44 +301,44 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
def initial_metadata(self):
"""Accesses the initial metadata sent by the server.
- This method blocks until the value is available.
+ This method blocks until the value is available.
- Returns:
- The initial :term:`metadata`.
- """
+ Returns:
+ The initial :term:`metadata`.
+ """
raise NotImplementedError()
@abc.abstractmethod
def trailing_metadata(self):
"""Accesses the trailing metadata sent by the server.
- This method blocks until the value is available.
+ This method blocks until the value is available.
- Returns:
- The trailing :term:`metadata`.
- """
+ Returns:
+ The trailing :term:`metadata`.
+ """
raise NotImplementedError()
@abc.abstractmethod
def code(self):
"""Accesses the status code sent by the server.
- This method blocks until the value is available.
+ This method blocks until the value is available.
- Returns:
- The StatusCode value for the RPC.
- """
+ Returns:
+ The StatusCode value for the RPC.
+ """
raise NotImplementedError()
@abc.abstractmethod
def details(self):
"""Accesses the details sent by the server.
- This method blocks until the value is available.
+ This method blocks until the value is available.
- Returns:
- The details string of the RPC.
- """
+ Returns:
+ The details string of the RPC.
+ """
raise NotImplementedError()
@@ -578,9 +578,9 @@ class AuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)):
class ServerCredentials(object):
"""An encapsulation of the data required to open a secure port on a Server.
- This class has no supported interface - it exists to define the type of its
- instances and its instances exist to be passed to other functions.
- """
+ This class has no supported interface - it exists to define the type of its
+ instances and its instances exist to be passed to other functions.
+ """
def __init__(self, credentials):
self._credentials = credentials
@@ -611,61 +611,65 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
def __call__(self, request, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
- Args:
- request: The request value for the RPC.
- timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: Optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional duration of time in seconds to allow
+ for the RPC.
+ metadata: Optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- The response value for the RPC.
+ Returns:
+ The response value for the RPC.
- Raises:
- RpcError: Indicating that the RPC terminated with non-OK status. The
- raised RpcError will also be a Call for the RPC affording the RPC's
- metadata, status code, and details.
- """
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
raise NotImplementedError()
@abc.abstractmethod
def with_call(self, request, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
- Args:
- request: The request value for the RPC.
- timeout: An optional durating of time in seconds to allow for the RPC.
- metadata: Optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional durating of time in seconds to allow for
+ the RPC.
+ metadata: Optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- The response value for the RPC and a Call value for the RPC.
+ Returns:
+ The response value for the RPC and a Call value for the RPC.
- Raises:
- RpcError: Indicating that the RPC terminated with non-OK status. The
- raised RpcError will also be a Call for the RPC affording the RPC's
- metadata, status code, and details.
- """
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
raise NotImplementedError()
@abc.abstractmethod
def future(self, request, timeout=None, metadata=None, credentials=None):
"""Asynchronously invokes the underlying RPC.
- Args:
- request: The request value for the RPC.
- timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: Optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional duration of time in seconds to allow for
+ the RPC.
+ metadata: Optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- An object that is both a Call for the RPC and a Future. In the event of
- RPC completion, the return Call-Future's result value will be the
- response message of the RPC. Should the event terminate with non-OK
- status, the returned Call-Future's exception value will be an RpcError.
- """
+ Returns:
+ An object that is both a Call for the RPC and a Future.
+ In the event of RPC completion, the return Call-Future's result
+ value will be the response message of the RPC.
+ Should the event terminate with non-OK status,
+ the returned Call-Future's exception value will be an RpcError.
+ """
raise NotImplementedError()
@@ -676,19 +680,20 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
def __call__(self, request, timeout=None, metadata=None, credentials=None):
"""Invokes the underlying RPC.
- Args:
- request: The request value for the RPC.
- timeout: An optional duration of time in seconds to allow for the RPC.
- If None, the timeout is considered infinite.
- metadata: An optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional duration of time in seconds to allow for
+ the RPC. If None, the timeout is considered infinite.
+ metadata: An optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- An object that is both a Call for the RPC and an iterator of response
- values. Drawing response values from the returned Call-iterator may
- raise RpcError indicating termination of the RPC with non-OK status.
- """
+ Returns:
+ An object that is both a Call for the RPC and an iterator of
+ response values. Drawing response values from the returned
+ Call-iterator may raise RpcError indicating termination of the
+ RPC with non-OK status.
+ """
raise NotImplementedError()
@@ -703,22 +708,23 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
credentials=None):
"""Synchronously invokes the underlying RPC.
- Args:
- request_iterator: An iterator that yields request values for the RPC.
- timeout: An optional duration of time in seconds to allow for the RPC.
- If None, the timeout is considered infinite.
- metadata: Optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request_iterator: An iterator that yields request values for
+ the RPC.
+ timeout: An optional duration of time in seconds to allow for
+ the RPC. If None, the timeout is considered infinite.
+ metadata: Optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- The response value for the RPC.
+ Returns:
+ The response value for the RPC.
- Raises:
- RpcError: Indicating that the RPC terminated with non-OK status. The
- raised RpcError will also implement grpc.Call, affording methods
- such as metadata, code, and details.
- """
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also implement grpc.Call, affording methods
+ such as metadata, code, and details.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -729,22 +735,23 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
credentials=None):
"""Synchronously invokes the underlying RPC on the client.
- Args:
- request_iterator: An iterator that yields request values for the RPC.
- timeout: An optional duration of time in seconds to allow for the RPC.
- If None, the timeout is considered infinite.
- metadata: Optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request_iterator: An iterator that yields request values for
+ the RPC.
+ timeout: An optional duration of time in seconds to allow for
+ the RPC. If None, the timeout is considered infinite.
+ metadata: Optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- The response value for the RPC and a Call object for the RPC.
+ Returns:
+ The response value for the RPC and a Call object for the RPC.
- Raises:
- RpcError: Indicating that the RPC terminated with non-OK status. The
- raised RpcError will also be a Call for the RPC affording the RPC's
- metadata, status code, and details.
- """
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -755,20 +762,21 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
credentials=None):
"""Asynchronously invokes the underlying RPC on the client.
- Args:
- request_iterator: An iterator that yields request values for the RPC.
- timeout: An optional duration of time in seconds to allow for the RPC.
- If None, the timeout is considered infinite.
- metadata: Optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: An optional duration of time in seconds to allow for
+ the RPC. If None, the timeout is considered infinite.
+ metadata: Optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- An object that is both a Call for the RPC and a Future. In the event of
- RPC completion, the return Call-Future's result value will be the
- response message of the RPC. Should the event terminate with non-OK
- status, the returned Call-Future's exception value will be an RpcError.
- """
+ Returns:
+ An object that is both a Call for the RPC and a Future.
+ In the event of RPC completion, the return Call-Future's result value
+ will be the response message of the RPC. Should the event terminate
+ with non-OK status, the returned Call-Future's exception value will
+ be an RpcError.
+ """
raise NotImplementedError()
@@ -783,19 +791,20 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
credentials=None):
"""Invokes the underlying RPC on the client.
- Args:
- request_iterator: An iterator that yields request values for the RPC.
- timeout: An optional duration of time in seconds to allow for the RPC.
- if not specified the timeout is considered infinite.
- metadata: Optional :term:`metadata` to be transmitted to the
- service-side of the RPC.
- credentials: An optional CallCredentials for the RPC.
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: An optional duration of time in seconds to allow for
+ the RPC. If not specified, the timeout is considered infinite.
+ metadata: Optional :term:`metadata` to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
- Returns:
- An object that is both a Call for the RPC and an iterator of response
- values. Drawing response values from the returned Call-iterator may
- raise RpcError indicating termination of the RPC with non-OK status.
- """
+ Returns:
+ An object that is both a Call for the RPC and an iterator of
+ response values. Drawing response values from the returned
+ Call-iterator may raise RpcError indicating termination of the
+ RPC with non-OK status.
+ """
raise NotImplementedError()
@@ -809,31 +818,31 @@ class Channel(six.with_metaclass(abc.ABCMeta)):
def subscribe(self, callback, try_to_connect=False):
"""Subscribe to this Channel's connectivity state machine.
- A Channel may be in any of the states described by ChannelConnectivity.
- This method allows application to monitor the state transitions.
- The typical use case is to debug or gain better visibility into gRPC
- runtime's state.
+ A Channel may be in any of the states described by ChannelConnectivity.
+ This method allows application to monitor the state transitions.
+ The typical use case is to debug or gain better visibility into gRPC
+ runtime's state.
- Args:
- callback: A callable to be invoked with ChannelConnectivity argument.
- ChannelConnectivity describes current state of the channel.
- The callable will be invoked immediately upon subscription and again for
- every change to ChannelConnectivity until it is unsubscribed or this
- Channel object goes out of scope.
- try_to_connect: A boolean indicating whether or not this Channel should
- attempt to connect immediately. If set to False, gRPC runtime decides
- when to connect.
- """
+ Args:
+ callback: A callable to be invoked with ChannelConnectivity argument.
+ ChannelConnectivity describes current state of the channel.
+ The callable will be invoked immediately upon subscription
+ and again for every change to ChannelConnectivity until it
+ is unsubscribed or this Channel object goes out of scope.
+ try_to_connect: A boolean indicating whether or not this Channel
+ should attempt to connect immediately. If set to False, gRPC
+ runtime decides when to connect.
+ """
raise NotImplementedError()
@abc.abstractmethod
def unsubscribe(self, callback):
"""Unsubscribes a subscribed callback from this Channel's connectivity.
- Args:
- callback: A callable previously registered with this Channel from having
- been passed to its "subscribe" method.
- """
+ Args:
+ callback: A callable previously registered with this Channel from
+ having been passed to its "subscribe" method.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -843,16 +852,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)):
response_deserializer=None):
"""Creates a UnaryUnaryMultiCallable for a unary-unary method.
- Args:
- method: The name of the RPC method.
- request_serializer: Optional behaviour for serializing the request
- message. Request goes unserialized in case None is passed.
- response_deserializer: Optional behaviour for deserializing the response
- message. Response goes undeserialized in case None is passed.
+ Args:
+ method: The name of the RPC method.
+ request_serializer: Optional behaviour for serializing the request
+ message. Request goes unserialized in case None is passed.
+ response_deserializer: Optional behaviour for deserializing the
+ response message. Response goes undeserialized in case None
+ is passed.
- Returns:
- A UnaryUnaryMultiCallable value for the named unary-unary method.
- """
+ Returns:
+ A UnaryUnaryMultiCallable value for the named unary-unary method.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -862,16 +872,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)):
response_deserializer=None):
"""Creates a UnaryStreamMultiCallable for a unary-stream method.
- Args:
- method: The name of the RPC method.
- request_serializer: Optional behaviour for serializing the request
- message. Request goes unserialized in case None is passed.
- response_deserializer: Optional behaviour for deserializing the response
- message. Response goes undeserialized in case None is passed.
+ Args:
+ method: The name of the RPC method.
+ request_serializer: Optional behaviour for serializing the request
+ message. Request goes unserialized in case None is passed.
+ response_deserializer: Optional behaviour for deserializing the
+ response message. Response goes undeserialized in case None is
+ passed.
- Returns:
- A UnaryStreamMultiCallable value for the name unary-stream method.
- """
+ Returns:
+ A UnaryStreamMultiCallable value for the name unary-stream method.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -881,16 +892,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)):
response_deserializer=None):
"""Creates a StreamUnaryMultiCallable for a stream-unary method.
- Args:
- method: The name of the RPC method.
- request_serializer: Optional behaviour for serializing the request
- message. Request goes unserialized in case None is passed.
- response_deserializer: Optional behaviour for deserializing the response
- message. Response goes undeserialized in case None is passed.
+ Args:
+ method: The name of the RPC method.
+ request_serializer: Optional behaviour for serializing the request
+ message. Request goes unserialized in case None is passed.
+ response_deserializer: Optional behaviour for deserializing the
+ response message. Response goes undeserialized in case None is
+ passed.
- Returns:
- A StreamUnaryMultiCallable value for the named stream-unary method.
- """
+ Returns:
+ A StreamUnaryMultiCallable value for the named stream-unary method.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -900,16 +912,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)):
response_deserializer=None):
"""Creates a StreamStreamMultiCallable for a stream-stream method.
- Args:
- method: The name of the RPC method.
- request_serializer: Optional behaviour for serializing the request
- message. Request goes unserialized in case None is passed.
- response_deserializer: Optional behaviour for deserializing the response
- message. Response goes undeserialized in case None is passed.
+ Args:
+ method: The name of the RPC method.
+ request_serializer: Optional behaviour for serializing the request
+ message. Request goes unserialized in case None is passed.
+ response_deserializer: Optional behaviour for deserializing the
+ response message. Response goes undeserialized in case None
+ is passed.
- Returns:
- A StreamStreamMultiCallable value for the named stream-stream method.
- """
+ Returns:
+ A StreamStreamMultiCallable value for the named stream-stream method.
+ """
raise NotImplementedError()
@@ -923,79 +936,79 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
def invocation_metadata(self):
"""Accesses the metadata from the sent by the client.
- Returns:
- The invocation :term:`metadata`.
- """
+ Returns:
+ The invocation :term:`metadata`.
+ """
raise NotImplementedError()
@abc.abstractmethod
def peer(self):
"""Identifies the peer that invoked the RPC being serviced.
- Returns:
- A string identifying the peer that invoked the RPC being serviced.
- The string format is determined by gRPC runtime.
- """
+ Returns:
+ A string identifying the peer that invoked the RPC being serviced.
+ The string format is determined by gRPC runtime.
+ """
raise NotImplementedError()
@abc.abstractmethod
def peer_identities(self):
"""Gets one or more peer identity(s).
- Equivalent to
- servicer_context.auth_context().get(
- servicer_context.peer_identity_key())
+ Equivalent to
+ servicer_context.auth_context().get(
+ servicer_context.peer_identity_key())
- Returns:
- An iterable of the identities, or None if the call is not authenticated.
- Each identity is returned as a raw bytes type.
- """
+ Returns:
+ An iterable of the identities, or None if the call is not
+ authenticated. Each identity is returned as a raw bytes type.
+ """
raise NotImplementedError()
@abc.abstractmethod
def peer_identity_key(self):
"""The auth property used to identify the peer.
- For example, "x509_common_name" or "x509_subject_alternative_name" are
- used to identify an SSL peer.
+ For example, "x509_common_name" or "x509_subject_alternative_name" are
+ used to identify an SSL peer.
- Returns:
- The auth property (string) that indicates the
- peer identity, or None if the call is not authenticated.
- """
+ Returns:
+ The auth property (string) that indicates the
+ peer identity, or None if the call is not authenticated.
+ """
raise NotImplementedError()
@abc.abstractmethod
def auth_context(self):
"""Gets the auth context for the call.
- Returns:
- A map of strings to an iterable of bytes for each auth property.
- """
+ Returns:
+ A map of strings to an iterable of bytes for each auth property.
+ """
raise NotImplementedError()
@abc.abstractmethod
def send_initial_metadata(self, initial_metadata):
"""Sends the initial metadata value to the client.
- This method need not be called by implementations if they have no
- metadata to add to what the gRPC runtime will transmit.
+ This method need not be called by implementations if they have no
+ metadata to add to what the gRPC runtime will transmit.
- Args:
- initial_metadata: The initial :term:`metadata`.
- """
+ Args:
+ initial_metadata: The initial :term:`metadata`.
+ """
raise NotImplementedError()
@abc.abstractmethod
def set_trailing_metadata(self, trailing_metadata):
"""Sends the trailing metadata for the RPC.
- This method need not be called by implementations if they have no
- metadata to add to what the gRPC runtime will transmit.
+ This method need not be called by implementations if they have no
+ metadata to add to what the gRPC runtime will transmit.
- Args:
- trailing_metadata: The trailing :term:`metadata`.
- """
+ Args:
+ trailing_metadata: The trailing :term:`metadata`.
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -1049,44 +1062,45 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
class RpcMethodHandler(six.with_metaclass(abc.ABCMeta)):
"""An implementation of a single RPC method.
- Attributes:
- request_streaming: Whether the RPC supports exactly one request message or
- any arbitrary number of request messages.
- response_streaming: Whether the RPC supports exactly one response message or
- any arbitrary number of response messages.
- request_deserializer: A callable behavior that accepts a byte string and
- returns an object suitable to be passed to this object's business logic,
- or None to indicate that this object's business logic should be passed the
- raw request bytes.
- response_serializer: A callable behavior that accepts an object produced by
- this object's business logic and returns a byte string, or None to
- indicate that the byte strings produced by this object's business logic
- should be transmitted on the wire as they are.
- unary_unary: This object's application-specific business logic as a callable
- value that takes a request value and a ServicerContext object and returns
- a response value. Only non-None if both request_streaming and
- response_streaming are False.
- unary_stream: This object's application-specific business logic as a
- callable value that takes a request value and a ServicerContext object and
- returns an iterator of response values. Only non-None if request_streaming
- is False and response_streaming is True.
- stream_unary: This object's application-specific business logic as a
- callable value that takes an iterator of request values and a
- ServicerContext object and returns a response value. Only non-None if
- request_streaming is True and response_streaming is False.
- stream_stream: This object's application-specific business logic as a
- callable value that takes an iterator of request values and a
- ServicerContext object and returns an iterator of response values. Only
- non-None if request_streaming and response_streaming are both True.
- """
+ Attributes:
+ request_streaming: Whether the RPC supports exactly one request message
+ or any arbitrary number of request messages.
+ response_streaming: Whether the RPC supports exactly one response message
+ or any arbitrary number of response messages.
+ request_deserializer: A callable behavior that accepts a byte string and
+ returns an object suitable to be passed to this object's business
+ logic, or None to indicate that this object's business logic should be
+ passed the raw request bytes.
+ response_serializer: A callable behavior that accepts an object produced
+ by this object's business logic and returns a byte string, or None to
+ indicate that the byte strings produced by this object's business logic
+ should be transmitted on the wire as they are.
+ unary_unary: This object's application-specific business logic as a
+ callable value that takes a request value and a ServicerContext object
+ and returns a response value. Only non-None if both request_streaming
+ and response_streaming are False.
+ unary_stream: This object's application-specific business logic as a
+ callable value that takes a request value and a ServicerContext object
+ and returns an iterator of response values. Only non-None if
+ request_streaming is False and response_streaming is True.
+ stream_unary: This object's application-specific business logic as a
+ callable value that takes an iterator of request values and a
+ ServicerContext object and returns a response value. Only non-None if
+ request_streaming is True and response_streaming is False.
+ stream_stream: This object's application-specific business logic as a
+ callable value that takes an iterator of request values and a
+ ServicerContext object and returns an iterator of response values.
+ Only non-None if request_streaming and response_streaming are both
+ True.
+ """
class HandlerCallDetails(six.with_metaclass(abc.ABCMeta)):
"""Describes an RPC that has just arrived for service.
- Attributes:
- method: The method name of the RPC.
- invocation_metadata: The :term:`metadata` sent by the client.
- """
+ Attributes:
+ method: The method name of the RPC.
+ invocation_metadata: The :term:`metadata` sent by the client.
+ """
class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)):
@@ -1096,33 +1110,33 @@ class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)):
def service(self, handler_call_details):
"""Returns the handler for servicing the RPC.
- Args:
- handler_call_details: A HandlerCallDetails describing the RPC.
+ Args:
+ handler_call_details: A HandlerCallDetails describing the RPC.
- Returns:
- An RpcMethodHandler with which the RPC may be serviced if the
- implementation chooses to service this RPC, or None otherwise.
- """
+ Returns:
+ An RpcMethodHandler with which the RPC may be serviced if the
+ implementation chooses to service this RPC, or None otherwise.
+ """
raise NotImplementedError()
class ServiceRpcHandler(six.with_metaclass(abc.ABCMeta, GenericRpcHandler)):
"""An implementation of RPC methods belonging to a service.
- A service handles RPC methods with structured names of the form
- '/Service.Name/Service.Method', where 'Service.Name' is the value
- returned by service_name(), and 'Service.Method' is the method
- name. A service can have multiple method names, but only a single
- service name.
- """
+ A service handles RPC methods with structured names of the form
+ '/Service.Name/Service.Method', where 'Service.Name' is the value
+ returned by service_name(), and 'Service.Method' is the method
+ name. A service can have multiple method names, but only a single
+ service name.
+ """
@abc.abstractmethod
def service_name(self):
"""Returns this service's name.
- Returns:
- The service name.
- """
+ Returns:
+ The service name.
+ """
raise NotImplementedError()
@@ -1164,83 +1178,84 @@ class Server(six.with_metaclass(abc.ABCMeta)):
def add_generic_rpc_handlers(self, generic_rpc_handlers):
"""Registers GenericRpcHandlers with this Server.
- This method is only safe to call before the server is started.
+ This method is only safe to call before the server is started.
- Args:
- generic_rpc_handlers: An iterable of GenericRpcHandlers that will be used
- to service RPCs.
- """
+ Args:
+ generic_rpc_handlers: An iterable of GenericRpcHandlers that will be
+ used to service RPCs.
+ """
raise NotImplementedError()
@abc.abstractmethod
def add_insecure_port(self, address):
"""Opens an insecure port for accepting RPCs.
- This method may only be called before starting the server.
+ This method may only be called before starting the server.
- Args:
- address: The address for which to open a port.
- if the port is 0, or not specified in the address, then gRPC runtime
- will choose a port.
+ Args:
+ address: The address for which to open a port.
+ if the port is 0, or not specified in the address, then gRPC runtime
+ will choose a port.
- Returns:
- integer:
- An integer port on which server will accept RPC requests.
- """
+ Returns:
+ integer:
+ An integer port on which server will accept RPC requests.
+ """
raise NotImplementedError()
@abc.abstractmethod
def add_secure_port(self, address, server_credentials):
"""Opens a secure port for accepting RPCs.
- This method may only be called before starting the server.
+ This method may only be called before starting the server.
- Args:
- address: The address for which to open a port.
- if the port is 0, or not specified in the address, then gRPC runtime
- will choose a port.
- server_credentials: A ServerCredentials object.
+ Args:
+ address: The address for which to open a port.
+ if the port is 0, or not specified in the address, then gRPC
+ runtime will choose a port.
+ server_credentials: A ServerCredentials object.
- Returns:
- integer:
- An integer port on which server will accept RPC requests.
- """
+ Returns:
+ integer:
+ An integer port on which server will accept RPC requests.
+ """
raise NotImplementedError()
@abc.abstractmethod
def start(self):
"""Starts this Server.
- This method may only be called once. (i.e. it is not idempotent).
- """
+ This method may only be called once. (i.e. it is not idempotent).
+ """
raise NotImplementedError()
@abc.abstractmethod
def stop(self, grace):
"""Stops this Server.
- This method immediately stop service of new RPCs in all cases.
- If a grace period is specified, this method returns immediately
- and all RPCs active at the end of the grace period are aborted.
+ This method immediately stop service of new RPCs in all cases.
+ If a grace period is specified, this method returns immediately
+ and all RPCs active at the end of the grace period are aborted.
- If a grace period is not specified, then all existing RPCs are
- teriminated immediately and the this method blocks until the last
- RPC handler terminates.
+ If a grace period is not specified, then all existing RPCs are
+ teriminated immediately and the this method blocks until the last
+ RPC handler terminates.
- This method is idempotent and may be called at any time. Passing a smaller
- grace value in subsequentcall will have the effect of stopping the Server
- sooner. Passing a larger grace value in subsequent call *will not* have the
- effect of stopping the server later (i.e. the most restrictive grace
- value is used).
+ This method is idempotent and may be called at any time.
+ Passing a smaller grace value in subsequent call will have
+ the effect of stopping the Server sooner. Passing a larger
+ grace value in subsequent call *will not* have the effect of
+ stopping the server later (i.e. the most restrictive grace
+ value is used).
- Args:
- grace: A duration of time in seconds or None.
+ Args:
+ grace: A duration of time in seconds or None.
- Returns:
- A threading.Event that will be set when this Server has completely
- stopped, i.e. when running RPCs either complete or are aborted and
- all handlers have terminated.
- """
+ Returns:
+ A threading.Event that will be set when this Server has completely
+ stopped, i.e. when running RPCs either complete or are aborted and
+ all handlers have terminated.
+ """
raise NotImplementedError()
@@ -1252,15 +1267,15 @@ def unary_unary_rpc_method_handler(behavior,
response_serializer=None):
"""Creates an RpcMethodHandler for a unary-unary RPC method.
- Args:
- behavior: The implementation of an RPC that accepts one request and returns
- one response.
- request_deserializer: An optional behavior for request deserialization.
- response_serializer: An optional behavior for response serialization.
+ Args:
+ behavior: The implementation of an RPC that accepts one request
+ and returns one response.
+ request_deserializer: An optional behavior for request deserialization.
+ response_serializer: An optional behavior for response serialization.
- Returns:
- An RpcMethodHandler object that is typically used by grpc.Server.
- """
+ Returns:
+ An RpcMethodHandler object that is typically used by grpc.Server.
+ """
from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(False, False, request_deserializer,
response_serializer, behavior, None,
@@ -1272,15 +1287,15 @@ def unary_stream_rpc_method_handler(behavior,
response_serializer=None):
"""Creates an RpcMethodHandler for a unary-stream RPC method.
- Args:
- behavior: The implementation of an RPC that accepts one request and returns
- an iterator of response values.
- request_deserializer: An optional behavior for request deserialization.
- response_serializer: An optional behavior for response serialization.
+ Args:
+ behavior: The implementation of an RPC that accepts one request
+ and returns an iterator of response values.
+ request_deserializer: An optional behavior for request deserialization.
+ response_serializer: An optional behavior for response serialization.
- Returns:
- An RpcMethodHandler object that is typically used by grpc.Server.
- """
+ Returns:
+ An RpcMethodHandler object that is typically used by grpc.Server.
+ """
from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(False, True, request_deserializer,
response_serializer, None, behavior,
@@ -1292,15 +1307,15 @@ def stream_unary_rpc_method_handler(behavior,
response_serializer=None):
"""Creates an RpcMethodHandler for a stream-unary RPC method.
- Args:
- behavior: The implementation of an RPC that accepts an iterator of request
- values and returns a single response value.
- request_deserializer: An optional behavior for request deserialization.
- response_serializer: An optional behavior for response serialization.
+ Args:
+ behavior: The implementation of an RPC that accepts an iterator of
+ request values and returns a single response value.
+ request_deserializer: An optional behavior for request deserialization.
+ response_serializer: An optional behavior for response serialization.
- Returns:
- An RpcMethodHandler object that is typically used by grpc.Server.
- """
+ Returns:
+ An RpcMethodHandler object that is typically used by grpc.Server.
+ """
from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(True, False, request_deserializer,
response_serializer, None, None,
@@ -1312,15 +1327,15 @@ def stream_stream_rpc_method_handler(behavior,
response_serializer=None):
"""Creates an RpcMethodHandler for a stream-stream RPC method.
- Args:
- behavior: The implementation of an RPC that accepts an iterator of request
- values and returns an iterator of response values.
- request_deserializer: An optional behavior for request deserialization.
- response_serializer: An optional behavior for response serialization.
+ Args:
+ behavior: The implementation of an RPC that accepts an iterator of
+ request values and returns an iterator of response values.
+ request_deserializer: An optional behavior for request deserialization.
+ response_serializer: An optional behavior for response serialization.
- Returns:
- An RpcMethodHandler object that is typically used by grpc.Server.
- """
+ Returns:
+ An RpcMethodHandler object that is typically used by grpc.Server.
+ """
from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.RpcMethodHandler(True, True, request_deserializer,
response_serializer, None, None, None,
@@ -1330,15 +1345,16 @@ def stream_stream_rpc_method_handler(behavior,
def method_handlers_generic_handler(service, method_handlers):
"""Creates a GenericRpcHandler from RpcMethodHandlers.
- Args:
- service: The name of the service that is implemented by the method_handlers.
- method_handlers: A dictionary that maps method names to corresponding
- RpcMethodHandler.
+ Args:
+ service: The name of the service that is implemented by the
+ method_handlers.
+ method_handlers: A dictionary that maps method names to corresponding
+ RpcMethodHandler.
- Returns:
- A GenericRpcHandler. This is typically added to the grpc.Server object
- with add_generic_rpc_handlers() before starting the server.
- """
+ Returns:
+ A GenericRpcHandler. This is typically added to the grpc.Server object
+ with add_generic_rpc_handlers() before starting the server.
+ """
from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.DictionaryGenericHandler(service, method_handlers)
@@ -1435,20 +1451,20 @@ def ssl_server_credentials(private_key_certificate_chain_pairs,
require_client_auth=False):
"""Creates a ServerCredentials for use with an SSL-enabled Server.
- Args:
- private_key_certificate_chain_pairs: A list of pairs of the form
- [PEM-encoded private key, PEM-encoded certificate chain].
- root_certificates: An optional byte string of PEM-encoded client root
- certificates that the server will use to verify client authentication.
- If omitted, require_client_auth must also be False.
- require_client_auth: A boolean indicating whether or not to require
- clients to be authenticated. May only be True if root_certificates
- is not None.
-
- Returns:
- A ServerCredentials for use with an SSL-enabled Server. Typically, this
- object is an argument to add_secure_port() method during server setup.
- """
+ Args:
+ private_key_certificate_chain_pairs: A list of pairs of the form
+ [PEM-encoded private key, PEM-encoded certificate chain].
+ root_certificates: An optional byte string of PEM-encoded client root
+ certificates that the server will use to verify client authentication.
+ If omitted, require_client_auth must also be False.
+ require_client_auth: A boolean indicating whether or not to require
+ clients to be authenticated. May only be True if root_certificates
+ is not None.
+
+ Returns:
+ A ServerCredentials for use with an SSL-enabled Server. Typically, this
+ object is an argument to add_secure_port() method during server setup.
+ """
if len(private_key_certificate_chain_pairs) == 0:
raise ValueError(
'At least one private key-certificate chain pair is required!')
@@ -1522,16 +1538,16 @@ def dynamic_ssl_server_credentials(initial_certificate_configuration,
def channel_ready_future(channel):
"""Creates a Future that tracks when a Channel is ready.
- Cancelling the Future does not affect the channel's state machine.
- It merely decouples the Future from channel state machine.
+ Cancelling the Future does not affect the channel's state machine.
+ It merely decouples the Future from channel state machine.
- Args:
- channel: A Channel object.
+ Args:
+ channel: A Channel object.
- Returns:
- A Future object that matures when the channel connectivity is
- ChannelConnectivity.READY.
- """
+ Returns:
+ A Future object that matures when the channel connectivity is
+ ChannelConnectivity.READY.
+ """
from grpc import _utilities # pylint: disable=cyclic-import
return _utilities.channel_ready_future(channel)
@@ -1539,14 +1555,14 @@ def channel_ready_future(channel):
def insecure_channel(target, options=None):
"""Creates an insecure Channel to a server.
- Args:
- target: The server address
- options: An optional list of key-value pairs (channel args in gRPC runtime)
- to configure the channel.
+ Args:
+ target: The server address
+ options: An optional list of key-value pairs (channel args
+ in gRPC Core runtime) to configure the channel.
- Returns:
- A Channel object.
- """
+ Returns:
+ A Channel object.
+ """
from grpc import _channel # pylint: disable=cyclic-import
return _channel.Channel(target, () if options is None else options, None)
@@ -1554,15 +1570,15 @@ def insecure_channel(target, options=None):
def secure_channel(target, credentials, options=None):
"""Creates a secure Channel to a server.
- Args:
- target: The server address.
- credentials: A ChannelCredentials instance.
- options: An optional list of key-value pairs (channel args in gRPC runtime)
- to configure the channel.
+ Args:
+ target: The server address.
+ credentials: A ChannelCredentials instance.
+ options: An optional list of key-value pairs (channel args
+ in gRPC Core runtime) to configure the channel.
- Returns:
- A Channel object.
- """
+ Returns:
+ A Channel object.
+ """
from grpc import _channel # pylint: disable=cyclic-import
return _channel.Channel(target, () if options is None else options,
credentials._credentials)
diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template
index 9785d150e4..da404e2fef 100644
--- a/templates/gRPC-Core.podspec.template
+++ b/templates/gRPC-Core.podspec.template
@@ -202,6 +202,6 @@
# TODO (mxyan): Instead of this hack, add include path "third_party" to C core's include path?
s.prepare_command = <<-END_OF_COMMAND
- find src/core/ -type f -exec sed -E -i '.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\;
+ find src/core/ -type f -exec sed -E -i'.back' 's;#include "third_party/nanopb/(.*)";#include <nanopb/\\1>;g' {} \\\;
END_OF_COMMAND
end
diff --git a/templates/tools/dockerfile/clang_format.include b/templates/tools/dockerfile/clang5.include
index 79d0ff286f..11ff442787 100644
--- a/templates/tools/dockerfile/clang_format.include
+++ b/templates/tools/dockerfile/clang5.include
@@ -3,3 +3,5 @@ RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.0
RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz
RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format
ENV CLANG_FORMAT=clang-format
+RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy
+ENV CLANG_TIDY=clang-tidy
diff --git a/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template b/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template
index 1ab667c95d..4f24a025c6 100644
--- a/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template
+++ b/templates/tools/dockerfile/grpc_clang_format/Dockerfile.template
@@ -16,8 +16,8 @@
FROM debian:jessie
- <%include file="../clang_format.include"/>
+ <%include file="../clang5.include"/>
ADD clang_format_all_the_things.sh /
CMD ["echo 'Run with tools/distrib/clang_format_code.sh'"]
- \ No newline at end of file
+
diff --git a/templates/tools/dockerfile/grpc_clang_tidy/Dockerfile.template b/templates/tools/dockerfile/grpc_clang_tidy/Dockerfile.template
new file mode 100644
index 0000000000..f5bceaa5f3
--- /dev/null
+++ b/templates/tools/dockerfile/grpc_clang_tidy/Dockerfile.template
@@ -0,0 +1,24 @@
+%YAML 1.2
+--- |
+ # Copyright 2015 gRPC authors.
+ #
+ # Licensed under the Apache License, Version 2.0 (the "License");
+ # you may not use this file except in compliance with the License.
+ # You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+ FROM debian:jessie
+
+ <%include file="../clang5.include"/>
+ <%include file="../python_deps.include"/>
+ ADD clang_tidy_all_the_things.sh /
+ CMD ["echo 'Run with tools/distrib/clang_tidy_code.sh'"]
+
+
diff --git a/templates/tools/dockerfile/test/sanity/Dockerfile.template b/templates/tools/dockerfile/test/sanity/Dockerfile.template
index c98f7d4176..7453e6c460 100644
--- a/templates/tools/dockerfile/test/sanity/Dockerfile.template
+++ b/templates/tools/dockerfile/test/sanity/Dockerfile.template
@@ -53,7 +53,7 @@
RUN chmod +x ./bazel-0.4.4-installer-linux-x86_64.sh
RUN ./bazel-0.4.4-installer-linux-x86_64.sh
- <%include file="../../clang_format.include"/>
+ <%include file="../../clang5.include"/>
<%include file="../../run_tests_addons.include"/>
# Define the default command.
diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc
index e767e01f21..262470300e 100644
--- a/test/core/iomgr/ev_epollsig_linux_test.cc
+++ b/test/core/iomgr/ev_epollsig_linux_test.cc
@@ -18,7 +18,7 @@
#include "src/core/lib/iomgr/port.h"
/* This test only relevant on linux systems where epoll() is available */
-#ifdef GRPC_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL_CREATE1
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
#include "src/core/lib/iomgr/ev_posix.h"
@@ -319,6 +319,6 @@ int main(int argc, char** argv) {
grpc_shutdown();
return 0;
}
-#else /* defined(GRPC_LINUX_EPOLL) */
+#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
int main(int argc, char** argv) { return 0; }
-#endif /* !defined(GRPC_LINUX_EPOLL) */
+#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
diff --git a/test/core/iomgr/pollset_set_test.cc b/test/core/iomgr/pollset_set_test.cc
index f27079134b..7d2f59bed4 100644
--- a/test/core/iomgr/pollset_set_test.cc
+++ b/test/core/iomgr/pollset_set_test.cc
@@ -18,7 +18,7 @@
#include "src/core/lib/iomgr/port.h"
/* This test only relevant on linux systems where epoll is available */
-#ifdef GRPC_LINUX_EPOLL
+#ifdef GRPC_LINUX_EPOLL_CREATE1
#include <errno.h>
#include <string.h>
@@ -443,6 +443,6 @@ int main(int argc, char** argv) {
grpc_shutdown();
return 0;
}
-#else /* defined(GRPC_LINUX_EPOLL) */
+#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
int main(int argc, char** argv) { return 0; }
-#endif /* !defined(GRPC_LINUX_EPOLL) */
+#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
diff --git a/test/core/support/BUILD b/test/core/support/BUILD
index 4372b49b54..c8fa046da1 100644
--- a/test/core/support/BUILD
+++ b/test/core/support/BUILD
@@ -215,6 +215,19 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "orphanable_test",
+ srcs = ["orphanable_test.cc"],
+ language = "C++",
+ deps = [
+ "//:orphanable",
+ "//test/core/util:gpr_test_util",
+ ],
+ external_deps = [
+ "gtest",
+ ],
+)
+
+grpc_cc_test(
name = "ref_counted_test",
srcs = ["ref_counted_test.cc"],
language = "C++",
diff --git a/test/core/support/orphanable_test.cc b/test/core/support/orphanable_test.cc
new file mode 100644
index 0000000000..e07017ab1e
--- /dev/null
+++ b/test/core/support/orphanable_test.cc
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/support/orphanable.h"
+
+#include <gtest/gtest.h>
+
+#include "src/core/lib/support/memory.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_core {
+namespace testing {
+namespace {
+
+class Foo : public Orphanable {
+ public:
+ Foo() : Foo(0) {}
+ explicit Foo(int value) : value_(value) {}
+ void Orphan() override { Delete(this); }
+ int value() const { return value_; }
+
+ private:
+ int value_;
+};
+
+TEST(Orphanable, Basic) {
+ Foo* foo = New<Foo>();
+ foo->Orphan();
+}
+
+TEST(OrphanablePtr, Basic) {
+ OrphanablePtr<Foo> foo(New<Foo>());
+ EXPECT_EQ(0, foo->value());
+}
+
+TEST(MakeOrphanable, DefaultConstructor) {
+ auto foo = MakeOrphanable<Foo>();
+ EXPECT_EQ(0, foo->value());
+}
+
+TEST(MakeOrphanable, WithParameters) {
+ auto foo = MakeOrphanable<Foo>(5);
+ EXPECT_EQ(5, foo->value());
+}
+
+class Bar : public InternallyRefCounted {
+ public:
+ Bar() : Bar(0) {}
+ explicit Bar(int value) : value_(value) {}
+ void Orphan() override { Unref(); }
+ int value() const { return value_; }
+
+ void StartWork() { Ref(); }
+ void FinishWork() { Unref(); }
+
+ private:
+ int value_;
+};
+
+TEST(OrphanablePtr, InternallyRefCounted) {
+ auto bar = MakeOrphanable<Bar>();
+ bar->StartWork();
+ bar->FinishWork();
+}
+
+// Note: We use DebugOnlyTraceFlag instead of TraceFlag to ensure that
+// things build properly in both debug and non-debug cases.
+DebugOnlyTraceFlag baz_tracer(true, "baz");
+
+class Baz : public InternallyRefCountedWithTracing {
+ public:
+ Baz() : Baz(0) {}
+ explicit Baz(int value)
+ : InternallyRefCountedWithTracing(&baz_tracer), value_(value) {}
+ void Orphan() override { Unref(); }
+ int value() const { return value_; }
+
+ void StartWork() { Ref(DEBUG_LOCATION, "work"); }
+ void FinishWork() { Unref(DEBUG_LOCATION, "work"); }
+
+ private:
+ int value_;
+};
+
+TEST(OrphanablePtr, InternallyRefCountedWithTracing) {
+ auto baz = MakeOrphanable<Baz>();
+ baz->StartWork();
+ baz->FinishWork();
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/core/support/ref_counted_ptr_test.cc b/test/core/support/ref_counted_ptr_test.cc
index 1830edc4e5..ce4975d347 100644
--- a/test/core/support/ref_counted_ptr_test.cc
+++ b/test/core/support/ref_counted_ptr_test.cc
@@ -138,6 +138,19 @@ TEST(RefCountedPtr, DerefernceOperators) {
foo_ref.value();
}
+TEST(RefCountedPtr, EqualityOperators) {
+ RefCountedPtr<Foo> foo(New<Foo>());
+ RefCountedPtr<Foo> bar = foo;
+ RefCountedPtr<Foo> empty;
+ // Test equality between RefCountedPtrs.
+ EXPECT_EQ(foo, bar);
+ EXPECT_NE(foo, empty);
+ // Test equality with bare pointers.
+ EXPECT_EQ(foo, foo.get());
+ EXPECT_EQ(empty, nullptr);
+ EXPECT_NE(foo, nullptr);
+}
+
TEST(MakeRefCounted, NoArgs) {
RefCountedPtr<Foo> foo = MakeRefCounted<Foo>();
EXPECT_EQ(0, foo->value());
diff --git a/test/core/support/ref_counted_test.cc b/test/core/support/ref_counted_test.cc
index be9b6ff7c2..0629e3ff5f 100644
--- a/test/core/support/ref_counted_test.cc
+++ b/test/core/support/ref_counted_test.cc
@@ -44,7 +44,9 @@ TEST(RefCounted, ExtraRef) {
foo->Unref();
}
-TraceFlag foo_tracer(true, "foo");
+// Note: We use DebugOnlyTraceFlag instead of TraceFlag to ensure that
+// things build properly in both debug and non-debug cases.
+DebugOnlyTraceFlag foo_tracer(true, "foo");
class FooWithTracing : public RefCountedWithTracing {
public:
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index c6e9577f0c..5a7e52e9e9 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -673,6 +673,42 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
GPR_ASSERT(gpr_time_cmp(deadline, now) > 0);
}
+TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
+ const int kNumServers = 3;
+ StartServers(kNumServers);
+ const auto ports = GetServersPorts();
+ ResetStub(ports, "round_robin");
+ SetNextResolution(ports);
+ for (size_t i = 0; i < kNumServers; ++i) WaitForServer(i);
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ CheckRpcSendOk();
+ EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
+ }
+ // One request should have gone to each server.
+ for (size_t i = 0; i < servers_.size(); ++i) {
+ EXPECT_EQ(1, servers_[i]->service_.request_count());
+ }
+ const auto pre_death = servers_[0]->service_.request_count();
+ // Kill the first server.
+ servers_[0]->Shutdown(true);
+ // Client request still succeed. May need retrying if RR had returned a pick
+ // before noticing the change in the server's connectivity.
+ while (!SendRpc())
+ ; // Retry until success.
+ // Send a bunch of RPCs that should succeed.
+ for (int i = 0; i < 10 * kNumServers; ++i) CheckRpcSendOk();
+ const auto post_death = servers_[0]->service_.request_count();
+ // No requests have gone to the deceased server.
+ EXPECT_EQ(pre_death, post_death);
+ // Bring the first server back up.
+ servers_[0].reset(new ServerData(server_host_, ports[0]));
+ // Requests should start arriving at the first server either right away (if
+ // the server managed to start before the RR policy retried the subchannel) or
+ // after the subchannel retry delay otherwise (RR's subchannel retried before
+ // the server was fully back up).
+ WaitForServer(0);
+}
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/tools/distrib/clang_tidy_code.sh b/tools/distrib/clang_tidy_code.sh
new file mode 100755
index 0000000000..5da86aa277
--- /dev/null
+++ b/tools/distrib/clang_tidy_code.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+echo "NOTE: to automagically apply fixes, invoke with --fix"
+
+set -ex
+
+# change to root directory
+cd $(dirname $0)/../..
+REPO_ROOT=$(pwd)
+
+if [ "$CLANG_TIDY_SKIP_DOCKER" == "" ]
+then
+ # build clang-tidy docker image
+ docker build -t grpc_clang_tidy tools/dockerfile/grpc_clang_tidy
+
+ # run clang-tidy against the checked out codebase
+ docker run -e TEST=$TEST -e CHANGED_FILES="$CHANGED_FILES" -e CLANG_TIDY_ROOT="/local-code" --rm=true -v "${REPO_ROOT}":/local-code -t grpc_clang_tidy /clang_tidy_all_the_things.sh "$@"
+else
+ CLANG_TIDY_ROOT="${REPO_ROOT}" tools/dockerfile/grpc_clang_tidy/clang_tidy_all_the_things.sh "$@"
+fi
diff --git a/tools/distrib/run_clang_tidy.py b/tools/distrib/run_clang_tidy.py
index 72d7956b68..bc61d4e79a 100755
--- a/tools/distrib/run_clang_tidy.py
+++ b/tools/distrib/run_clang_tidy.py
@@ -69,4 +69,5 @@ for filename in args.files:
shortname=filename,
)) #verbose_success=True))
-jobset.run(jobs, maxjobs=args.jobs)
+num_fails, res_set = jobset.run(jobs, maxjobs=args.jobs)
+sys.exit(num_fails)
diff --git a/tools/dockerfile/grpc_clang_format/Dockerfile b/tools/dockerfile/grpc_clang_format/Dockerfile
index b3abaef465..8801315bc9 100644
--- a/tools/dockerfile/grpc_clang_format/Dockerfile
+++ b/tools/dockerfile/grpc_clang_format/Dockerfile
@@ -19,6 +19,8 @@ RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.0
RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz
RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format
ENV CLANG_FORMAT=clang-format
+RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy
+ENV CLANG_TIDY=clang-tidy
ADD clang_format_all_the_things.sh /
CMD ["echo 'Run with tools/distrib/clang_format_code.sh'"]
diff --git a/tools/dockerfile/grpc_clang_tidy/Dockerfile b/tools/dockerfile/grpc_clang_tidy/Dockerfile
new file mode 100644
index 0000000000..9d9d70185b
--- /dev/null
+++ b/tools/dockerfile/grpc_clang_tidy/Dockerfile
@@ -0,0 +1,41 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM debian:jessie
+
+RUN apt-get update && apt-get -y install wget xz-utils
+RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz
+RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz
+RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format
+ENV CLANG_FORMAT=clang-format
+RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy
+ENV CLANG_TIDY=clang-tidy
+
+#====================
+# Python dependencies
+
+# Install dependencies
+
+RUN apt-get update && apt-get install -y \
+ python-all-dev \
+ python3-all-dev \
+ python-pip
+
+# Install Python packages from PyPI
+RUN pip install --upgrade pip==9.0.1
+RUN pip install virtualenv
+RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.0.post1 six==1.10.0 twisted==17.5.0
+
+ADD clang_tidy_all_the_things.sh /
+CMD ["echo 'Run with tools/distrib/clang_tidy_code.sh'"]
diff --git a/tools/run_tests/sanity/check_clang_tidy.sh b/tools/dockerfile/grpc_clang_tidy/clang_tidy_all_the_things.sh
index 6c4caa1ee6..1a82dd52b7 100755
--- a/tools/run_tests/sanity/check_clang_tidy.sh
+++ b/tools/dockerfile/grpc_clang_tidy/clang_tidy_all_the_things.sh
@@ -13,9 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-set -e
+set -ex
-make buildtests \
- -j "$(python -c 'import multiprocessing; print multiprocessing.cpu_count()')"
-find src/core src/cpp test/core test/cpp -print0 -name '*.h' -or -name '*.cc' \
+# clang format command
+CLANG_TIDY=${CLANG_TIDY:-clang-tidy-5.0}
+
+cd ${CLANG_TIDY_ROOT}
+
+find src/core src/cpp test/core test/cpp -name '*.h' -or -name '*.cc' -print0 \
| xargs -0 tools/distrib/run_clang_tidy.py "$@"
diff --git a/tools/dockerfile/test/sanity/Dockerfile b/tools/dockerfile/test/sanity/Dockerfile
index 6e5a133a69..7a8e1c09b1 100644
--- a/tools/dockerfile/test/sanity/Dockerfile
+++ b/tools/dockerfile/test/sanity/Dockerfile
@@ -110,6 +110,8 @@ RUN wget http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-linux-x86_64-ubuntu14.0
RUN tar xf clang+llvm-5.0.0-linux-x86_64-ubuntu14.04.tar.xz
RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-format /usr/local/bin/clang-format
ENV CLANG_FORMAT=clang-format
+RUN ln -s /clang+llvm-5.0.0-linux-x86_64-ubuntu14.04/bin/clang-tidy /usr/local/bin/clang-tidy
+ENV CLANG_TIDY=clang-tidy
# Prepare ccache
RUN ln -s /usr/bin/ccache /usr/local/bin/gcc
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index d9184f49a2..85bbeed088 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1038,6 +1038,7 @@ src/core/lib/support/manual_constructor.h \
src/core/lib/support/memory.h \
src/core/lib/support/mpscq.h \
src/core/lib/support/murmur_hash.h \
+src/core/lib/support/orphanable.h \
src/core/lib/support/ref_counted.h \
src/core/lib/support/ref_counted_ptr.h \
src/core/lib/support/spinlock.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 3d3c6711d0..4bf0fc74d1 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1304,6 +1304,7 @@ src/core/lib/support/mpscq.cc \
src/core/lib/support/mpscq.h \
src/core/lib/support/murmur_hash.cc \
src/core/lib/support/murmur_hash.h \
+src/core/lib/support/orphanable.h \
src/core/lib/support/ref_counted.h \
src/core/lib/support/ref_counted_ptr.h \
src/core/lib/support/spinlock.h \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index c934155ecc..df45489d5e 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3713,6 +3713,25 @@
"gpr_test_util",
"grpc",
"grpc++",
+ "grpc++_test",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "orphanable_test",
+ "src": [
+ "test/core/support/orphanable_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc++",
"grpc++_proto_reflection_desc_db",
"grpc++_reflection",
"grpc++_test_util",
@@ -8286,6 +8305,7 @@
"src/core/lib/slice/slice_internal.h",
"src/core/lib/slice/slice_string_helpers.h",
"src/core/lib/support/debug_location.h",
+ "src/core/lib/support/orphanable.h",
"src/core/lib/support/ref_counted.h",
"src/core/lib/support/ref_counted_ptr.h",
"src/core/lib/support/vector.h",
@@ -8426,6 +8446,7 @@
"src/core/lib/slice/slice_internal.h",
"src/core/lib/slice/slice_string_helpers.h",
"src/core/lib/support/debug_location.h",
+ "src/core/lib/support/orphanable.h",
"src/core/lib/support/ref_counted.h",
"src/core/lib/support/ref_counted_ptr.h",
"src/core/lib/support/vector.h",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 6f36dff820..dc2b39eb21 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -4092,6 +4092,30 @@
"flaky": false,
"gtest": true,
"language": "c++",
+ "name": "orphanable_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": true
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
"name": "proto_server_reflection_test",
"platforms": [
"linux",
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index c8e917f117..6b27d6f875 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -1127,6 +1127,7 @@ class Sanity(object):
environ = {'TEST': 'true'}
if _is_use_docker_child():
environ['CLANG_FORMAT_SKIP_DOCKER'] = 'true'
+ environ['CLANG_TIDY_SKIP_DOCKER'] = 'true'
return [
self.config.job_spec(
cmd['script'].split(),
diff --git a/tools/run_tests/sanity/sanity_tests.yaml b/tools/run_tests/sanity/sanity_tests.yaml
index dab991a7b1..efdb4d84b5 100644
--- a/tools/run_tests/sanity/sanity_tests.yaml
+++ b/tools/run_tests/sanity/sanity_tests.yaml
@@ -14,11 +14,11 @@
cpu_cost: 3
- script: tools/distrib/check_copyright.py
- script: tools/distrib/clang_format_code.sh
+- script: tools/distrib/clang_tidy_code.sh
- script: tools/distrib/check_trailing_newlines.sh
- script: tools/distrib/check_nanopb_output.sh
- script: tools/distrib/check_include_guards.py
- script: tools/distrib/pylint_code.sh
- script: tools/distrib/yapf_code.sh
- script: tools/distrib/python/check_grpcio_tools.py
-- script: tools/run_tests/sanity/check_clang_tidy.sh
cpu_cost: 1000