aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.gitignore10
-rw-r--r--CMakeLists.txt32
-rw-r--r--Makefile36
-rw-r--r--build.yaml10
-rw-r--r--grpc.def3
-rw-r--r--include/grpc/slice.h6
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c76
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c27
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h97
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.c33
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h22
-rw-r--r--src/core/ext/filters/client_channel/parse_address.c44
-rw-r--r--src/core/ext/filters/client_channel/parse_address.h19
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c6
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c8
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.c58
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.h2
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.c18
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c14
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c351
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c365
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h24
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h51
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c5
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c17
-rw-r--r--src/core/lib/channel/channel_args.c11
-rw-r--r--src/core/lib/channel/channel_args.h3
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.h2
-rw-r--r--src/core/lib/iomgr/udp_server.c38
-rw-r--r--src/core/lib/iomgr/udp_server.h4
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.c23
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.h21
-rw-r--r--src/core/lib/security/transport/security_connector.c8
-rw-r--r--src/core/lib/surface/call.c44
-rw-r--r--src/core/lib/transport/byte_stream.c32
-rw-r--r--src/core/lib/transport/byte_stream.h21
-rw-r--r--src/proto/grpc/lb/v1/load_balancer.proto57
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c6
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h9
-rw-r--r--test/core/client_channel/parse_address_test.c26
-rw-r--r--test/core/client_channel/resolvers/BUILD40
-rw-r--r--test/core/client_channel/resolvers/fake_resolver_test.c187
-rw-r--r--test/core/end2end/BUILD79
-rw-r--r--test/core/end2end/fake_resolver.c211
-rw-r--r--test/core/end2end/fake_resolver.h34
-rw-r--r--test/core/end2end/fuzzers/api_fuzzer.c3
-rw-r--r--test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5867145026076672bin0 -> 46 bytes
-rw-r--r--test/core/iomgr/udp_server_test.c14
-rw-r--r--test/cpp/grpclb/grpclb_api_test.cc12
-rw-r--r--test/cpp/grpclb/grpclb_test.cc45
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc9
-rwxr-xr-xtools/jenkins/run_performance.sh2
-rwxr-xr-xtools/profiling/microbenchmarks/bm_diff.py152
-rw-r--r--tools/profiling/microbenchmarks/speedup.py67
-rw-r--r--tools/run_tests/generated/sources_and_headers.json17
-rw-r--r--tools/run_tests/generated/tests.json45
-rw-r--r--vsprojects/buildtests_c.sln27
-rw-r--r--vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj199
-rw-r--r--vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj.filters24
59 files changed, 2048 insertions, 758 deletions
diff --git a/.gitignore b/.gitignore
index ad3ec64ab5..5e38f5fa01 100644
--- a/.gitignore
+++ b/.gitignore
@@ -118,3 +118,13 @@ gdb.txt
# ctags file
tags
+
+# perf data
+perf.data
+perf.data.old
+
+# bm_diff
+bm_diff_new/
+bm_diff_old/
+bm_*.json
+
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0dad894cc4..a6a2af65d5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -391,6 +391,7 @@ add_dependencies(buildtests_c error_test)
if(_gRPC_PLATFORM_LINUX)
add_dependencies(buildtests_c ev_epoll_linux_test)
endif()
+add_dependencies(buildtests_c fake_resolver_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c fd_conservation_posix_test)
endif()
@@ -5446,6 +5447,37 @@ target_link_libraries(ev_epoll_linux_test
endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+
+add_executable(fake_resolver_test
+ test/core/client_channel/resolvers/fake_resolver_test.c
+)
+
+
+target_include_directories(fake_resolver_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CARES_BUILD_INCLUDE_DIR}
+ PRIVATE ${CARES_INCLUDE_DIR}
+ PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+)
+
+target_link_libraries(fake_resolver_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc
+ gpr_test_util
+ gpr
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(fd_conservation_posix_test
diff --git a/Makefile b/Makefile
index b3d8737ecc..8898939af9 100644
--- a/Makefile
+++ b/Makefile
@@ -981,6 +981,7 @@ dualstack_socket_test: $(BINDIR)/$(CONFIG)/dualstack_socket_test
endpoint_pair_test: $(BINDIR)/$(CONFIG)/endpoint_pair_test
error_test: $(BINDIR)/$(CONFIG)/error_test
ev_epoll_linux_test: $(BINDIR)/$(CONFIG)/ev_epoll_linux_test
+fake_resolver_test: $(BINDIR)/$(CONFIG)/fake_resolver_test
fd_conservation_posix_test: $(BINDIR)/$(CONFIG)/fd_conservation_posix_test
fd_posix_test: $(BINDIR)/$(CONFIG)/fd_posix_test
fling_client: $(BINDIR)/$(CONFIG)/fling_client
@@ -1366,6 +1367,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/endpoint_pair_test \
$(BINDIR)/$(CONFIG)/error_test \
$(BINDIR)/$(CONFIG)/ev_epoll_linux_test \
+ $(BINDIR)/$(CONFIG)/fake_resolver_test \
$(BINDIR)/$(CONFIG)/fd_conservation_posix_test \
$(BINDIR)/$(CONFIG)/fd_posix_test \
$(BINDIR)/$(CONFIG)/fling_client \
@@ -1777,6 +1779,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/error_test || ( echo test error_test failed ; exit 1 )
$(E) "[RUN] Testing ev_epoll_linux_test"
$(Q) $(BINDIR)/$(CONFIG)/ev_epoll_linux_test || ( echo test ev_epoll_linux_test failed ; exit 1 )
+ $(E) "[RUN] Testing fake_resolver_test"
+ $(Q) $(BINDIR)/$(CONFIG)/fake_resolver_test || ( echo test fake_resolver_test failed ; exit 1 )
$(E) "[RUN] Testing fd_conservation_posix_test"
$(Q) $(BINDIR)/$(CONFIG)/fd_conservation_posix_test || ( echo test fd_conservation_posix_test failed ; exit 1 )
$(E) "[RUN] Testing fd_posix_test"
@@ -9401,6 +9405,38 @@ endif
endif
+FAKE_RESOLVER_TEST_SRC = \
+ test/core/client_channel/resolvers/fake_resolver_test.c \
+
+FAKE_RESOLVER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(FAKE_RESOLVER_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/fake_resolver_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/fake_resolver_test: $(FAKE_RESOLVER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(FAKE_RESOLVER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/fake_resolver_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/client_channel/resolvers/fake_resolver_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_fake_resolver_test: $(FAKE_RESOLVER_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(FAKE_RESOLVER_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
FD_CONSERVATION_POSIX_TEST_SRC = \
test/core/iomgr/fd_conservation_posix_test.c \
diff --git a/build.yaml b/build.yaml
index cfa479c4ac..f4461e40ef 100644
--- a/build.yaml
+++ b/build.yaml
@@ -1820,6 +1820,16 @@ targets:
- uv
platforms:
- linux
+- name: fake_resolver_test
+ build: test
+ language: c
+ src:
+ - test/core/client_channel/resolvers/fake_resolver_test.c
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
- name: fd_conservation_posix_test
build: test
language: c
diff --git a/grpc.def b/grpc.def
index 1589316a58..baaf470a60 100644
--- a/grpc.def
+++ b/grpc.def
@@ -145,6 +145,7 @@ EXPORTS
grpc_slice_new_with_user_data
grpc_slice_new_with_len
grpc_slice_malloc
+ grpc_slice_malloc_large
grpc_slice_intern
grpc_slice_from_copied_string
grpc_slice_from_copied_buffer
@@ -153,6 +154,7 @@ EXPORTS
grpc_slice_sub
grpc_slice_sub_no_ref
grpc_slice_split_tail
+ grpc_slice_split_tail_maybe_ref
grpc_slice_split_head
grpc_empty_slice
grpc_slice_default_hash_impl
@@ -181,6 +183,7 @@ EXPORTS
grpc_slice_buffer_move_into
grpc_slice_buffer_trim_end
grpc_slice_buffer_move_first
+ grpc_slice_buffer_move_first_no_ref
grpc_slice_buffer_move_first_into_buffer
grpc_slice_buffer_take_first
grpc_slice_buffer_undo_take_first
diff --git a/include/grpc/slice.h b/include/grpc/slice.h
index 86a455b42c..d65bde49ee 100644
--- a/include/grpc/slice.h
+++ b/include/grpc/slice.h
@@ -109,9 +109,9 @@ GPRAPI grpc_slice grpc_slice_from_static_string(const char *source);
/* Create a slice pointing to constant memory */
GPRAPI grpc_slice grpc_slice_from_static_buffer(const void *source, size_t len);
-/* Return a result slice derived from s, which shares a ref count with s, where
- result.data==s.data+begin, and result.length==end-begin.
- The ref count of s is increased by one.
+/* Return a result slice derived from s, which shares a ref count with \a s,
+ where result.data==s.data+begin, and result.length==end-begin. The ref count
+ of \a s is increased by one. Do not assign result back to \a s.
Requires s initialized, begin <= end, begin <= s.length, and
end <= source->length. */
GPRAPI grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end);
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 16be2c70e9..ce9abdad61 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -238,14 +238,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error,
const char *reason) {
- if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
- state == GRPC_CHANNEL_SHUTDOWN) &&
- chand->lb_policy != NULL) {
- /* cancel picks with wait_for_ready=false */
- grpc_lb_policy_cancel_picks_locked(
- exec_ctx, chand->lb_policy,
- /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
- /* check= */ 0, GRPC_ERROR_REF(error));
+ /* TODO: Improve failure handling:
+ * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
+ * - Hand over pending picks from old policies during the switch that happens
+ * when resolver provides an update. */
+ if (chand->lb_policy != NULL) {
+ if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* cancel picks with wait_for_ready=false */
+ grpc_lb_policy_cancel_picks_locked(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+ /* check= */ 0, GRPC_ERROR_REF(error));
+ } else if (state == GRPC_CHANNEL_SHUTDOWN) {
+ /* cancel all picks */
+ grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
+ /* mask= */ 0, /* check= */ 0,
+ GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@@ -348,6 +357,33 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
+// Wrap a closure associated with \a lb_policy. The associated callback (\a
+// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
+// scheduling \a wrapped_closure.
+typedef struct wrapped_on_pick_closure_arg {
+ /* the closure instance using this struct as argument */
+ grpc_closure wrapper_closure;
+
+ /* the original closure. Usually a on_complete/notify cb for pick() and ping()
+ * calls against the internal RR instance, respectively. */
+ grpc_closure *wrapped_closure;
+
+ /* The policy instance related to the closure */
+ grpc_lb_policy *lb_policy;
+} wrapped_on_pick_closure_arg;
+
+// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
+static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ wrapped_on_pick_closure_arg *wc_arg = arg;
+ GPR_ASSERT(wc_arg != NULL);
+ GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+ GPR_ASSERT(wc_arg->lb_policy != NULL);
+ grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
+ gpr_free(wc_arg);
+}
+
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
@@ -1037,11 +1073,29 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
- const bool result = grpc_lb_policy_pick_locked(
- exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
+
+ // Wrap the user-provided callback in order to hold a strong reference to
+ // the LB policy for the duration of the pick.
+ wrapped_on_pick_closure_arg *w_on_pick_arg =
+ gpr_zalloc(sizeof(*w_on_pick_arg));
+ grpc_closure_init(&w_on_pick_arg->wrapper_closure,
+ wrapped_on_pick_closure_cb, w_on_pick_arg,
+ grpc_schedule_on_exec_ctx);
+ w_on_pick_arg->wrapped_closure = on_ready;
+ GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
+ w_on_pick_arg->lb_policy = lb_policy;
+ const bool pick_done = grpc_lb_policy_pick_locked(
+ exec_ctx, lb_policy, &inputs, connected_subchannel, NULL,
+ &w_on_pick_arg->wrapper_closure);
+ if (pick_done) {
+ /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
+ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
+ "pick_subchannel_wrapping");
+ gpr_free(w_on_pick_arg);
+ }
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
- return result;
+ return pick_done;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index e9adf98711..fb119c7fc8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -16,6 +16,12 @@ const pb_field_t grpc_lb_v1_Duration_fields[3] = {
PB_LAST_FIELD
};
+const pb_field_t grpc_lb_v1_Timestamp_fields[3] = {
+ PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Timestamp, seconds, seconds, 0),
+ PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Timestamp, nanos, seconds, 0),
+ PB_LAST_FIELD
+};
+
const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3] = {
PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v1_InitialLoadBalanceRequest_fields),
PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v1_ClientStats_fields),
@@ -27,10 +33,14 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_ClientStats_fields[4] = {
- PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, total_requests, total_requests, 0),
- PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, client_rpc_errors, total_requests, 0),
- PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, dropped_requests, client_rpc_errors, 0),
+const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+ PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
+ PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
+ PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
+ PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
+ PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+ PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
PB_LAST_FIELD
};
@@ -52,11 +62,12 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_Server_fields[5] = {
+const pb_field_t grpc_lb_v1_Server_fields[6] = {
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
+ PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
PB_LAST_FIELD
};
@@ -70,7 +81,7 @@ const pb_field_t grpc_lb_v1_Server_fields[5] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -81,7 +92,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 725aa7e386..d3ae919ec2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,16 +14,6 @@ extern "C" {
#endif
/* Struct definitions */
-typedef struct _grpc_lb_v1_ClientStats {
- bool has_total_requests;
- int64_t total_requests;
- bool has_client_rpc_errors;
- int64_t client_rpc_errors;
- bool has_dropped_requests;
- int64_t dropped_requests;
-/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
-} grpc_lb_v1_ClientStats;
-
typedef struct _grpc_lb_v1_Duration {
bool has_seconds;
int64_t seconds;
@@ -46,11 +36,39 @@ typedef struct _grpc_lb_v1_Server {
int32_t port;
bool has_load_balance_token;
char load_balance_token[50];
- bool has_drop_request;
- bool drop_request;
+ bool has_drop_for_rate_limiting;
+ bool drop_for_rate_limiting;
+ bool has_drop_for_load_balancing;
+ bool drop_for_load_balancing;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
+typedef struct _grpc_lb_v1_Timestamp {
+ bool has_seconds;
+ int64_t seconds;
+ bool has_nanos;
+ int32_t nanos;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Timestamp) */
+} grpc_lb_v1_Timestamp;
+
+typedef struct _grpc_lb_v1_ClientStats {
+ bool has_timestamp;
+ grpc_lb_v1_Timestamp timestamp;
+ bool has_num_calls_started;
+ int64_t num_calls_started;
+ bool has_num_calls_finished;
+ int64_t num_calls_finished;
+ bool has_num_calls_finished_with_drop_for_rate_limiting;
+ int64_t num_calls_finished_with_drop_for_rate_limiting;
+ bool has_num_calls_finished_with_drop_for_load_balancing;
+ int64_t num_calls_finished_with_drop_for_load_balancing;
+ bool has_num_calls_finished_with_client_failed_to_send;
+ int64_t num_calls_finished_with_client_failed_to_send;
+ bool has_num_calls_finished_known_received;
+ int64_t num_calls_finished_known_received;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
+} grpc_lb_v1_ClientStats;
+
typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
bool has_load_balancer_delegate;
char load_balancer_delegate[64];
@@ -59,6 +77,13 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
+typedef struct _grpc_lb_v1_ServerList {
+ pb_callback_t servers;
+ bool has_expiration_interval;
+ grpc_lb_v1_Duration expiration_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
+} grpc_lb_v1_ServerList;
+
typedef struct _grpc_lb_v1_LoadBalanceRequest {
bool has_initial_request;
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
@@ -67,13 +92,6 @@ typedef struct _grpc_lb_v1_LoadBalanceRequest {
/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
} grpc_lb_v1_LoadBalanceRequest;
-typedef struct _grpc_lb_v1_ServerList {
- pb_callback_t servers;
- bool has_expiration_interval;
- grpc_lb_v1_Duration expiration_interval;
-/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
-} grpc_lb_v1_ServerList;
-
typedef struct _grpc_lb_v1_LoadBalanceResponse {
bool has_initial_response;
grpc_lb_v1_InitialLoadBalanceResponse initial_response;
@@ -86,61 +104,72 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
/* Initializer values for message structs */
#define grpc_lb_v1_Duration_init_default {false, 0, false, 0}
+#define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
+#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
/* Field tags (for use in manual encoding/decoding) */
-#define grpc_lb_v1_ClientStats_total_requests_tag 1
-#define grpc_lb_v1_ClientStats_client_rpc_errors_tag 2
-#define grpc_lb_v1_ClientStats_dropped_requests_tag 3
#define grpc_lb_v1_Duration_seconds_tag 1
#define grpc_lb_v1_Duration_nanos_tag 2
#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
#define grpc_lb_v1_Server_ip_address_tag 1
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_request_tag 4
+#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
+#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Timestamp_seconds_tag 1
+#define grpc_lb_v1_Timestamp_nanos_tag 2
+#define grpc_lb_v1_ClientStats_timestamp_tag 1
+#define grpc_lb_v1_ClientStats_num_calls_started_tag 2
+#define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
+#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
+#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
+#define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
-#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
-#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
#define grpc_lb_v1_ServerList_expiration_interval_tag 3
+#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
+#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
#define grpc_lb_v1_LoadBalanceResponse_initial_response_tag 1
#define grpc_lb_v1_LoadBalanceResponse_server_list_tag 2
/* Struct field encoding specification for nanopb */
extern const pb_field_t grpc_lb_v1_Duration_fields[3];
+extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[4];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[5];
+extern const pb_field_t grpc_lb_v1_Server_fields[6];
/* Maximum encoded size of messages (where known) */
#define grpc_lb_v1_Duration_size 22
-#define grpc_lb_v1_LoadBalanceRequest_size 169
+#define grpc_lb_v1_Timestamp_size 22
+#define grpc_lb_v1_LoadBalanceRequest_size 226
#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size 33
+#define grpc_lb_v1_ClientStats_size 90
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size 83
+#define grpc_lb_v1_Server_size 85
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c
index e2af216b89..89b8bf8951 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -36,16 +36,18 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/channel/channel_args.h"
+
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
grpc_lb_addresses* grpc_lb_addresses_create(
size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) {
- grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
+ grpc_lb_addresses* addresses = gpr_zalloc(sizeof(grpc_lb_addresses));
addresses->num_addresses = num_addresses;
addresses->user_data_vtable = user_data_vtable;
const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
- addresses->addresses = gpr_malloc(addresses_size);
- memset(addresses->addresses, 0, addresses_size);
+ addresses->addresses = gpr_zalloc(addresses_size);
return addresses;
}
@@ -69,7 +71,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
void* address, size_t address_len,
- bool is_balancer, char* balancer_name,
+ bool is_balancer, const char* balancer_name,
void* user_data) {
GPR_ASSERT(index < addresses->num_addresses);
if (user_data != NULL) GPR_ASSERT(addresses->user_data_vtable != NULL);
@@ -77,10 +79,22 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
memcpy(target->address.addr, address, address_len);
target->address.len = address_len;
target->is_balancer = is_balancer;
- target->balancer_name = balancer_name;
+ target->balancer_name = gpr_strdup(balancer_name);
target->user_data = user_data;
}
+bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses,
+ size_t index, const grpc_uri* uri,
+ bool is_balancer,
+ const char* balancer_name,
+ void* user_data) {
+ grpc_resolved_address address;
+ if (!grpc_parse_uri(uri, &address)) return false;
+ grpc_lb_addresses_set_address(addresses, index, address.addr, address.len,
+ is_balancer, balancer_name, user_data);
+ return true;
+}
+
int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
const grpc_lb_addresses* addresses2) {
if (addresses1->num_addresses > addresses2->num_addresses) return 1;
@@ -147,6 +161,15 @@ grpc_arg grpc_lb_addresses_create_channel_arg(
return arg;
}
+grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
+ const grpc_channel_args* channel_args) {
+ const grpc_arg* lb_addresses_arg =
+ grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES);
+ if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER)
+ return NULL;
+ return lb_addresses_arg->value.pointer.p;
+}
+
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 81ab12ec8f..9d6c0fc139 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -34,12 +34,13 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
-#include "src/core/ext/filters/client_channel/client_channel_factory.h"
-#include "src/core/ext/filters/client_channel/lb_policy.h"
-
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
+
// Channel arg key for grpc_lb_addresses.
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
@@ -88,9 +89,18 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
* Takes ownership of \a balancer_name. */
void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
void *address, size_t address_len,
- bool is_balancer, char *balancer_name,
+ bool is_balancer, const char *balancer_name,
void *user_data);
+/** Sets the value of the address at index \a index of \a addresses from \a uri.
+ * Returns true upon success, false otherwise. Takes ownership of \a
+ * balancer_name. */
+bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses *addresses,
+ size_t index, const grpc_uri *uri,
+ bool is_balancer,
+ const char *balancer_name,
+ void *user_data);
+
/** Compares \a addresses1 and \a addresses2. */
int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1,
const grpc_lb_addresses *addresses2);
@@ -103,6 +113,10 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx *exec_ctx,
grpc_arg grpc_lb_addresses_create_channel_arg(
const grpc_lb_addresses *addresses);
+/** Returns the \a grpc_lb_addresses instance in \a channel_args or NULL */
+grpc_lb_addresses *grpc_lb_addresses_find_channel_arg(
+ const grpc_channel_args *channel_args);
+
/** Arguments passed to LB policies. */
typedef struct grpc_lb_policy_args {
grpc_client_channel_factory *client_channel_factory;
diff --git a/src/core/ext/filters/client_channel/parse_address.c b/src/core/ext/filters/client_channel/parse_address.c
index 0c97062075..edc6ce697d 100644
--- a/src/core/ext/filters/client_channel/parse_address.c
+++ b/src/core/ext/filters/client_channel/parse_address.c
@@ -48,7 +48,12 @@
#ifdef GRPC_HAVE_UNIX_SOCKET
-int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+bool grpc_parse_unix(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ if (strcmp("unix", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'unix' scheme, got '%s'", uri->scheme);
+ return false;
+ }
struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr;
const size_t maxlen = sizeof(un->sun_path);
const size_t path_len = strnlen(uri->path, maxlen);
@@ -61,21 +66,29 @@ int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
#else /* GRPC_HAVE_UNIX_SOCKET */
-int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { abort(); }
+bool grpc_parse_unix(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ abort();
+}
#endif /* GRPC_HAVE_UNIX_SOCKET */
-int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+bool grpc_parse_ipv4(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ if (strcmp("ipv4", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme);
+ return false;
+ }
const char *host_port = uri->path;
char *host;
char *port;
int port_num;
- int result = 0;
+ bool result = false;
struct sockaddr_in *in = (struct sockaddr_in *)resolved_addr->addr;
if (*host_port == '/') ++host_port;
if (!gpr_split_host_port(host_port, &host, &port)) {
- return 0;
+ return false;
}
memset(resolved_addr, 0, sizeof(grpc_resolved_address));
@@ -98,14 +111,19 @@ int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
goto done;
}
- result = 1;
+ result = true;
done:
gpr_free(host);
gpr_free(port);
return result;
}
-int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+bool grpc_parse_ipv6(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ if (strcmp("ipv6", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme);
+ return false;
+ }
const char *host_port = uri->path;
char *host;
char *port;
@@ -168,3 +186,15 @@ done:
gpr_free(port);
return result;
}
+
+bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+ if (strcmp("unix", uri->scheme) == 0) {
+ return grpc_parse_unix(uri, resolved_addr);
+ } else if (strcmp("ipv4", uri->scheme) == 0) {
+ return grpc_parse_ipv4(uri, resolved_addr);
+ } else if (strcmp("ipv6", uri->scheme) == 0) {
+ return grpc_parse_ipv6(uri, resolved_addr);
+ }
+ gpr_log(GPR_ERROR, "Can't parse scheme '%s'", uri->scheme);
+ return false;
+}
diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h
index c8d77baa00..fa7ea33a00 100644
--- a/src/core/ext/filters/client_channel/parse_address.h
+++ b/src/core/ext/filters/client_channel/parse_address.h
@@ -39,16 +39,19 @@
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/iomgr/resolve_address.h"
-/** Populate \a addr and \a len from \a uri, whose path is expected to contain a
+/** Populate \a resolved_addr from \a uri, whose path is expected to contain a
* unix socket path. Returns true upon success. */
-int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+bool grpc_parse_unix(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
-/** Populate /a addr and \a len from \a uri, whose path is expected to contain a
- * host:port pair. Returns true upon success. */
-int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+/** Populate \a resolved_addr from \a uri, whose path is expected to contain an
+ * IPv4 host:port pair. Returns true upon success. */
+bool grpc_parse_ipv4(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
-/** Populate /a addr and \a len from \a uri, whose path is expected to contain a
- * host:port pair. Returns true upon success. */
-int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+/** Populate \a resolved_addr from \a uri, whose path is expected to contain an
+ * IPv6 host:port pair. Returns true upon success. */
+bool grpc_parse_ipv6(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
+
+/** Populate \a resolved_addr from \a uri. Returns true upon success. */
+bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
index 54f020d691..4d7d878c23 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
@@ -157,8 +157,8 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
grpc_resolver_args *args,
- int parse(grpc_uri *uri,
- grpc_resolved_address *dst)) {
+ bool parse(const grpc_uri *uri,
+ grpc_resolved_address *dst)) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme",
args->uri->scheme);
@@ -209,7 +209,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *name##_factory_create_resolver( \
grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory, \
grpc_resolver_args *args) { \
- return sockaddr_create(exec_ctx, args, parse_##name); \
+ return sockaddr_create(exec_ctx, args, grpc_parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 9a7a7a0ee5..362b4a4a28 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -797,13 +797,7 @@ static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str,
grpc_resolved_address *addr) {
grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
- if (strcmp(uri->scheme, "ipv4") == 0) {
- GPR_ASSERT(parse_ipv4(uri, addr));
- } else if (strcmp(uri->scheme, "ipv6") == 0) {
- GPR_ASSERT(parse_ipv6(uri, addr));
- } else {
- GPR_ASSERT(parse_unix(uri, addr));
- }
+ if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
grpc_uri_destroy(uri);
}
diff --git a/src/core/ext/filters/client_channel/uri_parser.c b/src/core/ext/filters/client_channel/uri_parser.c
index 01b99911aa..b233d835cb 100644
--- a/src/core/ext/filters/client_channel/uri_parser.c
+++ b/src/core/ext/filters/client_channel/uri_parser.c
@@ -50,7 +50,7 @@
#define NOT_SET (~(size_t)0)
static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section,
- int suppress_errors) {
+ bool suppress_errors) {
char *line_prefix;
size_t pfx_len;
@@ -83,6 +83,11 @@ static char *decode_and_copy_component(grpc_exec_ctx *exec_ctx, const char *src,
return out;
}
+static bool valid_hex(char c) {
+ return ((c >= 'a') && (c <= 'f')) || ((c >= 'A') && (c <= 'F')) ||
+ ((c >= '0') && (c <= '9'));
+}
+
/** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar
* production. If \a uri_text[i] introduces an invalid \a pchar (such as percent
* sign not followed by two hex digits), NOT_SET is returned. */
@@ -93,27 +98,36 @@ static size_t parse_pchar(const char *uri_text, size_t i) {
* sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
/ "*" / "+" / "," / ";" / "=" */
char c = uri_text[i];
- if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) ||
- ((c >= '0') && (c <= '9')) ||
- (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */
- (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' ||
- c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' ||
- c == '=') /* sub-delims */) {
- return 1;
- }
- if (c == '%') { /* pct-encoded */
- size_t j;
- if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) {
- return NOT_SET;
- }
- for (j = i + 1; j < 2; j++) {
- c = uri_text[j];
- if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) ||
- ((c >= 'A') && (c <= 'F')))) {
- return NOT_SET;
+ switch (c) {
+ default:
+ if (((c >= 'a') && (c <= 'z')) || ((c >= 'A') && (c <= 'Z')) ||
+ ((c >= '0') && (c <= '9'))) {
+ return 1;
}
- }
- return 2;
+ break;
+ case ':':
+ case '@':
+ case '-':
+ case '.':
+ case '_':
+ case '~':
+ case '!':
+ case '$':
+ case '&':
+ case '\'':
+ case '(':
+ case ')':
+ case '*':
+ case '+':
+ case ',':
+ case ';':
+ case '=':
+ return 1;
+ case '%': /* pct-encoded */
+ if (valid_hex(uri_text[i + 1]) && valid_hex(uri_text[i + 2])) {
+ return 2;
+ }
+ return NOT_SET;
}
return 0;
}
@@ -183,7 +197,7 @@ static void parse_query_parts(grpc_uri *uri) {
}
grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
- int suppress_errors) {
+ bool suppress_errors) {
grpc_uri *uri;
size_t scheme_begin = 0;
size_t scheme_end = NOT_SET;
diff --git a/src/core/ext/filters/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h
index 2698d448d8..b889040b16 100644
--- a/src/core/ext/filters/client_channel/uri_parser.h
+++ b/src/core/ext/filters/client_channel/uri_parser.h
@@ -53,7 +53,7 @@ typedef struct {
/** parse a uri, return NULL on failure */
grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
- int suppress_errors);
+ bool suppress_errors);
/** return the part of a query string after the '=' in "?key=xxx&...", or NULL
* if key is not present */
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 8aabbe6ae9..d8ae080eee 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -220,8 +220,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes;
while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
+ exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
+ &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice);
if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) {
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice));
@@ -239,6 +242,13 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
calld->send_message_blocked = false;
+ if (GRPC_ERROR_NONE !=
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/
@@ -313,7 +323,6 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
estimated_len += grpc_base64_estimate_encoded_size(
op->payload->send_message.send_message->length, k_url_safe,
k_multi_line);
- estimated_len += 1; /* for the trailing 0 */
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
/* memcopy individual pieces into this slice */
@@ -335,7 +344,7 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
/* safe to use strlen since base64_encode will always add '\0' */
path_with_query_slice =
- grpc_slice_sub(path_with_query_slice, 0, strlen(t));
+ grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
/* substitute previous path with the new path+query */
grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
@@ -349,7 +358,6 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
calld->on_complete = op->on_complete;
op->on_complete = &calld->hc_on_complete;
op->send_message = false;
- grpc_slice_unref_internal(exec_ctx, path_with_query_slice);
} else {
/* Not all data is available. Fall back to POST. */
gpr_log(GPR_DEBUG,
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 2b2f3549bd..f414a60eee 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -239,6 +239,13 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
+ if (GRPC_ERROR_NONE !=
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
@@ -251,8 +258,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
+ exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
+ &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice);
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index bd87c75e71..45176a786d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -44,6 +44,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
@@ -129,6 +130,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
+static void incoming_byte_stream_publish_error(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error);
+static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -174,6 +180,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -656,7 +665,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
reading */
- gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@@ -666,6 +674,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
+ grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
+ grpc_slice_buffer_init(&s->frame_storage);
+ s->pending_byte_stream = false;
+ grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
+ grpc_combiner_scheduler(t->combiner, false));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -683,7 +696,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_error *error) {
- grpc_byte_stream *bs;
grpc_chttp2_stream *s = sp;
grpc_chttp2_transport *t = s->t;
@@ -694,9 +706,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
}
- while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
- }
+ grpc_slice_buffer_destroy_internal(exec_ctx,
+ &s->unprocessed_incoming_frames_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -723,6 +735,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
+ GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@@ -1176,8 +1189,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
s->fetching_send_message = NULL;
return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
- &s->fetching_slice, UINT32_MAX,
- &s->complete_fetch_locked)) {
+ UINT32_MAX, &s->complete_fetch_locked)) {
+ grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice);
add_fetched_slice_locked(exec_ctx, t, s);
}
}
@@ -1188,9 +1202,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) {
- add_fetched_slice_locked(exec_ctx, t, s);
- continue_fetching_send_locked(exec_ctx, t, s);
- } else {
+ error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice);
+ if (error == GRPC_ERROR_NONE) {
+ add_fetched_slice_locked(exec_ctx, t, s);
+ continue_fetching_send_locked(exec_ctx, t, s);
+ }
+ }
+
+ if (error != GRPC_ERROR_NONE) {
/* TODO(ctiller): what to do here */
abort();
}
@@ -1422,12 +1442,20 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->recv_message) {
+ size_t already_received;
GPR_ASSERT(s->recv_message_ready == NULL);
+ GPR_ASSERT(!s->pending_byte_stream);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
- if (s->id != 0 &&
- (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
- incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
+ if (s->id != 0) {
+ if (s->pending_byte_stream) {
+ already_received = s->frame_storage.length;
+ } else {
+ already_received = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer.length;
+ }
+ incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
+ already_received);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1615,13 +1643,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- grpc_byte_stream *bs;
if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) {
- while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
- NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1634,39 +1662,65 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- grpc_byte_stream *bs;
+ grpc_error *error = GRPC_ERROR_NONE;
if (s->recv_message_ready != NULL) {
- while (s->final_metadata_requested && s->seen_error &&
- (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
- NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ *s->recv_message = NULL;
+ if (s->final_metadata_requested && s->seen_error) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ }
}
- if (s->incoming_frames.head != NULL) {
- *s->recv_message =
- grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
- GPR_ASSERT(*s->recv_message != NULL);
+ if (!s->pending_byte_stream) {
+ while (s->unprocessed_incoming_frames_buffer.length > 0 ||
+ s->frame_storage.length > 0) {
+ if (s->unprocessed_incoming_frames_buffer.length == 0) {
+ grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+ &s->frame_storage);
+ }
+ error = deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s,
+ &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+ if (error != GRPC_ERROR_NONE) {
+ s->seen_error = true;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ break;
+ } else if (*s->recv_message != NULL) {
+ break;
+ }
+ }
+ }
+ if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = NULL;
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
}
+ GRPC_ERROR_UNREF(error);
}
}
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- grpc_byte_stream *bs;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
s->write_closed) {
if (s->seen_error) {
- while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
- NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
}
- if (s->all_incoming_byte_streams_finished &&
+ bool pending_data = s->pending_byte_stream ||
+ s->unprocessed_incoming_frames_buffer.length > 0;
+ if (s->read_closed && s->frame_storage.length == 0 &&
+ (!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@@ -1677,14 +1731,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
}
-static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
- }
-}
-
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@@ -1693,10 +1739,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
}
- if (s->data_parser.parsing_frame != NULL) {
- grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
- s->data_parser.parsing_frame = NULL;
+ if (s->pending_byte_stream) {
+ if (s->on_next != NULL) {
+ grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ }
+ incoming_byte_stream_publish_error(exec_ctx, bs, error);
+ incoming_byte_stream_unref(exec_ctx, bs);
+ s->data_parser.parsing_frame = NULL;
+ } else {
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_REF(error);
+ }
}
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@@ -1882,7 +1937,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
}
}
- decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1933,7 +1987,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr);
- content_type_hdr = grpc_slice_malloc(31);
+ content_type_hdr = GRPC_SLICE_MALLOC(31);
p = GRPC_SLICE_START_PTR(content_type_hdr);
*p++ = 0x00;
*p++ = 12;
@@ -2422,12 +2476,28 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
+
+ s->pending_byte_stream = false;
+ if (error == GRPC_ERROR_NONE) {
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
+ } else {
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ s->on_next = NULL;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_NONE;
+ grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
+ s->byte_stream_error = error;
+ }
+}
+
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
- GRPC_ERROR_UNREF(bs->error);
- grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
- gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2487,47 +2557,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = bs->transport;
grpc_chttp2_stream *s = bs->stream;
- if (bs->is_tail) {
- gpr_mu_lock(&bs->slice_mu);
- size_t cur_length = bs->slices.length;
- gpr_mu_unlock(&bs->slice_mu);
- incoming_byte_stream_update_flow_control(
- exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
- }
- gpr_mu_lock(&bs->slice_mu);
- if (bs->slices.count > 0) {
- *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
- grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
- } else if (bs->error != GRPC_ERROR_NONE) {
- grpc_closure_run(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(bs->error));
+ size_t cur_length = s->frame_storage.length;
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
+
+ GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+ if (s->frame_storage.length > 0) {
+ grpc_slice_buffer_swap(&s->frame_storage,
+ &s->unprocessed_incoming_frames_buffer);
+ grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
+ } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
+ grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != NULL) {
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame = NULL;
+ }
+ } else if (s->read_closed) {
+ if (bs->remaining_bytes != 0) {
+ s->byte_stream_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != NULL) {
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame = NULL;
+ }
+ } else {
+ /* Should never reach here. */
+ GPR_ASSERT(false);
+ }
} else {
- bs->on_next = bs->next_action.on_complete;
- bs->next = bs->next_action.slice;
+ s->on_next = bs->next_action.on_complete;
}
- gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
-static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- grpc_slice *slice, size_t max_size_hint,
- grpc_closure *on_complete) {
+static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ size_t max_size_hint,
+ grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- gpr_ref(&bs->refs);
- bs->next_action.slice = slice;
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(
- &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
- GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_next", 0);
- return 0;
+ grpc_chttp2_stream *s = bs->stream;
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ return true;
+ } else {
+ gpr_ref(&bs->refs);
+ bs->next_action.max_size_hint = max_size_hint;
+ bs->next_action.on_complete = on_complete;
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
+ GPR_TIMER_END("incoming_byte_stream_next", 0);
+ return false;
+ }
+}
+
+static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice) {
+ GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ grpc_chttp2_stream *s = bs->stream;
+
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ grpc_error *error = deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
+ slice, NULL);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+ } else {
+ grpc_error *error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ return error;
+ }
+ GPR_TIMER_END("incoming_byte_stream_pull", 0);
+ return GRPC_ERROR_NONE;
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2537,9 +2650,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+ grpc_chttp2_stream *s = bs->stream;
+ grpc_chttp2_transport *t = s->t;
+
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
- decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
incoming_byte_stream_unref(exec_ctx, bs);
+ s->pending_byte_stream = false;
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2559,50 +2677,53 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
+ grpc_chttp2_stream *s = bs->stream;
+
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ s->on_next = NULL;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_REF(error);
grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
GRPC_ERROR_REF(error));
- bs->error = error;
}
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- grpc_slice slice) {
- gpr_mu_lock(&bs->slice_mu);
+grpc_error *grpc_chttp2_incoming_byte_stream_push(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_slice slice, grpc_slice *slice_out) {
+ grpc_chttp2_stream *s = bs->stream;
+
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
- incoming_byte_stream_publish_error(
- exec_ctx, bs,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
+ grpc_error *error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
+
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
} else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
- if (bs->on_next != NULL) {
- *bs->next = slice;
- grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
- bs->on_next = NULL;
- } else {
- grpc_slice_buffer_add(&bs->slices, slice);
+ if (slice_out != NULL) {
+ *slice_out = slice;
}
+ return GRPC_ERROR_NONE;
}
- gpr_mu_unlock(&bs->slice_mu);
}
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error) {
+ grpc_error *error, bool reset_on_error) {
+ grpc_chttp2_stream *s = bs->stream;
+
if (error == GRPC_ERROR_NONE) {
- gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
- gpr_mu_unlock(&bs->slice_mu);
}
- if (error != GRPC_ERROR_NONE) {
- incoming_byte_stream_publish_error(exec_ctx, bs, error);
+ if (error != GRPC_ERROR_NONE && reset_on_error) {
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
}
incoming_byte_stream_unref(exec_ctx, bs);
+ return error;
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2614,26 +2735,12 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
+ incoming_byte_stream->base.pull = incoming_byte_stream_pull;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
- gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
- incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
- gpr_ref(&incoming_byte_stream->stream->active_streams);
- grpc_slice_buffer_init(&incoming_byte_stream->slices);
- incoming_byte_stream->on_next = NULL;
- incoming_byte_stream->is_tail = 1;
- incoming_byte_stream->error = GRPC_ERROR_NONE;
- grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
- if (q->head == NULL) {
- q->head = incoming_byte_stream;
- } else {
- q->tail->is_tail = 0;
- q->tail->next_message = incoming_byte_stream;
- }
- q->tail = incoming_byte_stream;
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ s->byte_stream_error = GRPC_ERROR_NONE;
return incoming_byte_stream;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index e9cf5324b0..1892de25d4 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -40,6 +40,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport.h"
@@ -53,16 +54,17 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) {
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
if (parser->parsing_frame != NULL) {
- grpc_chttp2_incoming_byte_stream_finished(
+ GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, parser->parsing_frame,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"));
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
}
GRPC_ERROR_UNREF(parser->error);
}
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id) {
+ uint32_t stream_id,
+ grpc_chttp2_stream *s) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
char *msg;
gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@@ -74,47 +76,14 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
}
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
- parser->is_last_frame = 1;
+ s->received_last_frame = true;
} else {
- parser->is_last_frame = 0;
+ s->received_last_frame = false;
}
return GRPC_ERROR_NONE;
}
-void grpc_chttp2_incoming_frame_queue_merge(
- grpc_chttp2_incoming_frame_queue *head_dst,
- grpc_chttp2_incoming_frame_queue *tail_src) {
- if (tail_src->head == NULL) {
- return;
- }
-
- if (head_dst->head == NULL) {
- *head_dst = *tail_src;
- memset(tail_src, 0, sizeof(*tail_src));
- return;
- }
-
- head_dst->tail->next_message = tail_src->head;
- head_dst->tail = tail_src->tail;
- memset(tail_src, 0, sizeof(*tail_src));
-}
-
-grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
- grpc_chttp2_incoming_frame_queue *q) {
- grpc_byte_stream *out;
- if (q->head == NULL) {
- return NULL;
- }
- out = &q->head->base;
- if (q->head == q->tail) {
- memset(q, 0, sizeof(*q));
- } else {
- q->head = q->head->next_message;
- }
- return out;
-}
-
void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
uint32_t write_bytes, int is_eof,
grpc_transport_one_way_stats *stats,
@@ -143,145 +112,217 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
-static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_data_parser *p,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_slice slice) {
- uint8_t *const beg = GRPC_SLICE_START_PTR(slice);
- uint8_t *const end = GRPC_SLICE_END_PTR(slice);
- uint8_t *cur = beg;
- uint32_t message_flags;
- grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
- char *msg;
+grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *p,
+ grpc_chttp2_stream *s,
+ grpc_slice_buffer *slices,
+ grpc_slice *slice_out,
+ grpc_byte_stream **stream_out) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ grpc_chttp2_transport *t = s->t;
- if (cur == end) {
- return GRPC_ERROR_NONE;
- }
+ while (slices->count > 0) {
+ uint8_t *beg = NULL;
+ uint8_t *end = NULL;
+ uint8_t *cur = NULL;
- switch (p->state) {
- case GRPC_CHTTP2_DATA_ERROR:
- p->state = GRPC_CHTTP2_DATA_ERROR;
- return GRPC_ERROR_REF(p->error);
- fh_0:
- case GRPC_CHTTP2_DATA_FH_0:
- s->stats.incoming.framing_bytes++;
- p->frame_type = *cur;
- switch (p->frame_type) {
- case 0:
- p->is_frame_compressed = 0; /* GPR_FALSE */
- break;
- case 1:
- p->is_frame_compressed = 1; /* GPR_TRUE */
- break;
- default:
- gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
- p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
- (intptr_t)s->id);
- gpr_free(msg);
- msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
- grpc_slice_from_copied_string(msg));
- gpr_free(msg);
- p->error =
- grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
- p->state = GRPC_CHTTP2_DATA_ERROR;
- return GRPC_ERROR_REF(p->error);
- }
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_1;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_1:
- s->stats.incoming.framing_bytes++;
- p->frame_size = ((uint32_t)*cur) << 24;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_2;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_2:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur) << 16;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_3;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_3:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur) << 8;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_4;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_4:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur);
- p->state = GRPC_CHTTP2_DATA_FRAME;
- ++cur;
- message_flags = 0;
- if (p->is_frame_compressed) {
- message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
- }
- p->parsing_frame = incoming_byte_stream =
- grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size,
- message_flags);
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FRAME:
- if (cur == end) {
- return GRPC_ERROR_NONE;
- }
- uint32_t remaining = (uint32_t)(end - cur);
- if (remaining == p->frame_size) {
- s->stats.incoming.data_bytes += p->frame_size;
- grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
- GRPC_ERROR_NONE);
- p->parsing_frame = NULL;
- p->state = GRPC_CHTTP2_DATA_FH_0;
- return GRPC_ERROR_NONE;
- } else if (remaining > p->frame_size) {
- s->stats.incoming.data_bytes += p->frame_size;
- grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(cur + p->frame_size - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
- GRPC_ERROR_NONE);
- p->parsing_frame = NULL;
- cur += p->frame_size;
- goto fh_0; /* loop */
- } else {
- GPR_ASSERT(remaining <= p->frame_size);
- grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- p->frame_size -= remaining;
- s->stats.incoming.data_bytes += remaining;
+ grpc_slice slice = grpc_slice_buffer_take_first(slices);
+
+ beg = GRPC_SLICE_START_PTR(slice);
+ end = GRPC_SLICE_END_PTR(slice);
+ cur = beg;
+ uint32_t message_flags;
+ char *msg;
+
+ if (cur == end) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+
+ switch (p->state) {
+ case GRPC_CHTTP2_DATA_ERROR:
+ p->state = GRPC_CHTTP2_DATA_ERROR;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_REF(p->error);
+ case GRPC_CHTTP2_DATA_FH_0:
+ p->frame_type = *cur;
+ switch (p->frame_type) {
+ case 0:
+ p->is_frame_compressed = false; /* GPR_FALSE */
+ break;
+ case 1:
+ p->is_frame_compressed = true; /* GPR_TRUE */
+ break;
+ default:
+ gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
+ p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
+ (intptr_t)s->id);
+ gpr_free(msg);
+ msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
+ grpc_slice_from_copied_string(msg));
+ gpr_free(msg);
+ p->error =
+ grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
+ p->state = GRPC_CHTTP2_DATA_ERROR;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_REF(p->error);
+ }
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_1;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_1:
+ p->frame_size = ((uint32_t)*cur) << 24;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_2;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_2:
+ p->frame_size |= ((uint32_t)*cur) << 16;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_3;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_3:
+ p->frame_size |= ((uint32_t)*cur) << 8;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_4;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_4:
+ GPR_ASSERT(stream_out != NULL);
+ GPR_ASSERT(p->parsing_frame == NULL);
+ p->frame_size |= ((uint32_t)*cur);
+ p->state = GRPC_CHTTP2_DATA_FRAME;
+ ++cur;
+ message_flags = 0;
+ if (p->is_frame_compressed) {
+ message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
+ p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
+ exec_ctx, t, s, p->frame_size, message_flags);
+ *stream_out = &p->parsing_frame->base;
+ if (p->parsing_frame->remaining_bytes == 0) {
+ GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true));
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ }
+ s->pending_byte_stream = true;
+
+ if (cur != end) {
+ grpc_slice_buffer_undo_take_first(
+ &s->unprocessed_incoming_frames_buffer,
+ grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ }
+ grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
+ case GRPC_CHTTP2_DATA_FRAME: {
+ GPR_ASSERT(p->parsing_frame != NULL);
+ GPR_ASSERT(slice_out != NULL);
+ if (cur == end) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ uint32_t remaining = (uint32_t)(end - cur);
+ if (remaining == p->frame_size) {
+ if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(end - beg)),
+ slice_out))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ if (GRPC_ERROR_NONE !=
+ (error = grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_NONE;
+ } else if (remaining < p->frame_size) {
+ if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(end - beg)),
+ slice_out))) {
+ return error;
+ }
+ p->frame_size -= remaining;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_NONE;
+ } else {
+ GPR_ASSERT(remaining > p->frame_size);
+ if (GRPC_ERROR_NONE !=
+ (grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(cur + p->frame_size - beg)),
+ slice_out))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ if (GRPC_ERROR_NONE !=
+ (error = grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ cur += p->frame_size;
+ grpc_slice_buffer_undo_take_first(
+ &s->unprocessed_incoming_frames_buffer,
+ grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_NONE;
+ }
}
+ }
}
- GPR_UNREACHABLE_CODE(
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
+ return GRPC_ERROR_NONE;
}
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
- grpc_chttp2_data_parser *p = parser;
- grpc_error *error = parse_inner(exec_ctx, p, t, s, slice);
+ /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
+ s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
+ if (!s->pending_byte_stream) {
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->frame_storage, slice);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ } else if (s->on_next) {
+ GPR_ASSERT(s->frame_storage.length == 0);
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
+ s->on_next = NULL;
+ } else {
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->frame_storage, slice);
+ }
- if (is_last && p->is_last_frame) {
+ if (is_last && s->received_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
- return error;
+ return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index 264ad14608..2fb8983c38 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -56,28 +56,16 @@ typedef enum {
typedef struct grpc_chttp2_incoming_byte_stream
grpc_chttp2_incoming_byte_stream;
-typedef struct grpc_chttp2_incoming_frame_queue {
- grpc_chttp2_incoming_byte_stream *head;
- grpc_chttp2_incoming_byte_stream *tail;
-} grpc_chttp2_incoming_frame_queue;
-
typedef struct {
grpc_chttp2_stream_state state;
- uint8_t is_last_frame;
uint8_t frame_type;
uint32_t frame_size;
grpc_error *error;
- int is_frame_compressed;
+ bool is_frame_compressed;
grpc_chttp2_incoming_byte_stream *parsing_frame;
} grpc_chttp2_data_parser;
-void grpc_chttp2_incoming_frame_queue_merge(
- grpc_chttp2_incoming_frame_queue *head_dst,
- grpc_chttp2_incoming_frame_queue *tail_src);
-grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
- grpc_chttp2_incoming_frame_queue *q);
-
/* initialize per-stream state for data frame parsing */
grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser);
@@ -87,7 +75,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
/* start processing a new data frame */
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id);
+ uint32_t stream_id,
+ grpc_chttp2_stream *s);
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
@@ -101,4 +90,11 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
grpc_transport_one_way_stats *stats,
grpc_slice_buffer *outbuf);
+grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *p,
+ grpc_chttp2_stream *s,
+ grpc_slice_buffer *slices,
+ grpc_slice *slice_out,
+ grpc_byte_stream **stream_out);
+
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 0452081dd5..0aaa4aebe5 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -195,22 +195,20 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
- struct grpc_chttp2_incoming_byte_stream *next_message;
- grpc_error *error;
- grpc_chttp2_transport *transport;
- grpc_chttp2_stream *stream;
- bool is_tail;
+ grpc_chttp2_transport *transport; /* immutable */
+ grpc_chttp2_stream *stream; /* immutable */
- gpr_mu slice_mu; // protects slices, on_next
- grpc_slice_buffer slices;
- grpc_closure *on_next;
- grpc_slice *next;
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
uint32_t remaining_bytes;
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
struct {
grpc_closure closure;
- grpc_slice *slice;
size_t max_size_hint;
grpc_closure *on_complete;
} next_action;
@@ -446,8 +444,8 @@ struct grpc_chttp2_stream {
uint32_t id;
/** window available for us to send to peer, over or under the initial window
- * size of the transport... ie:
- * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+ * size of the transport... ie:
+ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
@@ -474,9 +472,6 @@ struct grpc_chttp2_stream {
grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats;
- /** number of streams that are currently being read */
- gpr_refcount active_streams;
-
/** Is this stream closed for writing. */
bool write_closed;
/** Is this stream reading half-closed. */
@@ -500,7 +495,17 @@ struct grpc_chttp2_stream {
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
- grpc_chttp2_incoming_frame_queue incoming_frames;
+ grpc_slice_buffer frame_storage; /* protected by t combiner */
+
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
+ grpc_slice_buffer unprocessed_incoming_frames_buffer;
+ grpc_closure *on_next; /* protected by t combiner */
+ bool pending_byte_stream; /* protected by t combiner */
+ grpc_closure reset_byte_stream;
+ grpc_error *byte_stream_error; /* protected by t combiner */
+ bool received_last_frame; /* protected by t combiner */
gpr_timespec deadline;
@@ -513,6 +518,9 @@ struct grpc_chttp2_stream {
* incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta;
/** parsing state for data frames */
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes;
@@ -791,10 +799,13 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags);
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- grpc_slice slice);
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_push(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_slice slice, grpc_slice *slice_out);
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error, bool reset_on_error);
+void grpc_chttp2_incoming_byte_stream_notify(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 7e457ced27..638b137316 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -458,12 +458,13 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
return init_skip_frame_parser(exec_ctx, t, 0);
}
if (err == GRPC_ERROR_NONE) {
- err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
- t->incoming_frame_flags, s->id);
+ err = grpc_chttp2_data_parser_begin_frame(
+ &s->data_parser, t->incoming_frame_flags, s->id, s);
}
error_handler:
if (err == GRPC_ERROR_NONE) {
t->incoming_stream = s;
+ /* t->parser = grpc_chttp2_data_parser_parse;*/
t->parser = grpc_chttp2_data_parser_parse;
t->parser_data = &s->data_parser;
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index e0d332fd95..d4e89d6a6c 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -973,9 +973,20 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- grpc_byte_stream_next(
- NULL, stream_op->payload->send_message.send_message, &slice,
- stream_op->payload->send_message.send_message->length, NULL);
+ if (1 != grpc_byte_stream_next(
+ exec_ctx, stream_op->payload->send_message.send_message,
+ stream_op->payload->send_message.send_message->length,
+ NULL)) {
+ /* Should never reach here */
+ GPR_ASSERT(false);
+ }
+ if (GRPC_ERROR_NONE !=
+ grpc_byte_stream_pull(exec_ctx,
+ stream_op->payload->send_message.send_message,
+ &slice)) {
+ /* Should never reach here */
+ GPR_ASSERT(false);
+ }
grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 3de31d99da..238d176dfa 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -31,17 +31,18 @@
*
*/
-#include "src/core/lib/channel/channel_args.h"
-#include <grpc/grpc.h>
-#include "src/core/lib/support/string.h"
+#include <limits.h>
+#include <string.h>
#include <grpc/compression.h>
+#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-#include <string.h>
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/support/string.h"
static grpc_arg copy_arg(const grpc_arg *src) {
grpc_arg dst;
@@ -330,7 +331,7 @@ const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
}
int grpc_channel_arg_get_integer(const grpc_arg *arg,
- grpc_integer_options options) {
+ const grpc_integer_options options) {
if (arg == NULL) return options.default_value;
if (arg->type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key);
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 5ffcacb7fd..f0f603e251 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -120,9 +120,10 @@ typedef struct grpc_integer_options {
int min_value;
int max_value;
} grpc_integer_options;
+
/** Returns the value of \a arg, subject to the contraints in \a options. */
int grpc_channel_arg_get_integer(const grpc_arg *arg,
- grpc_integer_options options);
+ const grpc_integer_options options);
bool grpc_channel_arg_get_bool(const grpc_arg *arg, bool default_value);
diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h
index 2b22f11b49..be3ea2038f 100644
--- a/src/core/lib/iomgr/sockaddr_utils.h
+++ b/src/core/lib/iomgr/sockaddr_utils.h
@@ -50,7 +50,7 @@ int grpc_sockaddr_to_v4mapped(const grpc_resolved_address *addr,
grpc_resolved_address *addr6_out);
/* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to
- *port_out (if not NULL) and returns true, otherwise returns false. */
+ *port_out (if not NULL) and returns true, otherwise returns false. */
int grpc_sockaddr_is_wildcard(const grpc_resolved_address *addr, int *port_out);
/* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 60579e18ba..ca283d034f 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -79,10 +79,15 @@ struct grpc_udp_listener {
grpc_resolved_address addr;
grpc_closure read_closure;
grpc_closure write_closure;
+ // To be called when corresponding QuicGrpcServer closes all active
+ // connections.
+ grpc_closure orphan_fd_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ // True if orphan_cb is trigered.
+ bool orphan_notified;
struct grpc_udp_listener *next;
};
@@ -146,6 +151,14 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) {
return s;
}
+static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *fd, grpc_error *error) {
+ grpc_fd_shutdown(exec_ctx, (grpc_fd *)fd, GRPC_ERROR_REF(error));
+}
+
+static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ // No-op.
+}
+
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
@@ -195,12 +208,16 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
-
- /* Call the orphan_cb to signal that the FD is about to be closed and
- * should no longer be used. */
- GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
-
+ if (!sp->orphan_notified) {
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. Because at this point, all listening ports
+ * have been shutdown already, no need to shutdown again.*/
+ grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+ grpc_schedule_on_exec_ctx);
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
+ sp->server->user_data);
+ }
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
@@ -225,9 +242,11 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
- grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Server destroyed"));
+ grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, sp->emfd,
+ grpc_schedule_on_exec_ctx);
+ sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
+ sp->server->user_data);
+ sp->orphan_notified = true;
}
gpr_mu_unlock(&s->mu);
} else {
@@ -391,6 +410,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
+ sp->orphan_notified = false;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 9df3fe4d1f..8006037644 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -55,7 +55,9 @@ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx,
- grpc_fd *emfd, void *user_data);
+ grpc_fd *emfd,
+ grpc_closure *shutdown_fd_callback,
+ void *user_data);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args);
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index 68636ba208..3fdb67fb91 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -39,11 +39,15 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/support/string.h"
/* -- Fake transport security credentials. -- */
+#define GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS \
+ "grpc.fake_security.expected_targets"
+
static grpc_security_status fake_transport_security_create_security_connector(
grpc_exec_ctx *exec_ctx, grpc_channel_credentials *c,
grpc_call_credentials *call_creds, const char *target,
@@ -88,6 +92,25 @@ grpc_server_credentials *grpc_fake_transport_security_server_credentials_create(
return c;
}
+grpc_arg grpc_fake_transport_expected_targets_arg(char *expected_targets) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_STRING;
+ arg.key = GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS;
+ arg.value.string = expected_targets;
+ return arg;
+}
+
+const char *grpc_fake_transport_get_expected_targets(
+ const grpc_channel_args *args) {
+ const grpc_arg *expected_target_arg =
+ grpc_channel_args_find(args, GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
+ if (expected_target_arg != NULL &&
+ expected_target_arg->type == GRPC_ARG_STRING) {
+ return expected_target_arg->value.string;
+ }
+ return NULL;
+}
+
/* -- Metadata-only test credentials. -- */
static void md_only_test_destruct(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.h b/src/core/lib/security/credentials/fake/fake_credentials.h
index 0fe98417c6..a28b545a67 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.h
+++ b/src/core/lib/security/credentials/fake/fake_credentials.h
@@ -38,10 +38,17 @@
/* -- Fake transport security credentials. -- */
+/* Creates a fake transport security credentials object for testing. */
+grpc_channel_credentials *grpc_fake_transport_security_credentials_create(void);
+
+/* Creates a fake server transport security credentials object for testing. */
+grpc_server_credentials *grpc_fake_transport_security_server_credentials_create(
+ void);
+
/* Used to verify the target names given to the fake transport security
* connector.
*
- * Its syntax by example:
+ * The syntax of \a expected_targets by example:
* For LB channels:
* "backend_target_1,backend_target_2,...;lb_target_1,lb_target_2,..."
* For regular channels:
@@ -50,15 +57,11 @@
* That is to say, LB channels have a heading list of LB targets separated from
* the list of backend targets by a semicolon. For non-LB channels, only the
* latter is present. */
-#define GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS \
- "grpc.test_only.fake_security.expected_target"
+grpc_arg grpc_fake_transport_expected_targets_arg(char *expected_targets);
-/* Creates a fake transport security credentials object for testing. */
-grpc_channel_credentials *grpc_fake_transport_security_credentials_create(void);
-
-/* Creates a fake server transport security credentials object for testing. */
-grpc_server_credentials *grpc_fake_transport_security_server_credentials_create(
- void);
+/* Return the value associated with the expected targets channel arg or NULL */
+const char *grpc_fake_transport_get_expected_targets(
+ const grpc_channel_args *args);
/* -- Metadata-only Test credentials. -- */
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index dbe3263f92..b15196e677 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -423,12 +423,8 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
c->base.check_call_host = fake_channel_check_call_host;
c->base.add_handshakers = fake_channel_add_handshakers;
c->target = gpr_strdup(target);
- const grpc_arg *expected_target_arg =
- grpc_channel_args_find(args, GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
- if (expected_target_arg != NULL) {
- GPR_ASSERT(expected_target_arg->type == GRPC_ARG_STRING);
- c->expected_targets = gpr_strdup(expected_target_arg->value.string);
- }
+ const char *expected_targets = grpc_fake_transport_get_expected_targets(args);
+ c->expected_targets = gpr_strdup(expected_targets);
c->is_lb_channel = (grpc_lb_targets_info_find_in_args(args) != NULL);
return &c->base;
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 97d50a91be..3e96d09798 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1187,6 +1187,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
+ grpc_error *error;
grpc_call *call = bctl->call;
for (;;) {
size_t remaining = call->receiving_stream->length -
@@ -1198,11 +1199,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
finish_batch_step(exec_ctx, bctl);
return;
}
- if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
- &call->receiving_slice, remaining,
+ if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
&call->receiving_slice_ready)) {
- grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
+ error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
+ &call->receiving_slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ call->receiving_slice);
+ } else {
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ grpc_byte_buffer_destroy(*call->receiving_buffer);
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ finish_batch_step(exec_ctx, bctl);
+ return;
+ }
} else {
return;
}
@@ -1213,12 +1225,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
+ grpc_byte_stream *bs = call->receiving_stream;
+ bool release_error = false;
if (error == GRPC_ERROR_NONE) {
- grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
- continue_receiving_slices(exec_ctx, bctl);
- } else {
+ grpc_slice slice;
+ error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ slice);
+ continue_receiving_slices(exec_ctx, bctl);
+ } else {
+ /* Error returned by grpc_byte_stream_pull needs to be released manually
+ */
+ release_error = true;
+ }
+ }
+
+ if (error != GRPC_ERROR_NONE) {
if (grpc_trace_operation_failures) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
@@ -1226,7 +1250,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL;
+ call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl);
+ if (release_error) {
+ GRPC_ERROR_UNREF(error);
+ }
}
}
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index 4d4206189e..5800c70ef4 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -40,10 +40,15 @@
#include "src/core/lib/slice/slice_internal.h"
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, grpc_slice *slice,
- size_t max_size_hint, grpc_closure *on_complete) {
- return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint,
- on_complete);
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete) {
+ return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
+}
+
+grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice) {
+ return byte_stream->pull(exec_ctx, byte_stream, slice);
}
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -53,16 +58,24 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
/* slice_buffer_stream */
-static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- grpc_slice *slice, size_t max_size_hint,
- grpc_closure *on_complete) {
+static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ size_t max_size_hint,
+ grpc_closure *on_complete) {
+ grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
+ return true;
+}
+
+static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
stream->cursor++;
- return 1;
+ return GRPC_ERROR_NONE;
}
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -75,6 +88,7 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags;
stream->base.next = slice_buffer_stream_next;
+ stream->base.pull = slice_buffer_stream_pull;
stream->base.destroy = slice_buffer_stream_destroy;
stream->backing_buffer = slice_buffer;
stream->cursor = 0;
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 1fdd5b4d77..381c65fb04 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -49,9 +49,10 @@ typedef struct grpc_byte_stream grpc_byte_stream;
struct grpc_byte_stream {
uint32_t length;
uint32_t flags;
- int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
- grpc_slice *slice, size_t max_size_hint,
- grpc_closure *on_complete);
+ bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+ size_t max_size_hint, grpc_closure *on_complete);
+ grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+ grpc_slice *slice);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
};
@@ -61,12 +62,20 @@ struct grpc_byte_stream {
*
* max_size_hint can be set as a hint as to the maximum number
* of bytes that would be acceptable to read.
+ */
+int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete);
+
+/* returns the next slice in the byte stream when it is ready (indicated by
+ * either grpc_byte_stream_next returning 1 or on_complete passed to
+ * grpc_byte_stream_next is called).
*
* once a slice is returned into *slice, it is owned by the caller.
*/
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, grpc_slice *slice,
- size_t max_size_hint, grpc_closure *on_complete);
+grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice);
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto
index 44a5150a7e..a2502fb284 100644
--- a/src/proto/grpc/lb/v1/load_balancer.proto
+++ b/src/proto/grpc/lb/v1/load_balancer.proto
@@ -45,6 +45,20 @@ message Duration {
int32 nanos = 2;
}
+message Timestamp {
+
+ // Represents seconds of UTC time since Unix epoch
+ // 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
+ // 9999-12-31T23:59:59Z inclusive.
+ int64 seconds = 1;
+
+ // Non-negative fractions of a second at nanosecond resolution. Negative
+ // second values with fractions must still have non-negative nanos values
+ // that count forward in time. Must be from 0 to 999,999,999
+ // inclusive.
+ int32 nanos = 2;
+}
+
service LoadBalancer {
// Bidirectional rpc to get a list of servers.
rpc BalanceLoad(stream LoadBalanceRequest)
@@ -63,22 +77,37 @@ message LoadBalanceRequest {
}
message InitialLoadBalanceRequest {
- // Name of load balanced service (IE, service.grpc.gslb.google.com). Its
+ // Name of load balanced service (IE, balancer.service.com)
// length should be less than 256 bytes.
string name = 1;
}
// Contains client level statistics that are useful to load balancing. Each
-// count should be reset to zero after reporting the stats.
+// count except the timestamp should be reset to zero after reporting the stats.
message ClientStats {
- // The total number of requests sent by the client since the last report.
- int64 total_requests = 1;
+ // The timestamp of generating the report.
+ Timestamp timestamp = 1;
- // The number of client rpc errors since the last report.
- int64 client_rpc_errors = 2;
+ // The total number of RPCs that started.
+ int64 num_calls_started = 2;
- // The number of dropped requests since the last report.
- int64 dropped_requests = 3;
+ // The total number of RPCs that finished.
+ int64 num_calls_finished = 3;
+
+ // The total number of RPCs that were dropped by the client because of rate
+ // limiting.
+ int64 num_calls_finished_with_drop_for_rate_limiting = 4;
+
+ // The total number of RPCs that were dropped by the client because of load
+ // balancing.
+ int64 num_calls_finished_with_drop_for_load_balancing = 5;
+
+ // The total number of RPCs that failed to reach a server except dropped RPCs.
+ int64 num_calls_finished_with_client_failed_to_send = 6;
+
+ // The total number of RPCs that finished and are known to have been received
+ // by a server.
+ int64 num_calls_finished_known_received = 7;
}
message LoadBalanceResponse {
@@ -120,6 +149,10 @@ message ServerList {
Duration expiration_interval = 3;
}
+// Contains server information. When none of the [drop_for_*] fields are true,
+// use the other fields. When drop_for_rate_limiting is true, ignore all other
+// fields. Use drop_for_load_balancing only when it is true and
+// drop_for_rate_limiting is false.
message Server {
// A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
@@ -137,6 +170,10 @@ message Server {
string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client
- // when this server is chosen from the list.
- bool drop_request = 4;
+ // for rate limiting.
+ bool drop_for_rate_limiting = 4;
+
+ // Indicates whether this particular request should be dropped by the client
+ // for load balancing.
+ bool drop_for_load_balancing = 5;
}
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 063f92114c..5af478c89f 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -183,6 +183,7 @@ grpc_slice_new_type grpc_slice_new_import;
grpc_slice_new_with_user_data_type grpc_slice_new_with_user_data_import;
grpc_slice_new_with_len_type grpc_slice_new_with_len_import;
grpc_slice_malloc_type grpc_slice_malloc_import;
+grpc_slice_malloc_large_type grpc_slice_malloc_large_import;
grpc_slice_intern_type grpc_slice_intern_import;
grpc_slice_from_copied_string_type grpc_slice_from_copied_string_import;
grpc_slice_from_copied_buffer_type grpc_slice_from_copied_buffer_import;
@@ -191,6 +192,7 @@ grpc_slice_from_static_buffer_type grpc_slice_from_static_buffer_import;
grpc_slice_sub_type grpc_slice_sub_import;
grpc_slice_sub_no_ref_type grpc_slice_sub_no_ref_import;
grpc_slice_split_tail_type grpc_slice_split_tail_import;
+grpc_slice_split_tail_maybe_ref_type grpc_slice_split_tail_maybe_ref_import;
grpc_slice_split_head_type grpc_slice_split_head_import;
grpc_empty_slice_type grpc_empty_slice_import;
grpc_slice_default_hash_impl_type grpc_slice_default_hash_impl_import;
@@ -219,6 +221,7 @@ grpc_slice_buffer_swap_type grpc_slice_buffer_swap_import;
grpc_slice_buffer_move_into_type grpc_slice_buffer_move_into_import;
grpc_slice_buffer_trim_end_type grpc_slice_buffer_trim_end_import;
grpc_slice_buffer_move_first_type grpc_slice_buffer_move_first_import;
+grpc_slice_buffer_move_first_no_ref_type grpc_slice_buffer_move_first_no_ref_import;
grpc_slice_buffer_move_first_into_buffer_type grpc_slice_buffer_move_first_into_buffer_import;
grpc_slice_buffer_take_first_type grpc_slice_buffer_take_first_import;
grpc_slice_buffer_undo_take_first_type grpc_slice_buffer_undo_take_first_import;
@@ -480,6 +483,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_slice_new_with_user_data_import = (grpc_slice_new_with_user_data_type) GetProcAddress(library, "grpc_slice_new_with_user_data");
grpc_slice_new_with_len_import = (grpc_slice_new_with_len_type) GetProcAddress(library, "grpc_slice_new_with_len");
grpc_slice_malloc_import = (grpc_slice_malloc_type) GetProcAddress(library, "grpc_slice_malloc");
+ grpc_slice_malloc_large_import = (grpc_slice_malloc_large_type) GetProcAddress(library, "grpc_slice_malloc_large");
grpc_slice_intern_import = (grpc_slice_intern_type) GetProcAddress(library, "grpc_slice_intern");
grpc_slice_from_copied_string_import = (grpc_slice_from_copied_string_type) GetProcAddress(library, "grpc_slice_from_copied_string");
grpc_slice_from_copied_buffer_import = (grpc_slice_from_copied_buffer_type) GetProcAddress(library, "grpc_slice_from_copied_buffer");
@@ -488,6 +492,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_slice_sub_import = (grpc_slice_sub_type) GetProcAddress(library, "grpc_slice_sub");
grpc_slice_sub_no_ref_import = (grpc_slice_sub_no_ref_type) GetProcAddress(library, "grpc_slice_sub_no_ref");
grpc_slice_split_tail_import = (grpc_slice_split_tail_type) GetProcAddress(library, "grpc_slice_split_tail");
+ grpc_slice_split_tail_maybe_ref_import = (grpc_slice_split_tail_maybe_ref_type) GetProcAddress(library, "grpc_slice_split_tail_maybe_ref");
grpc_slice_split_head_import = (grpc_slice_split_head_type) GetProcAddress(library, "grpc_slice_split_head");
grpc_empty_slice_import = (grpc_empty_slice_type) GetProcAddress(library, "grpc_empty_slice");
grpc_slice_default_hash_impl_import = (grpc_slice_default_hash_impl_type) GetProcAddress(library, "grpc_slice_default_hash_impl");
@@ -516,6 +521,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_slice_buffer_move_into_import = (grpc_slice_buffer_move_into_type) GetProcAddress(library, "grpc_slice_buffer_move_into");
grpc_slice_buffer_trim_end_import = (grpc_slice_buffer_trim_end_type) GetProcAddress(library, "grpc_slice_buffer_trim_end");
grpc_slice_buffer_move_first_import = (grpc_slice_buffer_move_first_type) GetProcAddress(library, "grpc_slice_buffer_move_first");
+ grpc_slice_buffer_move_first_no_ref_import = (grpc_slice_buffer_move_first_no_ref_type) GetProcAddress(library, "grpc_slice_buffer_move_first_no_ref");
grpc_slice_buffer_move_first_into_buffer_import = (grpc_slice_buffer_move_first_into_buffer_type) GetProcAddress(library, "grpc_slice_buffer_move_first_into_buffer");
grpc_slice_buffer_take_first_import = (grpc_slice_buffer_take_first_type) GetProcAddress(library, "grpc_slice_buffer_take_first");
grpc_slice_buffer_undo_take_first_import = (grpc_slice_buffer_undo_take_first_type) GetProcAddress(library, "grpc_slice_buffer_undo_take_first");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index e036ff7bd6..59074be035 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -500,6 +500,9 @@ extern grpc_slice_new_with_len_type grpc_slice_new_with_len_import;
typedef grpc_slice(*grpc_slice_malloc_type)(size_t length);
extern grpc_slice_malloc_type grpc_slice_malloc_import;
#define grpc_slice_malloc grpc_slice_malloc_import
+typedef grpc_slice(*grpc_slice_malloc_large_type)(size_t length);
+extern grpc_slice_malloc_large_type grpc_slice_malloc_large_import;
+#define grpc_slice_malloc_large grpc_slice_malloc_large_import
typedef grpc_slice(*grpc_slice_intern_type)(grpc_slice slice);
extern grpc_slice_intern_type grpc_slice_intern_import;
#define grpc_slice_intern grpc_slice_intern_import
@@ -524,6 +527,9 @@ extern grpc_slice_sub_no_ref_type grpc_slice_sub_no_ref_import;
typedef grpc_slice(*grpc_slice_split_tail_type)(grpc_slice *s, size_t split);
extern grpc_slice_split_tail_type grpc_slice_split_tail_import;
#define grpc_slice_split_tail grpc_slice_split_tail_import
+typedef grpc_slice(*grpc_slice_split_tail_maybe_ref_type)(grpc_slice *s, size_t split, int inc_refs);
+extern grpc_slice_split_tail_maybe_ref_type grpc_slice_split_tail_maybe_ref_import;
+#define grpc_slice_split_tail_maybe_ref grpc_slice_split_tail_maybe_ref_import
typedef grpc_slice(*grpc_slice_split_head_type)(grpc_slice *s, size_t split);
extern grpc_slice_split_head_type grpc_slice_split_head_import;
#define grpc_slice_split_head grpc_slice_split_head_import
@@ -608,6 +614,9 @@ extern grpc_slice_buffer_trim_end_type grpc_slice_buffer_trim_end_import;
typedef void(*grpc_slice_buffer_move_first_type)(grpc_slice_buffer *src, size_t n, grpc_slice_buffer *dst);
extern grpc_slice_buffer_move_first_type grpc_slice_buffer_move_first_import;
#define grpc_slice_buffer_move_first grpc_slice_buffer_move_first_import
+typedef void(*grpc_slice_buffer_move_first_no_ref_type)(grpc_slice_buffer *src, size_t n, grpc_slice_buffer *dst);
+extern grpc_slice_buffer_move_first_no_ref_type grpc_slice_buffer_move_first_no_ref_import;
+#define grpc_slice_buffer_move_first_no_ref grpc_slice_buffer_move_first_no_ref_import
typedef void(*grpc_slice_buffer_move_first_into_buffer_type)(grpc_exec_ctx *exec_ctx, grpc_slice_buffer *src, size_t n, void *dst);
extern grpc_slice_buffer_move_first_into_buffer_type grpc_slice_buffer_move_first_into_buffer_import;
#define grpc_slice_buffer_move_first_into_buffer grpc_slice_buffer_move_first_into_buffer_import
diff --git a/test/core/client_channel/parse_address_test.c b/test/core/client_channel/parse_address_test.c
index 629cdb001f..802e41e5de 100644
--- a/test/core/client_channel/parse_address_test.c
+++ b/test/core/client_channel/parse_address_test.c
@@ -47,12 +47,12 @@
#ifdef GRPC_HAVE_UNIX_SOCKET
-static void test_parse_unix(const char *uri_text, const char *pathname) {
+static void test_grpc_parse_unix(const char *uri_text, const char *pathname) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_uri *uri = grpc_uri_parse(&exec_ctx, uri_text, 0);
grpc_resolved_address addr;
- GPR_ASSERT(1 == parse_unix(uri, &addr));
+ GPR_ASSERT(1 == grpc_parse_unix(uri, &addr));
struct sockaddr_un *addr_un = (struct sockaddr_un *)addr.addr;
GPR_ASSERT(AF_UNIX == addr_un->sun_family);
GPR_ASSERT(0 == strcmp(addr_un->sun_path, pathname));
@@ -63,18 +63,18 @@ static void test_parse_unix(const char *uri_text, const char *pathname) {
#else /* GRPC_HAVE_UNIX_SOCKET */
-static void test_parse_unix(const char *uri_text, const char *pathname) {}
+static void test_grpc_parse_unix(const char *uri_text, const char *pathname) {}
#endif /* GRPC_HAVE_UNIX_SOCKET */
-static void test_parse_ipv4(const char *uri_text, const char *host,
- unsigned short port) {
+static void test_grpc_parse_ipv4(const char *uri_text, const char *host,
+ unsigned short port) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_uri *uri = grpc_uri_parse(&exec_ctx, uri_text, 0);
grpc_resolved_address addr;
char ntop_buf[INET_ADDRSTRLEN];
- GPR_ASSERT(1 == parse_ipv4(uri, &addr));
+ GPR_ASSERT(1 == grpc_parse_ipv4(uri, &addr));
struct sockaddr_in *addr_in = (struct sockaddr_in *)addr.addr;
GPR_ASSERT(AF_INET == addr_in->sin_family);
GPR_ASSERT(NULL != grpc_inet_ntop(AF_INET, &addr_in->sin_addr, ntop_buf,
@@ -86,14 +86,14 @@ static void test_parse_ipv4(const char *uri_text, const char *host,
grpc_exec_ctx_finish(&exec_ctx);
}
-static void test_parse_ipv6(const char *uri_text, const char *host,
- unsigned short port, uint32_t scope_id) {
+static void test_grpc_parse_ipv6(const char *uri_text, const char *host,
+ unsigned short port, uint32_t scope_id) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_uri *uri = grpc_uri_parse(&exec_ctx, uri_text, 0);
grpc_resolved_address addr;
char ntop_buf[INET6_ADDRSTRLEN];
- GPR_ASSERT(1 == parse_ipv6(uri, &addr));
+ GPR_ASSERT(1 == grpc_parse_ipv6(uri, &addr));
struct sockaddr_in6 *addr_in6 = (struct sockaddr_in6 *)addr.addr;
GPR_ASSERT(AF_INET6 == addr_in6->sin6_family);
GPR_ASSERT(NULL != grpc_inet_ntop(AF_INET6, &addr_in6->sin6_addr, ntop_buf,
@@ -109,8 +109,8 @@ static void test_parse_ipv6(const char *uri_text, const char *host,
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
- test_parse_unix("unix:/path/name", "/path/name");
- test_parse_ipv4("ipv4:192.0.2.1:12345", "192.0.2.1", 12345);
- test_parse_ipv6("ipv6:[2001:db8::1]:12345", "2001:db8::1", 12345, 0);
- test_parse_ipv6("ipv6:[2001:db8::1%252]:12345", "2001:db8::1", 12345, 2);
+ test_grpc_parse_unix("unix:/path/name", "/path/name");
+ test_grpc_parse_ipv4("ipv4:192.0.2.1:12345", "192.0.2.1", 12345);
+ test_grpc_parse_ipv6("ipv6:[2001:db8::1]:12345", "2001:db8::1", 12345, 0);
+ test_grpc_parse_ipv6("ipv6:[2001:db8::1%252]:12345", "2001:db8::1", 12345, 2);
}
diff --git a/test/core/client_channel/resolvers/BUILD b/test/core/client_channel/resolvers/BUILD
index af37072e3a..e8361cdef6 100644
--- a/test/core/client_channel/resolvers/BUILD
+++ b/test/core/client_channel/resolvers/BUILD
@@ -32,20 +32,48 @@ licenses(["notice"]) # 3-clause BSD
cc_test(
name = "dns_resolver_connectivity_test",
srcs = ["dns_resolver_connectivity_test.c"],
- deps = ["//:grpc", "//test/core/util:grpc_test_util", "//:gpr", "//test/core/util:gpr_test_util"],
- copts = ['-std=c99']
+ copts = ["-std=c99"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
)
cc_test(
name = "dns_resolver_test",
srcs = ["dns_resolver_test.c"],
- deps = ["//:grpc", "//test/core/util:grpc_test_util", "//:gpr", "//test/core/util:gpr_test_util"],
- copts = ['-std=c99']
+ copts = ["-std=c99"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
)
cc_test(
name = "sockaddr_resolver_test",
srcs = ["sockaddr_resolver_test.c"],
- deps = ["//:grpc", "//test/core/util:grpc_test_util", "//:gpr", "//test/core/util:gpr_test_util"],
- copts = ['-std=c99']
+ copts = ["-std=c99"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
+cc_test(
+ name = "fake_resolver_test",
+ srcs = ["fake_resolver_test.c"],
+ copts = ["-std=c99"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/end2end:fake_resolver",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
)
diff --git a/test/core/client_channel/resolvers/fake_resolver_test.c b/test/core/client_channel/resolvers/fake_resolver_test.c
new file mode 100644
index 0000000000..861918fbd6
--- /dev/null
+++ b/test/core/client_channel/resolvers/fake_resolver_test.c
@@ -0,0 +1,187 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+
+#include "test/core/end2end/fake_resolver.h"
+#include "test/core/util/test_config.h"
+
+static grpc_resolver *build_fake_resolver(
+ grpc_exec_ctx *exec_ctx, grpc_combiner *combiner,
+ grpc_fake_resolver_response_generator *response_generator) {
+ grpc_resolver_factory *factory = grpc_resolver_factory_lookup("test");
+ grpc_arg generator_arg =
+ grpc_fake_resolver_response_generator_arg(response_generator);
+ grpc_resolver_args args;
+ memset(&args, 0, sizeof(args));
+ grpc_channel_args channel_args = {1, &generator_arg};
+ args.args = &channel_args;
+ args.combiner = combiner;
+ grpc_resolver *resolver =
+ grpc_resolver_factory_create_resolver(exec_ctx, factory, &args);
+ grpc_resolver_factory_unref(factory);
+ return resolver;
+}
+
+typedef struct on_resolution_arg {
+ grpc_channel_args *resolver_result;
+ grpc_channel_args *expected_resolver_result;
+ bool was_called;
+} on_resolution_arg;
+
+void on_resolution_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ on_resolution_arg *res = arg;
+ res->was_called = true;
+ // We only check the addresses channel arg because that's the only one
+ // explicitly set by the test via
+ // grpc_fake_resolver_response_generator_set_response.
+ const grpc_lb_addresses *actual_lb_addresses =
+ grpc_lb_addresses_find_channel_arg(res->resolver_result);
+ const grpc_lb_addresses *expected_lb_addresses =
+ grpc_lb_addresses_find_channel_arg(res->expected_resolver_result);
+ GPR_ASSERT(
+ grpc_lb_addresses_cmp(actual_lb_addresses, expected_lb_addresses) == 0);
+ grpc_channel_args_destroy(exec_ctx, res->resolver_result);
+ grpc_channel_args_destroy(exec_ctx, res->expected_resolver_result);
+}
+
+static void test_fake_resolver() {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_combiner *combiner = grpc_combiner_create(NULL);
+ // Create resolver.
+ grpc_fake_resolver_response_generator *response_generator =
+ grpc_fake_resolver_response_generator_create();
+ grpc_resolver *resolver =
+ build_fake_resolver(&exec_ctx, combiner, response_generator);
+ GPR_ASSERT(resolver != NULL);
+
+ // Setup expectations.
+ grpc_uri *uris[] = {grpc_uri_parse(&exec_ctx, "ipv4:10.2.1.1:1234", true),
+ grpc_uri_parse(&exec_ctx, "ipv4:127.0.0.1:4321", true)};
+ char *balancer_names[] = {"name1", "name2"};
+ const bool is_balancer[] = {true, false};
+ grpc_lb_addresses *addresses = grpc_lb_addresses_create(3, NULL);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(uris); ++i) {
+ grpc_lb_addresses_set_address_from_uri(
+ addresses, i, uris[i], is_balancer[i], balancer_names[i], NULL);
+ grpc_uri_destroy(uris[i]);
+ }
+ const grpc_arg addresses_arg =
+ grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args *results =
+ grpc_channel_args_copy_and_add(NULL, &addresses_arg, 1);
+ grpc_lb_addresses_destroy(&exec_ctx, addresses);
+ on_resolution_arg on_res_arg;
+ memset(&on_res_arg, 0, sizeof(on_res_arg));
+ on_res_arg.expected_resolver_result = results;
+ grpc_closure *on_resolution = grpc_closure_create(
+ on_resolution_cb, &on_res_arg, grpc_combiner_scheduler(combiner, false));
+
+ // Set resolver results and trigger first resolution. on_resolution_cb
+ // performs the checks.
+ grpc_fake_resolver_response_generator_set_response(
+ &exec_ctx, response_generator, results);
+ grpc_resolver_next_locked(&exec_ctx, resolver, &on_res_arg.resolver_result,
+ on_resolution);
+ grpc_exec_ctx_flush(&exec_ctx);
+ GPR_ASSERT(on_res_arg.was_called);
+
+ // Setup update.
+ grpc_uri *uris_update[] = {
+ grpc_uri_parse(&exec_ctx, "ipv4:192.168.1.0:31416", true)};
+ char *balancer_names_update[] = {"name3"};
+ const bool is_balancer_update[] = {false};
+ grpc_lb_addresses *addresses_update = grpc_lb_addresses_create(1, NULL);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(uris_update); ++i) {
+ grpc_lb_addresses_set_address_from_uri(addresses_update, i, uris_update[i],
+ is_balancer_update[i],
+ balancer_names_update[i], NULL);
+ grpc_uri_destroy(uris_update[i]);
+ }
+
+ grpc_arg addresses_update_arg =
+ grpc_lb_addresses_create_channel_arg(addresses_update);
+ grpc_channel_args *results_update =
+ grpc_channel_args_copy_and_add(NULL, &addresses_update_arg, 1);
+ grpc_lb_addresses_destroy(&exec_ctx, addresses_update);
+
+ // Setup expectations for the update.
+ on_resolution_arg on_res_arg_update;
+ memset(&on_res_arg_update, 0, sizeof(on_res_arg_update));
+ on_res_arg_update.expected_resolver_result = results_update;
+ on_resolution = grpc_closure_create(on_resolution_cb, &on_res_arg_update,
+ grpc_combiner_scheduler(combiner, false));
+
+ // Set updated resolver results and trigger a second resolution.
+ grpc_fake_resolver_response_generator_set_response(
+ &exec_ctx, response_generator, results_update);
+ grpc_resolver_next_locked(&exec_ctx, resolver,
+ &on_res_arg_update.resolver_result, on_resolution);
+ grpc_exec_ctx_flush(&exec_ctx);
+ GPR_ASSERT(on_res_arg.was_called);
+
+ // Requesting a new resolution without re-senting the response shouldn't
+ // trigger the resolution callback.
+ memset(&on_res_arg, 0, sizeof(on_res_arg));
+ grpc_resolver_next_locked(&exec_ctx, resolver, &on_res_arg.resolver_result,
+ on_resolution);
+ grpc_exec_ctx_flush(&exec_ctx);
+ GPR_ASSERT(!on_res_arg.was_called);
+
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner, "test_fake_resolver");
+ GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_fake_resolver");
+ grpc_exec_ctx_finish(&exec_ctx);
+ grpc_fake_resolver_response_generator_unref(response_generator);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ grpc_fake_resolver_init(); // Registers the "test" scheme.
+ grpc_init();
+
+ test_fake_resolver();
+
+ grpc_shutdown();
+ return 0;
+}
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index 0cef7aa01d..ffea1cc4e8 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -32,49 +32,66 @@ licenses(["notice"]) # 3-clause BSD
load(":generate_tests.bzl", "grpc_end2end_tests")
cc_library(
- name = 'cq_verifier',
- srcs = ['cq_verifier.c'],
- hdrs = ['cq_verifier.h'],
- deps = ['//:gpr', '//:grpc', '//test/core/util:grpc_test_util'],
- copts = ['-std=c99'],
- visibility = ["//test:__subpackages__"],
+ name = "cq_verifier",
+ srcs = ["cq_verifier.c"],
+ hdrs = ["cq_verifier.h"],
+ copts = ["-std=c99"],
+ visibility = ["//test:__subpackages__"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:grpc_test_util",
+ ],
)
cc_library(
- name = 'ssl_test_data',
- visibility = ["//test:__subpackages__"],
- hdrs = ['data/ssl_test_data.h'],
- copts = ['-std=c99'],
- srcs = [
- "data/client_certs.c",
- "data/server1_cert.c",
- "data/server1_key.c",
- "data/test_root_cert.c",
- ]
+ name = "ssl_test_data",
+ srcs = [
+ "data/client_certs.c",
+ "data/server1_cert.c",
+ "data/server1_key.c",
+ "data/test_root_cert.c",
+ ],
+ hdrs = ["data/ssl_test_data.h"],
+ copts = ["-std=c99"],
+ visibility = ["//test:__subpackages__"],
)
cc_library(
- name = 'fake_resolver',
- hdrs = ['fake_resolver.h'],
- srcs = ['fake_resolver.c'],
- copts = ['-std=c99'],
- deps = ['//:gpr', '//:grpc', '//test/core/util:grpc_test_util']
+ name = "fake_resolver",
+ srcs = ["fake_resolver.c"],
+ hdrs = ["fake_resolver.h"],
+ copts = ["-std=c99"],
+ visibility = ["//test:__subpackages__"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:grpc_test_util",
+ ],
)
cc_library(
- name = 'http_proxy',
- hdrs = ['fixtures/http_proxy_fixture.h'],
- srcs = ['fixtures/http_proxy_fixture.c'],
- copts = ['-std=c99'],
- deps = ['//:gpr', '//:grpc', '//test/core/util:grpc_test_util']
+ name = "http_proxy",
+ srcs = ["fixtures/http_proxy_fixture.c"],
+ hdrs = ["fixtures/http_proxy_fixture.h"],
+ copts = ["-std=c99"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:grpc_test_util",
+ ],
)
cc_library(
- name = 'proxy',
- hdrs = ['fixtures/proxy.h'],
- srcs = ['fixtures/proxy.c'],
- copts = ['-std=c99'],
- deps = ['//:gpr', '//:grpc', '//test/core/util:grpc_test_util']
+ name = "proxy",
+ srcs = ["fixtures/proxy.c"],
+ hdrs = ["fixtures/proxy.h"],
+ copts = ["-std=c99"],
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:grpc_test_util",
+ ],
)
grpc_end2end_tests()
diff --git a/test/core/end2end/fake_resolver.c b/test/core/end2end/fake_resolver.c
index 1c7dd1339c..6a71c20b80 100644
--- a/test/core/end2end/fake_resolver.c
+++ b/test/core/end2end/fake_resolver.c
@@ -32,6 +32,7 @@
// This is similar to the sockaddr resolver, except that it supports a
// bunch of query args that are useful for dependency injection in tests.
+#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
@@ -46,12 +47,18 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
+#include "test/core/end2end/fake_resolver.h"
+
+#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \
+ "grpc.fake_resolver.response_generator"
+
//
// fake_resolver
//
@@ -62,12 +69,11 @@ typedef struct {
// passed-in parameters
grpc_channel_args* channel_args;
- grpc_lb_addresses* addresses;
- // mutex guarding the rest of the state
- gpr_mu mu;
- // have we published?
- bool published;
+ // If not NULL, the next set of resolution results to be returned to
+ // grpc_resolver_next_locked()'s closure.
+ grpc_channel_args* next_results;
+
// pending next completion, or NULL
grpc_closure* next_completion;
// target result address for next completion
@@ -76,60 +82,137 @@ typedef struct {
static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
fake_resolver* r = (fake_resolver*)gr;
- gpr_mu_destroy(&r->mu);
+ grpc_channel_args_destroy(exec_ctx, r->next_results);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
- grpc_lb_addresses_destroy(exec_ctx, r->addresses);
gpr_free(r);
}
-static void fake_resolver_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_resolver* resolver) {
+static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx,
+ grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver;
- gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_result = NULL;
grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
}
- gpr_mu_unlock(&r->mu);
}
static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
fake_resolver* r) {
- if (r->next_completion != NULL && !r->published) {
- r->published = true;
- grpc_arg arg = grpc_lb_addresses_create_channel_arg(r->addresses);
+ if (r->next_completion != NULL && r->next_results != NULL) {
*r->target_result =
- grpc_channel_args_copy_and_add(r->channel_args, &arg, 1);
+ grpc_channel_args_merge(r->channel_args, r->next_results);
+ grpc_channel_args_destroy(exec_ctx, r->next_results);
grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
+ r->next_results = NULL;
}
}
-static void fake_resolver_channel_saw_error(grpc_exec_ctx* exec_ctx,
- grpc_resolver* resolver) {
+static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
+ grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver;
- gpr_mu_lock(&r->mu);
- r->published = false;
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
}
-static void fake_resolver_next(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver,
- grpc_channel_args** target_result,
- grpc_closure* on_complete) {
+static void fake_resolver_next_locked(grpc_exec_ctx* exec_ctx,
+ grpc_resolver* resolver,
+ grpc_channel_args** target_result,
+ grpc_closure* on_complete) {
fake_resolver* r = (fake_resolver*)resolver;
- gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_result = target_result;
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
}
static const grpc_resolver_vtable fake_resolver_vtable = {
- fake_resolver_destroy, fake_resolver_shutdown,
- fake_resolver_channel_saw_error, fake_resolver_next};
+ fake_resolver_destroy, fake_resolver_shutdown_locked,
+ fake_resolver_channel_saw_error_locked, fake_resolver_next_locked};
+
+struct grpc_fake_resolver_response_generator {
+ fake_resolver* resolver; // Set by the fake_resolver constructor to itself.
+ grpc_channel_args* next_response;
+ gpr_refcount refcount;
+};
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_create() {
+ grpc_fake_resolver_response_generator* generator =
+ gpr_zalloc(sizeof(*generator));
+ gpr_ref_init(&generator->refcount, 1);
+ return generator;
+}
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_ref(
+ grpc_fake_resolver_response_generator* generator) {
+ gpr_ref(&generator->refcount);
+ return generator;
+}
+
+void grpc_fake_resolver_response_generator_unref(
+ grpc_fake_resolver_response_generator* generator) {
+ if (gpr_unref(&generator->refcount)) {
+ gpr_free(generator);
+ }
+}
+
+static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_fake_resolver_response_generator* generator = arg;
+ fake_resolver* r = generator->resolver;
+ if (r->next_results != NULL) {
+ grpc_channel_args_destroy(exec_ctx, r->next_results);
+ }
+ r->next_results = generator->next_response;
+ fake_resolver_maybe_finish_next_locked(exec_ctx, r);
+}
+
+void grpc_fake_resolver_response_generator_set_response(
+ grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
+ grpc_channel_args* next_response) {
+ GPR_ASSERT(generator->resolver != NULL);
+ generator->next_response = grpc_channel_args_copy(next_response);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(
+ set_response_cb, generator,
+ grpc_combiner_scheduler(generator->resolver->base.combiner, false)),
+ GRPC_ERROR_NONE);
+}
+
+static void* response_generator_arg_copy(void* p) {
+ return grpc_fake_resolver_response_generator_ref(p);
+}
+
+static void response_generator_arg_destroy(grpc_exec_ctx* exec_ctx, void* p) {
+ grpc_fake_resolver_response_generator_unref(p);
+}
+
+static int response_generator_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
+
+static const grpc_arg_pointer_vtable response_generator_arg_vtable = {
+ response_generator_arg_copy, response_generator_arg_destroy,
+ response_generator_cmp};
+
+grpc_arg grpc_fake_resolver_response_generator_arg(
+ grpc_fake_resolver_response_generator* generator) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_POINTER;
+ arg.key = GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR;
+ arg.value.pointer.p = generator;
+ arg.value.pointer.vtable = &response_generator_arg_vtable;
+ return arg;
+}
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_get_response_generator(const grpc_channel_args* args) {
+ const grpc_arg* arg =
+ grpc_channel_args_find(args, GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) return NULL;
+ return arg->value.pointer.p;
+}
//
// fake_resolver_factory
@@ -139,81 +222,15 @@ static void fake_resolver_factory_ref(grpc_resolver_factory* factory) {}
static void fake_resolver_factory_unref(grpc_resolver_factory* factory) {}
-static void do_nothing(void* ignored) {}
-
static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx,
grpc_resolver_factory* factory,
grpc_resolver_args* args) {
- if (0 != strcmp(args->uri->authority, "")) {
- gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme",
- args->uri->scheme);
- return NULL;
- }
- // Get lb_enabled arg. Anything other than "0" is interpreted as true.
- const char* lb_enabled_qpart =
- grpc_uri_get_query_arg(args->uri, "lb_enabled");
- const bool lb_enabled =
- lb_enabled_qpart != NULL && strcmp("0", lb_enabled_qpart) != 0;
-
- // Get the balancer's names.
- const char* balancer_names =
- grpc_uri_get_query_arg(args->uri, "balancer_names");
- grpc_slice_buffer balancer_names_parts;
- grpc_slice_buffer_init(&balancer_names_parts);
- if (balancer_names != NULL) {
- const grpc_slice balancer_names_slice =
- grpc_slice_from_copied_string(balancer_names);
- grpc_slice_split(balancer_names_slice, ",", &balancer_names_parts);
- grpc_slice_unref(balancer_names_slice);
- }
-
- // Construct addresses.
- grpc_slice path_slice =
- grpc_slice_new(args->uri->path, strlen(args->uri->path), do_nothing);
- grpc_slice_buffer path_parts;
- grpc_slice_buffer_init(&path_parts);
- grpc_slice_split(path_slice, ",", &path_parts);
- if (balancer_names_parts.count > 0 &&
- path_parts.count != balancer_names_parts.count) {
- gpr_log(GPR_ERROR,
- "Balancer names present but mismatched with number of addresses: "
- "%lu balancer names != %lu addresses",
- (unsigned long)balancer_names_parts.count,
- (unsigned long)path_parts.count);
- return NULL;
- }
- grpc_lb_addresses* addresses =
- grpc_lb_addresses_create(path_parts.count, NULL /* user_data_vtable */);
- bool errors_found = false;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- grpc_uri ith_uri = *args->uri;
- char* part_str = grpc_slice_to_c_string(path_parts.slices[i]);
- ith_uri.path = part_str;
- if (!parse_ipv4(&ith_uri, &addresses->addresses[i].address)) {
- errors_found = true;
- }
- gpr_free(part_str);
- if (errors_found) break;
- addresses->addresses[i].is_balancer = lb_enabled;
- addresses->addresses[i].balancer_name =
- balancer_names_parts.count > 0
- ? grpc_dump_slice(balancer_names_parts.slices[i], GPR_DUMP_ASCII)
- : NULL;
- }
- grpc_slice_buffer_destroy_internal(exec_ctx, &path_parts);
- grpc_slice_buffer_destroy_internal(exec_ctx, &balancer_names_parts);
- grpc_slice_unref(path_slice);
- if (errors_found) {
- grpc_lb_addresses_destroy(exec_ctx, addresses);
- return NULL;
- }
- // Instantiate resolver.
- fake_resolver* r = gpr_malloc(sizeof(fake_resolver));
- memset(r, 0, sizeof(*r));
+ fake_resolver* r = gpr_zalloc(sizeof(*r));
r->channel_args = grpc_channel_args_copy(args->args);
- r->addresses = addresses;
- gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &fake_resolver_vtable, args->combiner);
+ grpc_fake_resolver_response_generator* response_generator =
+ grpc_fake_resolver_get_response_generator(args->args);
+ if (response_generator != NULL) response_generator->resolver = r;
return &r->base;
}
diff --git a/test/core/end2end/fake_resolver.h b/test/core/end2end/fake_resolver.h
index 7a30347f30..447289adef 100644
--- a/test/core/end2end/fake_resolver.h
+++ b/test/core/end2end/fake_resolver.h
@@ -32,8 +32,42 @@
#ifndef GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H
#define GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
+#include "src/core/lib/channel/channel_args.h"
+
#include "test/core/util/test_config.h"
void grpc_fake_resolver_init();
+// Instances of \a grpc_fake_resolver_response_generator are passed to the
+// fake resolver in a channel argument (see \a
+// grpc_fake_resolver_response_generator_arg) in order to inject and trigger
+// custom resolutions. See also \a
+// grpc_fake_resolver_response_generator_set_response.
+typedef struct grpc_fake_resolver_response_generator
+ grpc_fake_resolver_response_generator;
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_create();
+
+// Instruct the fake resolver associated with the \a response_generator instance
+// to trigger a new resolution for \a uri and \a args.
+void grpc_fake_resolver_response_generator_set_response(
+ grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
+ grpc_channel_args* next_response);
+
+// Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance.
+grpc_arg grpc_fake_resolver_response_generator_arg(
+ grpc_fake_resolver_response_generator* generator);
+// Return the \a grpc_fake_resolver_response_generator instance in \a args or
+// NULL.
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_get_response_generator(const grpc_channel_args* args);
+
+grpc_fake_resolver_response_generator*
+grpc_fake_resolver_response_generator_ref(
+ grpc_fake_resolver_response_generator* generator);
+void grpc_fake_resolver_response_generator_unref(
+ grpc_fake_resolver_response_generator* generator);
+
#endif /* GRPC_TEST_CORE_END2END_FAKE_RESOLVER_H */
diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c
index 8b8983ee17..83c82ba5c0 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.c
+++ b/test/core/end2end/fuzzers/api_fuzzer.c
@@ -932,6 +932,9 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
}
uint32_t propagation_mask = read_uint32(&inp);
grpc_slice method = read_string_like_slice(&inp);
+ if (GRPC_SLICE_LENGTH(method) == 0) {
+ ok = false;
+ }
grpc_slice host = read_string_like_slice(&inp);
gpr_timespec deadline =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5867145026076672 b/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5867145026076672
new file mode 100644
index 0000000000..3fd5427b46
--- /dev/null
+++ b/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5867145026076672
Binary files differ
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 12d8406323..1f1696a7a7 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -91,7 +91,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data) {
}
static void on_fd_orphaned(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
- void *user_data) {
+ grpc_closure *closure, void *user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
g_number_of_orphan_calls++;
@@ -228,9 +228,9 @@ static void test_no_op_with_port_and_start(void) {
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);
- /* The server had a single FD, which is orphaned once in *
- * deactivated_all_ports, and once in grpc_udp_server_destroy. */
- GPR_ASSERT(g_number_of_orphan_calls == 2);
+ /* The server had a single FD, which is orphaned exactly once in *
+ * grpc_udp_server_destroy. */
+ GPR_ASSERT(g_number_of_orphan_calls == 1);
}
static void test_receive(int number_of_clients) {
@@ -297,9 +297,9 @@ static void test_receive(int number_of_clients) {
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);
- /* The server had a single FD, which is orphaned once in *
- * deactivated_all_ports, and once in grpc_udp_server_destroy. */
- GPR_ASSERT(g_number_of_orphan_calls == 2);
+ /* The server had a single FD, which is orphaned exactly once in *
+ * grpc_udp_server_destroy. */
+ GPR_ASSERT(g_number_of_orphan_calls == 1);
/* The write callback should have fired a few times. */
GPR_ASSERT(g_number_of_writes > 0);
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index d9df2bb673..0b569f1415 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -106,11 +106,13 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
auto* server = serverlist->add_servers();
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(12345);
- server->set_drop_request(true);
+ server->set_drop_for_rate_limiting(true);
+ server->set_drop_for_load_balancing(false);
server = response.mutable_server_list()->add_servers();
server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
server->set_port(54321);
- server->set_drop_request(false);
+ server->set_drop_for_rate_limiting(false);
+ server->set_drop_for_load_balancing(true);
auto* expiration_interval = serverlist->mutable_expiration_interval();
expiration_interval->set_seconds(888);
expiration_interval->set_nanos(999);
@@ -125,12 +127,14 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
"127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
- EXPECT_TRUE(c_serverlist->servers[0]->drop_request);
+ EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting);
+ EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing);
EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
- EXPECT_FALSE(c_serverlist->servers[1]->drop_request);
+ EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting);
+ EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing);
EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds);
EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888);
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index a80431f46a..997a8391eb 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -573,34 +573,51 @@ static void perform_request(client_fixture *cf) {
static void setup_client(const server_fixture *lb_server,
const server_fixture *backends, client_fixture *cf) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *lb_uri;
- // The grpclb LB policy will be automatically selected by virtue of
- // the fact that the returned addresses are balancer addresses.
- gpr_asprintf(&lb_uri, "test:///%s?lb_enabled=1&balancer_names=%s",
- lb_server->servers_hostport, lb_server->balancer_name);
-
- grpc_arg expected_target_arg;
- expected_target_arg.type = GRPC_ARG_STRING;
- expected_target_arg.key =
- const_cast<char *>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
char *expected_target_names = NULL;
const char *backends_name = lb_server->servers_hostport;
gpr_asprintf(&expected_target_names, "%s;%s", backends_name, BALANCERS_NAME);
- expected_target_arg.value.string = const_cast<char *>(expected_target_names);
+ grpc_fake_resolver_response_generator *response_generator =
+ grpc_fake_resolver_response_generator_create();
+
+ grpc_lb_addresses *addresses = grpc_lb_addresses_create(1, NULL);
+ char *lb_uri_str;
+ gpr_asprintf(&lb_uri_str, "ipv4:%s", lb_server->servers_hostport);
+ grpc_uri *lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true);
+ GPR_ASSERT(lb_uri != NULL);
+ grpc_lb_addresses_set_address_from_uri(addresses, 0, lb_uri, true,
+ lb_server->balancer_name, NULL);
+ grpc_uri_destroy(lb_uri);
+ gpr_free(lb_uri_str);
+
+ gpr_asprintf(&cf->server_uri, "test:///%s", lb_server->servers_hostport);
+ const grpc_arg fake_addresses =
+ grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args *fake_result =
+ grpc_channel_args_copy_and_add(NULL, &fake_addresses, 1);
+ grpc_lb_addresses_destroy(&exec_ctx, addresses);
+
+ const grpc_arg new_args[] = {
+ grpc_fake_transport_expected_targets_arg(expected_target_names),
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+
grpc_channel_args *args =
- grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1);
+ grpc_channel_args_copy_and_add(NULL, new_args, GPR_ARRAY_SIZE(new_args));
gpr_free(expected_target_names);
cf->cq = grpc_completion_queue_create_for_next(NULL);
- cf->server_uri = lb_uri;
grpc_channel_credentials *fake_creds =
grpc_fake_transport_security_credentials_create();
cf->client =
grpc_secure_channel_create(fake_creds, cf->server_uri, args, NULL);
+ grpc_fake_resolver_response_generator_set_response(
+ &exec_ctx, response_generator, fake_result);
+ grpc_channel_args_destroy(&exec_ctx, fake_result);
grpc_channel_credentials_unref(&exec_ctx, fake_creds);
grpc_channel_args_destroy(&exec_ctx, args);
+ grpc_fake_resolver_response_generator_unref(response_generator);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void teardown_client(client_fixture *cf) {
@@ -787,8 +804,8 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
- grpc_test_init(argc, argv);
grpc_fake_resolver_init();
+ grpc_test_init(argc, argv);
grpc_init();
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index c89f349ca7..8c5413b5fd 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -569,12 +569,17 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE);
return;
}
- } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice,
+ } while (grpc_byte_stream_next(exec_ctx, recv_stream,
recv_stream->length - received,
- drain_continue.get()));
+ drain_continue.get()) &&
+ GRPC_ERROR_NONE ==
+ grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) &&
+ (received += GRPC_SLICE_LENGTH(recv_slice),
+ grpc_slice_unref_internal(exec_ctx, recv_slice), true));
});
drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
+ grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(exec_ctx, recv_slice);
grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);
diff --git a/tools/jenkins/run_performance.sh b/tools/jenkins/run_performance.sh
index dee033a951..544e31dcbd 100755
--- a/tools/jenkins/run_performance.sh
+++ b/tools/jenkins/run_performance.sh
@@ -32,7 +32,7 @@
set -ex
# List of benchmarks that provide good signal for analyzing performance changes in pull requests
-BENCHMARKS_TO_RUN="bm_closure bm_cq bm_call_create bm_error bm_chttp2_hpack bm_chttp2_transport bm_pollset bm_metadata"
+BENCHMARKS_TO_RUN="bm_fullstack_unary_ping_pong bm_fullstack_streaming_ping_pong bm_fullstack_streaming_pump bm_closure bm_cq bm_call_create bm_error bm_chttp2_hpack bm_chttp2_transport bm_pollset bm_metadata"
# Enter the gRPC repo root
cd $(dirname $0)/../..
diff --git a/tools/profiling/microbenchmarks/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff.py
index 35a79593b4..ba0c225f6c 100755
--- a/tools/profiling/microbenchmarks/bm_diff.py
+++ b/tools/profiling/microbenchmarks/bm_diff.py
@@ -41,6 +41,22 @@ import pipes
import os
sys.path.append(os.path.join(os.path.dirname(sys.argv[0]), '..', '..', 'run_tests', 'python_utils'))
import comment_on_pr
+import jobset
+import itertools
+import speedup
+import random
+import shutil
+import errno
+
+_INTERESTING = (
+ 'cpu_time',
+ 'real_time',
+ 'locks_per_iteration',
+ 'allocs_per_iteration',
+ 'writes_per_iteration',
+ 'atm_cas_per_iteration',
+ 'atm_add_per_iteration',
+)
def changed_ratio(n, o):
if float(o) <= .0001: o = 0
@@ -60,26 +76,6 @@ def median(ary):
def min_change(pct):
return lambda n, o: abs(changed_ratio(n,o)) > pct/100.0
-nanos = {
- 'abs_diff': 5,
- 'pct_diff': 10,
-}
-counter = {
- 'abs_diff': 0.5,
- 'pct_diff': 10,
-}
-
-_INTERESTING = {
- 'cpu_time': nanos,
- 'real_time': nanos,
- 'locks_per_iteration': counter,
- 'allocs_per_iteration': counter,
- 'writes_per_iteration': counter,
- 'atm_cas_per_iteration': counter,
- 'atm_add_per_iteration': counter,
-}
-
-
_AVAILABLE_BENCHMARK_TESTS = ['bm_fullstack_unary_ping_pong',
'bm_fullstack_streaming_ping_pong',
'bm_fullstack_streaming_pump',
@@ -95,14 +91,15 @@ _AVAILABLE_BENCHMARK_TESTS = ['bm_fullstack_unary_ping_pong',
argp = argparse.ArgumentParser(description='Perform diff on microbenchmarks')
argp.add_argument('-t', '--track',
- choices=sorted(_INTERESTING.keys()),
+ choices=sorted(_INTERESTING),
nargs='+',
- default=sorted(_INTERESTING.keys()),
+ default=sorted(_INTERESTING),
help='Which metrics to track')
argp.add_argument('-b', '--benchmarks', nargs='+', choices=_AVAILABLE_BENCHMARK_TESTS, default=['bm_cq'])
argp.add_argument('-d', '--diff_base', type=str)
-argp.add_argument('-r', '--repetitions', type=int, default=4)
-argp.add_argument('-p', '--p_threshold', type=float, default=0.01)
+argp.add_argument('-r', '--repetitions', type=int, default=1)
+argp.add_argument('-l', '--loops', type=int, default=12)
+argp.add_argument('-j', '--jobs', type=int, default=multiprocessing.cpu_count())
args = argp.parse_args()
assert args.diff_base
@@ -117,9 +114,10 @@ def avg(lst):
def make_cmd(cfg):
return ['make'] + args.benchmarks + [
- 'CONFIG=%s' % cfg, '-j', '%d' % multiprocessing.cpu_count()]
+ 'CONFIG=%s' % cfg, '-j', '%d' % args.jobs]
-def build():
+def build(dest):
+ shutil.rmtree('bm_diff_%s' % dest, ignore_errors=True)
subprocess.check_call(['git', 'submodule', 'update'])
try:
subprocess.check_call(make_cmd('opt'))
@@ -128,38 +126,38 @@ def build():
subprocess.check_call(['make', 'clean'])
subprocess.check_call(make_cmd('opt'))
subprocess.check_call(make_cmd('counters'))
+ os.rename('bins', 'bm_diff_%s' % dest)
-def collect1(bm, cfg, ver):
- cmd = ['bins/%s/%s' % (cfg, bm),
- '--benchmark_out=%s.%s.%s.json' % (bm, cfg, ver),
+def collect1(bm, cfg, ver, idx):
+ cmd = ['bm_diff_%s/%s/%s' % (ver, cfg, bm),
+ '--benchmark_out=%s.%s.%s.%d.json' % (bm, cfg, ver, idx),
'--benchmark_out_format=json',
'--benchmark_repetitions=%d' % (args.repetitions)
]
- print cmd
- subprocess.check_call(cmd)
+ return jobset.JobSpec(cmd, shortname='%s %s %s %d/%d' % (bm, cfg, ver, idx+1, args.loops),
+ verbose_success=True, timeout_seconds=None)
-build()
-for bm in args.benchmarks:
- collect1(bm, 'opt', 'new')
- collect1(bm, 'counters', 'new')
+build('new')
where_am_i = subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).strip()
subprocess.check_call(['git', 'checkout', args.diff_base])
-
try:
- build()
- comparables = []
- for bm in args.benchmarks:
- try:
- collect1(bm, 'opt', 'old')
- collect1(bm, 'counters', 'old')
- comparables.append(bm)
- except subprocess.CalledProcessError, e:
- pass
+ build('old')
finally:
subprocess.check_call(['git', 'checkout', where_am_i])
subprocess.check_call(['git', 'submodule', 'update'])
+jobs = []
+for loop in range(0, args.loops):
+ jobs.extend(x for x in itertools.chain(
+ (collect1(bm, 'opt', 'new', loop) for bm in args.benchmarks),
+ (collect1(bm, 'counters', 'new', loop) for bm in args.benchmarks),
+ (collect1(bm, 'opt', 'old', loop) for bm in args.benchmarks),
+ (collect1(bm, 'counters', 'old', loop) for bm in args.benchmarks),
+ ))
+random.shuffle(jobs, random.SystemRandom().random)
+
+jobset.run(jobs, maxjobs=args.jobs)
class Benchmark:
@@ -180,16 +178,11 @@ class Benchmark:
new = self.samples[True][f]
old = self.samples[False][f]
if not new or not old: continue
- p = stats.ttest_ind(new, old)[1]
- new_mdn = median(new)
- old_mdn = median(old)
- delta = new_mdn - old_mdn
- ratio = changed_ratio(new_mdn, old_mdn)
- print '%s: new=%r old=%r new_mdn=%f old_mdn=%f delta=%f(%f:%f) ratio=%f(%f:%f) p=%f' % (
- f, new, old, new_mdn, old_mdn, delta, abs(delta), _INTERESTING[f]['abs_diff'], ratio, abs(ratio), _INTERESTING[f]['pct_diff']/100.0, p
- )
- if p < args.p_threshold and abs(delta) > _INTERESTING[f]['abs_diff'] and abs(ratio) > _INTERESTING[f]['pct_diff']/100.0:
- self.final[f] = delta
+ mdn_diff = abs(median(new) - median(old))
+ print '%s: new=%r old=%r mdn_diff=%r' % (f, new, old, mdn_diff)
+ s = speedup.speedup(new, old)
+ if abs(s) > 3 and mdn_diff > 0.5:
+ self.final[f] = '%+d%%' % s
return self.final.keys()
def skip(self):
@@ -199,28 +192,37 @@ class Benchmark:
return [self.final[f] if f in self.final else '' for f in flds]
+def read_file(filename):
+ while True:
+ try:
+ with open(filename) as f:
+ return f.read()
+ except IOError, e:
+ if e.errno != errno.EINTR:
+ raise
+
+def read_json(filename):
+ return json.loads(read_file(filename))
+
benchmarks = collections.defaultdict(Benchmark)
-for bm in comparables:
- with open('%s.counters.new.json' % bm) as f:
- js_new_ctr = json.loads(f.read())
- with open('%s.opt.new.json' % bm) as f:
- js_new_opt = json.loads(f.read())
- with open('%s.counters.old.json' % bm) as f:
- js_old_ctr = json.loads(f.read())
- with open('%s.opt.old.json' % bm) as f:
- js_old_opt = json.loads(f.read())
-
- for row in bm_json.expand_json(js_new_ctr, js_new_opt):
- print row
- name = row['cpp_name']
- if name.endswith('_mean') or name.endswith('_stddev'): continue
- benchmarks[name].add_sample(row, True)
- for row in bm_json.expand_json(js_old_ctr, js_old_opt):
- print row
- name = row['cpp_name']
- if name.endswith('_mean') or name.endswith('_stddev'): continue
- benchmarks[name].add_sample(row, False)
+for bm in args.benchmarks:
+ for loop in range(0, args.loops):
+ js_new_ctr = read_json('%s.counters.new.%d.json' % (bm, loop))
+ js_new_opt = read_json('%s.opt.new.%d.json' % (bm, loop))
+ js_old_ctr = read_json('%s.counters.old.%d.json' % (bm, loop))
+ js_old_opt = read_json('%s.opt.old.%d.json' % (bm, loop))
+
+ for row in bm_json.expand_json(js_new_ctr, js_new_opt):
+ print row
+ name = row['cpp_name']
+ if name.endswith('_mean') or name.endswith('_stddev'): continue
+ benchmarks[name].add_sample(row, True)
+ for row in bm_json.expand_json(js_old_ctr, js_old_opt):
+ print row
+ name = row['cpp_name']
+ if name.endswith('_mean') or name.endswith('_stddev'): continue
+ benchmarks[name].add_sample(row, False)
really_interesting = set()
for name, bm in benchmarks.items():
diff --git a/tools/profiling/microbenchmarks/speedup.py b/tools/profiling/microbenchmarks/speedup.py
new file mode 100644
index 0000000000..8f9023d2c8
--- /dev/null
+++ b/tools/profiling/microbenchmarks/speedup.py
@@ -0,0 +1,67 @@
+# Copyright 2017, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+from scipy import stats
+import math
+
+_THRESHOLD = 0.0001
+
+def scale(a, mul):
+ return [x*mul for x in a]
+
+def cmp(a, b):
+ return stats.ttest_ind(a, b)
+
+def speedup(new, old):
+ s0, p0 = cmp(new, old)
+ if math.isnan(p0): return 0
+ if s0 == 0: return 0
+ if p0 > _THRESHOLD: return 0
+ if s0 < 0:
+ pct = 1
+ while pct < 101:
+ sp, pp = cmp(new, scale(old, 1 - pct/100.0))
+ if sp > 0: break
+ if pp > _THRESHOLD: break
+ pct += 1
+ return -(pct - 1)
+ else:
+ pct = 1
+ while pct < 101:
+ sp, pp = cmp(new, scale(old, 1 + pct/100.0))
+ if sp < 0: break
+ if pp > _THRESHOLD: break
+ pct += 1
+ return pct - 1
+
+if __name__ == "__main__":
+ new=[66034560.0, 126765693.0, 99074674.0, 98588433.0, 96731372.0, 110179725.0, 103802110.0, 101139800.0, 102357205.0, 99016353.0, 98840824.0, 99585632.0, 98791720.0, 96171521.0, 95327098.0, 95629704.0, 98209772.0, 99779411.0, 100182488.0, 98354192.0, 99644781.0, 98546709.0, 99019176.0, 99543014.0, 99077269.0, 98046601.0, 99319039.0, 98542572.0, 98886614.0, 72560968.0]
+ old=[60423464.0, 71249570.0, 73213089.0, 73200055.0, 72911768.0, 72347798.0, 72494672.0, 72756976.0, 72116565.0, 71541342.0, 73442538.0, 74817383.0, 73007780.0, 72499062.0, 72404945.0, 71843504.0, 73245405.0, 72778304.0, 74004519.0, 73694464.0, 72919931.0, 72955481.0, 71583857.0, 71350467.0, 71836817.0, 70064115.0, 70355345.0, 72516202.0, 71716777.0, 71532266.0]
+ print speedup(new, old)
+ print speedup(old, new)
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 95d5649d00..aa4869e1c8 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -480,6 +480,23 @@
"headers": [],
"is_filegroup": false,
"language": "c",
+ "name": "fake_resolver_test",
+ "src": [
+ "test/core/client_channel/resolvers/fake_resolver_test.c"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c",
"name": "fd_conservation_posix_test",
"src": [
"test/core/iomgr/fd_conservation_posix_test.c"
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 89c987770a..6338ea7012 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -582,6 +582,28 @@
"ci_platforms": [
"linux",
"mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
+ "name": "fake_resolver_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
"posix"
],
"cpu_cost": 1.0,
@@ -85099,6 +85121,29 @@
},
{
"args": [
+ "test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5867145026076672"
+ ],
+ "ci_platforms": [
+ "linux"
+ ],
+ "cpu_cost": 0.1,
+ "exclude_configs": [
+ "tsan"
+ ],
+ "exclude_iomgrs": [
+ "uv"
+ ],
+ "flaky": false,
+ "language": "c",
+ "name": "api_fuzzer_one_entry",
+ "platforms": [
+ "mac",
+ "linux"
+ ],
+ "uses_polling": false
+ },
+ {
+ "args": [
"test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5965570207907840"
],
"ci_platforms": [
diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln
index 1e8336c54e..6ae8bfa74d 100644
--- a/vsprojects/buildtests_c.sln
+++ b/vsprojects/buildtests_c.sln
@@ -317,6 +317,17 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "error_test", "vcxproj\test\
{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
EndProjectSection
EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "fake_resolver_test", "vcxproj\test\fake_resolver_test\fake_resolver_test.vcxproj", "{2F59EB2E-CEF9-A291-9480-1F737C3E5E57}"
+ ProjectSection(myProperties) = preProject
+ lib = "False"
+ EndProjectSection
+ ProjectSection(ProjectDependencies) = postProject
+ {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} = {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}
+ {29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
+ {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037}
+ {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
+ EndProjectSection
+EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "fling_client", "vcxproj\test\fling_client\fling_client.vcxproj", "{0647D598-9611-F659-EA36-DF995C9F736B}"
ProjectSection(myProperties) = preProject
lib = "False"
@@ -2116,6 +2127,22 @@ Global
{42720233-A6D4-66BC-CCA2-06B57261D0B3}.Release-DLL|Win32.Build.0 = Release|Win32
{42720233-A6D4-66BC-CCA2-06B57261D0B3}.Release-DLL|x64.ActiveCfg = Release|x64
{42720233-A6D4-66BC-CCA2-06B57261D0B3}.Release-DLL|x64.Build.0 = Release|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug|Win32.ActiveCfg = Debug|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug|x64.ActiveCfg = Debug|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release|Win32.ActiveCfg = Release|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release|x64.ActiveCfg = Release|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug|Win32.Build.0 = Debug|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug|x64.Build.0 = Debug|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release|Win32.Build.0 = Release|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release|x64.Build.0 = Release|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug-DLL|Win32.ActiveCfg = Debug|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug-DLL|Win32.Build.0 = Debug|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug-DLL|x64.ActiveCfg = Debug|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Debug-DLL|x64.Build.0 = Debug|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release-DLL|Win32.ActiveCfg = Release|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release-DLL|Win32.Build.0 = Release|Win32
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release-DLL|x64.ActiveCfg = Release|x64
+ {2F59EB2E-CEF9-A291-9480-1F737C3E5E57}.Release-DLL|x64.Build.0 = Release|x64
{0647D598-9611-F659-EA36-DF995C9F736B}.Debug|Win32.ActiveCfg = Debug|Win32
{0647D598-9611-F659-EA36-DF995C9F736B}.Debug|x64.ActiveCfg = Debug|x64
{0647D598-9611-F659-EA36-DF995C9F736B}.Release|Win32.ActiveCfg = Release|Win32
diff --git a/vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj b/vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj
new file mode 100644
index 0000000000..8ea721dd7b
--- /dev/null
+++ b/vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{2F59EB2E-CEF9-A291-9480-1F737C3E5E57}</ProjectGuid>
+ <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+ <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+ <PlatformToolset>v100</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+ <PlatformToolset>v120</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+ <PlatformToolset>v140</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+ <TargetName>fake_resolver_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TargetName>fake_resolver_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\client_channel\resolvers\fake_resolver_test.c">
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc_test_util\grpc_test_util.vcxproj">
+ <Project>{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
+ <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj">
+ <Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ </ImportGroup>
+ <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+ <PropertyGroup>
+ <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+ </PropertyGroup>
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+ </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj.filters b/vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj.filters
new file mode 100644
index 0000000000..353cae2b07
--- /dev/null
+++ b/vsprojects/vcxproj/test/fake_resolver_test/fake_resolver_test.vcxproj.filters
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\core\client_channel\resolvers\fake_resolver_test.c">
+ <Filter>test\core\client_channel\resolvers</Filter>
+ </ClCompile>
+ </ItemGroup>
+
+ <ItemGroup>
+ <Filter Include="test">
+ <UniqueIdentifier>{05f58640-4b7c-c233-bf86-f119fe270ba1}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core">
+ <UniqueIdentifier>{caeef88d-baf6-603c-83b0-84b1009be480}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core\client_channel">
+ <UniqueIdentifier>{c5bba556-fd85-f7b4-99b7-8b1eeb4dfcb2}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\core\client_channel\resolvers">
+ <UniqueIdentifier>{eff74a86-6a4c-b68c-0330-6901d992c5c7}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+</Project>
+