aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-02-15 08:34:49 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-02-15 08:34:49 -0800
commit1f8c55835dcf0a7568c1baf8f9dff554acb31335 (patch)
treea36d247b162f5e2ba8a3b4b739ef88019db54a6f
parentad3b200e690f703cfa4fe9fea6603f708420ed0f (diff)
parent3f878bc78493bc3f0d8a826740355be445ffab7a (diff)
Merge branch 'bm_closure' into bm_cq
-rw-r--r--CMakeLists.txt41
-rw-r--r--Makefile50
-rw-r--r--build.yaml19
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.h2
-rw-r--r--test/core/util/trickle_endpoint.c196
-rw-r--r--test/core/util/trickle_endpoint.h46
-rw-r--r--test/cpp/microbenchmarks/bm_closure.cc356
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack.cc186
-rwxr-xr-xtools/profiling/microbenchmarks/bm2bq.py29
-rw-r--r--tools/run_tests/generated/sources_and_headers.json25
-rw-r--r--tools/run_tests/generated/tests.json22
-rwxr-xr-xtools/run_tests/run_microbenchmark.py13
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters6
16 files changed, 980 insertions, 23 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index bf26749221..2d2dc772c6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -570,6 +570,9 @@ add_dependencies(buildtests_cxx alarm_cpp_test)
add_dependencies(buildtests_cxx async_end2end_test)
add_dependencies(buildtests_cxx auth_property_iterator_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+add_dependencies(buildtests_cxx bm_closure)
+endif()
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_cq)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -1431,6 +1434,7 @@ add_library(grpc_test_util
test/core/util/port_uv.c
test/core/util/port_windows.c
test/core/util/slice_splitter.c
+ test/core/util/trickle_endpoint.c
src/core/lib/channel/channel_args.c
src/core/lib/channel/channel_stack.c
src/core/lib/channel/channel_stack_builder.c
@@ -1638,6 +1642,7 @@ add_library(grpc_test_util_unsecure
test/core/util/port_uv.c
test/core/util/port_windows.c
test/core/util/slice_splitter.c
+ test/core/util/trickle_endpoint.c
)
if(WIN32 AND MSVC)
@@ -7448,6 +7453,42 @@ endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+add_executable(bm_closure
+ test/cpp/microbenchmarks/bm_closure.cc
+ third_party/googletest/src/gtest-all.cc
+)
+
+
+target_include_directories(bm_closure
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${BORINGSSL_ROOT_DIR}/include
+ PRIVATE ${PROTOBUF_ROOT_DIR}/src
+ PRIVATE ${BENCHMARK_ROOT_DIR}/include
+ PRIVATE ${ZLIB_ROOT_DIR}
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+ PRIVATE third_party/googletest/include
+ PRIVATE third_party/googletest
+ PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(bm_closure
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ benchmark
+ grpc_test_util
+ grpc
+ gpr_test_util
+ gpr
+ ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif()
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+
add_executable(bm_cq
test/cpp/microbenchmarks/bm_cq.cc
third_party/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index 313dce0532..72affa2fe2 100644
--- a/Makefile
+++ b/Makefile
@@ -1043,6 +1043,7 @@ wakeup_fd_cv_test: $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test
alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test
async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test
auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test
+bm_closure: $(BINDIR)/$(CONFIG)/bm_closure
bm_cq: $(BINDIR)/$(CONFIG)/bm_cq
bm_fullstack: $(BINDIR)/$(CONFIG)/bm_fullstack
channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test
@@ -1450,6 +1451,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/alarm_cpp_test \
$(BINDIR)/$(CONFIG)/async_end2end_test \
$(BINDIR)/$(CONFIG)/auth_property_iterator_test \
+ $(BINDIR)/$(CONFIG)/bm_closure \
$(BINDIR)/$(CONFIG)/bm_cq \
$(BINDIR)/$(CONFIG)/bm_fullstack \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
@@ -1555,6 +1557,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/alarm_cpp_test \
$(BINDIR)/$(CONFIG)/async_end2end_test \
$(BINDIR)/$(CONFIG)/auth_property_iterator_test \
+ $(BINDIR)/$(CONFIG)/bm_closure \
$(BINDIR)/$(CONFIG)/bm_cq \
$(BINDIR)/$(CONFIG)/bm_fullstack \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
@@ -1873,6 +1876,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing auth_property_iterator_test"
$(Q) $(BINDIR)/$(CONFIG)/auth_property_iterator_test || ( echo test auth_property_iterator_test failed ; exit 1 )
+ $(E) "[RUN] Testing bm_closure"
+ $(Q) $(BINDIR)/$(CONFIG)/bm_closure || ( echo test bm_closure failed ; exit 1 )
$(E) "[RUN] Testing bm_cq"
$(Q) $(BINDIR)/$(CONFIG)/bm_cq || ( echo test bm_cq failed ; exit 1 )
$(E) "[RUN] Testing bm_fullstack"
@@ -3267,6 +3272,7 @@ LIBGRPC_TEST_UTIL_SRC = \
test/core/util/port_uv.c \
test/core/util/port_windows.c \
test/core/util/slice_splitter.c \
+ test/core/util/trickle_endpoint.c \
src/core/lib/channel/channel_args.c \
src/core/lib/channel/channel_stack.c \
src/core/lib/channel/channel_stack_builder.c \
@@ -3467,6 +3473,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
test/core/util/port_uv.c \
test/core/util/port_windows.c \
test/core/util/slice_splitter.c \
+ test/core/util/trickle_endpoint.c \
PUBLIC_HEADERS_C += \
@@ -12394,6 +12401,49 @@ endif
endif
+BM_CLOSURE_SRC = \
+ test/cpp/microbenchmarks/bm_closure.cc \
+
+BM_CLOSURE_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BM_CLOSURE_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/bm_closure: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/bm_closure: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/bm_closure: $(PROTOBUF_DEP) $(BM_CLOSURE_OBJS) $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(BM_CLOSURE_OBJS) $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/bm_closure
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/bm_closure.o: $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_bm_closure: $(BM_CLOSURE_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(BM_CLOSURE_OBJS:.o=.dep)
+endif
+endif
+
+
BM_CQ_SRC = \
test/cpp/microbenchmarks/bm_cq.cc \
diff --git a/build.yaml b/build.yaml
index 811aab4e70..5eb615f74a 100644
--- a/build.yaml
+++ b/build.yaml
@@ -596,6 +596,7 @@ filegroups:
- test/core/util/port.h
- test/core/util/port_server_client.h
- test/core/util/slice_splitter.h
+ - test/core/util/trickle_endpoint.h
src:
- test/core/end2end/cq_verifier.c
- test/core/end2end/fake_resolver.c
@@ -613,6 +614,7 @@ filegroups:
- test/core/util/port_uv.c
- test/core/util/port_windows.c
- test/core/util/slice_splitter.c
+ - test/core/util/trickle_endpoint.c
deps:
- grpc
- gpr_test_util
@@ -2985,6 +2987,23 @@ targets:
- grpc
- gpr_test_util
- gpr
+- name: bm_closure
+ build: test
+ language: c++
+ src:
+ - test/cpp/microbenchmarks/bm_closure.cc
+ deps:
+ - benchmark
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
+ args:
+ - --benchmark_min_time=0
+ platforms:
+ - mac
+ - linux
+ - posix
- name: bm_cq
build: test
language: c++
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h
index a29dc82106..44137798c0 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.h
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.h
@@ -87,7 +87,7 @@ extern const grpc_chttp2_setting_parameters
grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS];
/* Create a settings frame by diffing old & new, and updating old to be new */
-grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new,
+grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *newval,
uint32_t force_mask, size_t count);
/* Create an ack settings frame */
grpc_slice grpc_chttp2_settings_ack_create(void);
diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c
new file mode 100644
index 0000000000..7ab0488a66
--- /dev/null
+++ b/test/core/util/trickle_endpoint.c
@@ -0,0 +1,196 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/util/passthru_endpoint.h"
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/sockaddr.h"
+
+#include "src/core/lib/slice/slice_internal.h"
+
+typedef struct {
+ grpc_endpoint base;
+ double bytes_per_second;
+ grpc_endpoint *wrapped;
+ gpr_timespec last_write;
+
+ gpr_mu mu;
+ grpc_slice_buffer write_buffer;
+ grpc_slice_buffer writing_buffer;
+ grpc_error *error;
+ bool writing;
+} trickle_endpoint;
+
+static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_slice_buffer *slices, grpc_closure *cb) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb);
+}
+
+static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_slice_buffer *slices, grpc_closure *cb) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ for (size_t i = 0; i < slices->count; i++) {
+ grpc_slice_ref_internal(slices->slices[i]);
+ }
+ gpr_mu_lock(&te->mu);
+ if (te->write_buffer.length == 0) {
+ te->last_write = gpr_now(GPR_CLOCK_MONOTONIC);
+ }
+ grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
+ grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error));
+ gpr_mu_unlock(&te->mu);
+}
+
+static grpc_workqueue *te_get_workqueue(grpc_endpoint *ep) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ return grpc_endpoint_get_workqueue(te->wrapped);
+}
+
+static void te_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_pollset *pollset) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ grpc_endpoint_add_to_pollset(exec_ctx, te->wrapped, pollset);
+}
+
+static void te_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set);
+}
+
+static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_error *why) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ gpr_mu_lock(&te->mu);
+ if (te->error == GRPC_ERROR_NONE) {
+ te->error = GRPC_ERROR_REF(why);
+ }
+ gpr_mu_unlock(&te->mu);
+ grpc_endpoint_shutdown(exec_ctx, te->wrapped, why);
+}
+
+static void te_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ grpc_endpoint_destroy(exec_ctx, te->wrapped);
+ gpr_mu_destroy(&te->mu);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &te->write_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &te->writing_buffer);
+ GRPC_ERROR_UNREF(te->error);
+ gpr_free(te);
+}
+
+static grpc_resource_user *te_get_resource_user(grpc_endpoint *ep) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ return grpc_endpoint_get_resource_user(te->wrapped);
+}
+
+static char *te_get_peer(grpc_endpoint *ep) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ return grpc_endpoint_get_peer(te->wrapped);
+}
+
+static int te_get_fd(grpc_endpoint *ep) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ return grpc_endpoint_get_fd(te->wrapped);
+}
+
+static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ trickle_endpoint *te = arg;
+ gpr_mu_lock(&te->mu);
+ te->writing = false;
+ grpc_slice_buffer_reset_and_unref(&te->writing_buffer);
+ gpr_mu_unlock(&te->mu);
+}
+
+static const grpc_endpoint_vtable vtable = {te_read,
+ te_write,
+ te_get_workqueue,
+ te_add_to_pollset,
+ te_add_to_pollset_set,
+ te_shutdown,
+ te_destroy,
+ te_get_resource_user,
+ te_get_peer,
+ te_get_fd};
+
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+ double bytes_per_second) {
+ trickle_endpoint *te = gpr_malloc(sizeof(*te));
+ te->base.vtable = &vtable;
+ te->wrapped = wrap;
+ te->bytes_per_second = bytes_per_second;
+ gpr_mu_init(&te->mu);
+ grpc_slice_buffer_init(&te->write_buffer);
+ grpc_slice_buffer_init(&te->writing_buffer);
+ te->error = GRPC_ERROR_NONE;
+ te->writing = false;
+ return &te->base;
+}
+
+static double ts2dbl(gpr_timespec s) {
+ return (double)s.tv_sec + 1e-9 * (double)s.tv_nsec;
+}
+
+size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ gpr_mu_lock(&te->mu);
+ if (!te->writing && te->write_buffer.length > 0) {
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ double elapsed = ts2dbl(gpr_time_sub(now, te->last_write));
+ size_t bytes = (size_t)(te->bytes_per_second * elapsed);
+ // gpr_log(GPR_DEBUG, "%lf elapsed --> %" PRIdPTR " bytes", elapsed, bytes);
+ if (bytes > 0) {
+ grpc_slice_buffer_move_first(&te->write_buffer,
+ GPR_MIN(bytes, te->write_buffer.length),
+ &te->writing_buffer);
+ te->writing = true;
+ te->last_write = now;
+ grpc_endpoint_write(
+ exec_ctx, te->wrapped, &te->writing_buffer,
+ grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+ }
+ }
+ size_t backlog = te->write_buffer.length;
+ gpr_mu_unlock(&te->mu);
+ return backlog;
+}
diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h
new file mode 100644
index 0000000000..7e8d9d91e3
--- /dev/null
+++ b/test/core/util/trickle_endpoint.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef TRICKLE_ENDPOINT_H
+#define TRICKLE_ENDPOINT_H
+
+#include "src/core/lib/iomgr/endpoint.h"
+
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+ double bytes_per_second);
+
+/* Allow up to \a bytes through the endpoint. Returns the new backlog. */
+size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *endpoint);
+
+#endif
diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc
new file mode 100644
index 0000000000..80d6610e13
--- /dev/null
+++ b/test/cpp/microbenchmarks/bm_closure.cc
@@ -0,0 +1,356 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Test various closure related operations */
+
+#include <grpc/grpc.h>
+
+extern "C" {
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+}
+
+#include "third_party/benchmark/include/benchmark/benchmark.h"
+
+static class InitializeStuff {
+ public:
+ InitializeStuff() { grpc_init(); }
+ ~InitializeStuff() { grpc_shutdown(); }
+} initialize_stuff;
+
+static void BM_NoOpExecCtx(benchmark::State& state) {
+ while (state.KeepRunning()) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+}
+BENCHMARK(BM_NoOpExecCtx);
+
+static void BM_WellFlushed(benchmark::State& state) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_WellFlushed);
+
+static void DoNothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}
+
+static void BM_ClosureInitAgainstExecCtx(benchmark::State& state) {
+ grpc_closure c;
+ while (state.KeepRunning()) {
+ benchmark::DoNotOptimize(
+ grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx));
+ }
+}
+BENCHMARK(BM_ClosureInitAgainstExecCtx);
+
+static void BM_ClosureInitAgainstCombiner(benchmark::State& state) {
+ grpc_combiner* combiner = grpc_combiner_create(NULL);
+ grpc_closure c;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ benchmark::DoNotOptimize(grpc_closure_init(
+ &c, DoNothing, NULL, grpc_combiner_scheduler(combiner, false)));
+ }
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureInitAgainstCombiner);
+
+static void BM_ClosureRunOnExecCtx(benchmark::State& state) {
+ grpc_closure c;
+ grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_run(&exec_ctx, &c, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureRunOnExecCtx);
+
+static void BM_ClosureCreateAndRun(benchmark::State& state) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_run(&exec_ctx, grpc_closure_create(DoNothing, NULL,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureCreateAndRun);
+
+static void BM_ClosureInitAndRun(benchmark::State& state) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_closure c;
+ while (state.KeepRunning()) {
+ grpc_closure_run(&exec_ctx, grpc_closure_init(&c, DoNothing, NULL,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureInitAndRun);
+
+static void BM_ClosureSchedOnExecCtx(benchmark::State& state) {
+ grpc_closure c;
+ grpc_closure_init(&c, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSchedOnExecCtx);
+
+static void BM_ClosureSched2OnExecCtx(benchmark::State& state) {
+ grpc_closure c1;
+ grpc_closure c2;
+ grpc_closure_init(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&c2, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSched2OnExecCtx);
+
+static void BM_ClosureSched3OnExecCtx(benchmark::State& state) {
+ grpc_closure c1;
+ grpc_closure c2;
+ grpc_closure c3;
+ grpc_closure_init(&c1, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&c2, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&c3, DoNothing, NULL, grpc_schedule_on_exec_ctx);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c3, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSched3OnExecCtx);
+
+static void BM_AcquireMutex(benchmark::State& state) {
+ // for comparison with the combiner stuff below
+ gpr_mu mu;
+ gpr_mu_init(&mu);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ gpr_mu_lock(&mu);
+ DoNothing(&exec_ctx, NULL, GRPC_ERROR_NONE);
+ gpr_mu_unlock(&mu);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_AcquireMutex);
+
+static void BM_ClosureSchedOnCombiner(benchmark::State& state) {
+ grpc_combiner* combiner = grpc_combiner_create(NULL);
+ grpc_closure c;
+ grpc_closure_init(&c, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner, false));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSchedOnCombiner);
+
+static void BM_ClosureSched2OnCombiner(benchmark::State& state) {
+ grpc_combiner* combiner = grpc_combiner_create(NULL);
+ grpc_closure c1;
+ grpc_closure c2;
+ grpc_closure_init(&c1, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner, false));
+ grpc_closure_init(&c2, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner, false));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSched2OnCombiner);
+
+static void BM_ClosureSched3OnCombiner(benchmark::State& state) {
+ grpc_combiner* combiner = grpc_combiner_create(NULL);
+ grpc_closure c1;
+ grpc_closure c2;
+ grpc_closure c3;
+ grpc_closure_init(&c1, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner, false));
+ grpc_closure_init(&c2, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner, false));
+ grpc_closure_init(&c3, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner, false));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c3, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSched3OnCombiner);
+
+static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) {
+ grpc_combiner* combiner1 = grpc_combiner_create(NULL);
+ grpc_combiner* combiner2 = grpc_combiner_create(NULL);
+ grpc_closure c1;
+ grpc_closure c2;
+ grpc_closure_init(&c1, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner1, false));
+ grpc_closure_init(&c2, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner2, false));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner1, "finished");
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner2, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSched2OnTwoCombiners);
+
+static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) {
+ grpc_combiner* combiner1 = grpc_combiner_create(NULL);
+ grpc_combiner* combiner2 = grpc_combiner_create(NULL);
+ grpc_closure c1;
+ grpc_closure c2;
+ grpc_closure c3;
+ grpc_closure c4;
+ grpc_closure_init(&c1, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner1, false));
+ grpc_closure_init(&c2, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner2, false));
+ grpc_closure_init(&c3, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner1, false));
+ grpc_closure_init(&c4, DoNothing, NULL,
+ grpc_combiner_scheduler(combiner2, false));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (state.KeepRunning()) {
+ grpc_closure_sched(&exec_ctx, &c1, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c2, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c3, GRPC_ERROR_NONE);
+ grpc_closure_sched(&exec_ctx, &c4, GRPC_ERROR_NONE);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner1, "finished");
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner2, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureSched4OnTwoCombiners);
+
+// Helper that continuously reschedules the same closure against something until
+// the benchmark is complete
+class Rescheduler {
+ public:
+ Rescheduler(benchmark::State& state, grpc_closure_scheduler* scheduler)
+ : state_(state) {
+ grpc_closure_init(&closure_, Step, this, scheduler);
+ }
+
+ void ScheduleFirst(grpc_exec_ctx* exec_ctx) {
+ grpc_closure_sched(exec_ctx, &closure_, GRPC_ERROR_NONE);
+ }
+
+ void ScheduleFirstAgainstDifferentScheduler(
+ grpc_exec_ctx* exec_ctx, grpc_closure_scheduler* scheduler) {
+ grpc_closure_sched(exec_ctx, grpc_closure_create(Step, this, scheduler),
+ GRPC_ERROR_NONE);
+ }
+
+ private:
+ benchmark::State& state_;
+ grpc_closure closure_;
+
+ static void Step(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ Rescheduler* self = static_cast<Rescheduler*>(arg);
+ if (self->state_.KeepRunning()) {
+ grpc_closure_sched(exec_ctx, &self->closure_, GRPC_ERROR_NONE);
+ }
+ }
+};
+
+static void BM_ClosureReschedOnExecCtx(benchmark::State& state) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ Rescheduler r(state, grpc_schedule_on_exec_ctx);
+ r.ScheduleFirst(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureReschedOnExecCtx);
+
+static void BM_ClosureReschedOnCombiner(benchmark::State& state) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_combiner* combiner = grpc_combiner_create(NULL);
+ Rescheduler r(state, grpc_combiner_scheduler(combiner, false));
+ r.ScheduleFirst(&exec_ctx);
+ grpc_exec_ctx_flush(&exec_ctx);
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureReschedOnCombiner);
+
+static void BM_ClosureReschedOnCombinerFinally(benchmark::State& state) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_combiner* combiner = grpc_combiner_create(NULL);
+ Rescheduler r(state, grpc_combiner_finally_scheduler(combiner, false));
+ r.ScheduleFirstAgainstDifferentScheduler(
+ &exec_ctx, grpc_combiner_scheduler(combiner, false));
+ grpc_exec_ctx_flush(&exec_ctx);
+ GRPC_COMBINER_UNREF(&exec_ctx, combiner, "finished");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+BENCHMARK(BM_ClosureReschedOnCombinerFinally);
+
+BENCHMARK_MAIN();
diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc
index 9d883e68d7..5c85674611 100644
--- a/test/cpp/microbenchmarks/bm_fullstack.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack.cc
@@ -46,6 +46,7 @@
extern "C" {
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/endpoint_pair.h"
@@ -57,6 +58,7 @@ extern "C" {
#include "test/core/util/memory_counters.h"
#include "test/core/util/passthru_endpoint.h"
#include "test/core/util/port.h"
+#include "test/core/util/trickle_endpoint.h"
}
#include "src/core/lib/profiling/timers.h"
#include "src/cpp/client/create_channel_internal.h"
@@ -197,7 +199,8 @@ class UDS : public FullstackFixture {
class EndpointPairFixture : public BaseFixture {
public:
- EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) {
+ EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints)
+ : endpoint_pair_(endpoints) {
ServerBuilder b;
cq_ = b.AddCompletionQueue(true);
b.RegisterService(service);
@@ -210,7 +213,7 @@ class EndpointPairFixture : public BaseFixture {
{
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
- grpc_transport* transport = grpc_create_chttp2_transport(
+ server_transport_ = grpc_create_chttp2_transport(
&exec_ctx, server_args, endpoints.server, 0 /* is_client */);
grpc_pollset** pollsets;
@@ -221,9 +224,9 @@ class EndpointPairFixture : public BaseFixture {
grpc_endpoint_add_to_pollset(&exec_ctx, endpoints.server, pollsets[i]);
}
- grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport,
- NULL, server_args);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
+ grpc_server_setup_transport(&exec_ctx, server_->c_server(),
+ server_transport_, NULL, server_args);
+ grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, NULL);
}
/* create channel */
@@ -233,12 +236,13 @@ class EndpointPairFixture : public BaseFixture {
ApplyCommonChannelArguments(&args);
grpc_channel_args c_args = args.c_channel_args();
- grpc_transport* transport =
+ client_transport_ =
grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
- GPR_ASSERT(transport);
- grpc_channel* channel = grpc_channel_create(
- &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
+ GPR_ASSERT(client_transport_);
+ grpc_channel* channel =
+ grpc_channel_create(&exec_ctx, "target", &c_args,
+ GRPC_CLIENT_DIRECT_CHANNEL, client_transport_);
+ grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, NULL);
channel_ = CreateChannelInternal("", channel);
}
@@ -258,6 +262,11 @@ class EndpointPairFixture : public BaseFixture {
ServerCompletionQueue* cq() { return cq_.get(); }
std::shared_ptr<Channel> channel() { return channel_; }
+ protected:
+ grpc_endpoint_pair endpoint_pair_;
+ grpc_transport* client_transport_;
+ grpc_transport* server_transport_;
+
private:
std::unique_ptr<Server> server_;
std::unique_ptr<ServerCompletionQueue> cq_;
@@ -295,6 +304,75 @@ class InProcessCHTTP2 : public EndpointPairFixture {
}
};
+class TrickledCHTTP2 : public EndpointPairFixture {
+ public:
+ TrickledCHTTP2(Service* service, size_t megabits_per_second)
+ : EndpointPairFixture(service, MakeEndpoints(megabits_per_second)) {}
+
+ void AddToLabel(std::ostream& out, benchmark::State& state) {
+ out << " writes/iter:"
+ << ((double)stats_.num_writes / (double)state.iterations())
+ << " cli_transport_stalls/iter:"
+ << ((double)
+ client_stats_.streams_stalled_due_to_transport_flow_control /
+ (double)state.iterations())
+ << " cli_stream_stalls/iter:"
+ << ((double)client_stats_.streams_stalled_due_to_stream_flow_control /
+ (double)state.iterations())
+ << " svr_transport_stalls/iter:"
+ << ((double)
+ server_stats_.streams_stalled_due_to_transport_flow_control /
+ (double)state.iterations())
+ << " svr_stream_stalls/iter:"
+ << ((double)server_stats_.streams_stalled_due_to_stream_flow_control /
+ (double)state.iterations());
+ }
+
+ void Step() {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ size_t client_backlog =
+ grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client);
+ size_t server_backlog =
+ grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server);
+ grpc_exec_ctx_finish(&exec_ctx);
+
+ UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_,
+ client_backlog);
+ UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_,
+ server_backlog);
+ }
+
+ private:
+ grpc_passthru_endpoint_stats stats_;
+ struct Stats {
+ int streams_stalled_due_to_stream_flow_control = 0;
+ int streams_stalled_due_to_transport_flow_control = 0;
+ };
+ Stats client_stats_;
+ Stats server_stats_;
+
+ grpc_endpoint_pair MakeEndpoints(size_t kilobits) {
+ grpc_endpoint_pair p;
+ grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
+ &stats_);
+ double bytes_per_second = 125.0 * kilobits;
+ p.client = grpc_trickle_endpoint_create(p.client, bytes_per_second);
+ p.server = grpc_trickle_endpoint_create(p.server, bytes_per_second);
+ return p;
+ }
+
+ void UpdateStats(grpc_chttp2_transport* t, Stats* s, size_t backlog) {
+ if (backlog == 0) {
+ if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != NULL) {
+ s->streams_stalled_due_to_stream_flow_control++;
+ }
+ if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != NULL) {
+ s->streams_stalled_due_to_transport_flow_control++;
+ }
+ }
+ }
+};
+
/*******************************************************************************
* CONTEXT MUTATORS
*/
@@ -777,6 +855,81 @@ static void BM_PumpStreamServerToClient(benchmark::State& state) {
state.SetBytesProcessed(state.range(0) * state.iterations());
}
+static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
+ while (true) {
+ switch (fixture->cq()->AsyncNext(
+ t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_micros(100, GPR_TIMESPAN)))) {
+ case CompletionQueue::TIMEOUT:
+ fixture->Step();
+ break;
+ case CompletionQueue::SHUTDOWN:
+ GPR_ASSERT(false);
+ break;
+ case CompletionQueue::GOT_EVENT:
+ return;
+ }
+ }
+}
+
+static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<TrickledCHTTP2> fixture(
+ new TrickledCHTTP2(&service, state.range(1)));
+ {
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_response.set_message(std::string(state.range(0), 'a'));
+ }
+ Status recv_status;
+ ServerContext svr_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
+ service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
+ fixture->cq(), tag(0));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ ClientContext cli_ctx;
+ auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
+ int need_tags = (1 << 0) | (1 << 1);
+ void* t;
+ bool ok;
+ while (need_tags) {
+ TrickleCQNext(fixture.get(), &t, &ok);
+ GPR_ASSERT(ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ request_rw->Read(&recv_response, tag(0));
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ response_rw.Write(send_response, tag(1));
+ while (true) {
+ TrickleCQNext(fixture.get(), &t, &ok);
+ if (t == tag(0)) {
+ request_rw->Read(&recv_response, tag(0));
+ } else if (t == tag(1)) {
+ break;
+ } else {
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ response_rw.Finish(Status::OK, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ TrickleCQNext(fixture.get(), &t, &ok);
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ state.SetBytesProcessed(state.range(0) * state.iterations());
+}
+
/*******************************************************************************
* CONFIGURATIONS
*/
@@ -866,6 +1019,19 @@ BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair)
BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
->Range(0, 128 * 1024 * 1024);
+static void TrickleArgs(benchmark::internal::Benchmark* b) {
+ for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
+ for (int j = 1; j <= 128 * 1024 * 1024; j *= 8) {
+ double expected_time =
+ static_cast<double>(14 + i) / (125.0 * static_cast<double>(j));
+ if (expected_time > 0.01) continue;
+ b->Args({i, j});
+ }
+ }
+}
+
+BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(TrickleArgs);
+
// Generate Args for StreamingPingPong benchmarks. Currently generates args for
// only "small streams" (i.e streams with 0, 1 or 2 messages)
static void StreamingPingPongArgs(benchmark::internal::Benchmark* b) {
diff --git a/tools/profiling/microbenchmarks/bm2bq.py b/tools/profiling/microbenchmarks/bm2bq.py
index 124dbdfec5..a7d82269f5 100755
--- a/tools/profiling/microbenchmarks/bm2bq.py
+++ b/tools/profiling/microbenchmarks/bm2bq.py
@@ -61,6 +61,11 @@ columns = [
('allocs_per_iteration', 'float'),
('locks_per_iteration', 'float'),
('writes_per_iteration', 'float'),
+ ('bandwidth_kilobits', 'integer'),
+ ('cli_transport_stalls_per_iteration', 'float'),
+ ('cli_stream_stalls_per_iteration', 'float'),
+ ('svr_transport_stalls_per_iteration', 'float'),
+ ('svr_stream_stalls_per_iteration', 'float'),
]
if sys.argv[1] == '--schema':
@@ -92,7 +97,11 @@ bm_specs = {
'BM_StreamingPingPongMsgs': {
'tpl': ['fixture', 'client_mutator', 'server_mutator'],
'dyn': ['request_size'],
- }
+ },
+ 'BM_PumpStreamServerToClient_Trickle': {
+ 'tpl': [],
+ 'dyn': ['request_size', 'bandwidth_kilobits'],
+ },
}
def numericalize(s):
@@ -106,6 +115,8 @@ def numericalize(s):
assert 'not a number: %s' % s
def parse_name(name):
+ if '<' not in name and '/' not in name and name not in bm_specs:
+ return {'name': name}
rest = name
out = {}
tpl_args = []
@@ -136,7 +147,7 @@ def parse_name(name):
rest = s[0]
dyn_args = s[1:]
name = rest
- assert name in bm_specs
+ assert name in bm_specs, 'bm_specs needs to be expanded for %s' % name
assert len(dyn_args) == len(bm_specs[name]['dyn'])
assert len(tpl_args) == len(bm_specs[name]['tpl'])
out['name'] = name
@@ -146,10 +157,13 @@ def parse_name(name):
for bm in js['benchmarks']:
context = js['context']
- labels_list = [s.split(':') for s in bm.get('label', '').split(' ')]
- for el in labels_list:
- el[0] = el[0].replace('/iter', '_per_iteration')
- labels = dict(labels_list)
+ if 'label' in bm:
+ labels_list = [s.split(':') for s in bm['label'].split(' ')]
+ for el in labels_list:
+ el[0] = el[0].replace('/iter', '_per_iteration')
+ labels = dict(labels_list)
+ else:
+ labels = {}
row = {
'jenkins_build': os.environ.get('BUILD_NUMBER', ''),
'jenkins_job': os.environ.get('JOB_NAME', ''),
@@ -158,5 +172,6 @@ for bm in js['benchmarks']:
row.update(bm)
row.update(parse_name(row['name']))
row.update(labels)
- del row['label']
+ if 'label' in row:
+ del row['label']
writer.writerow(row)
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 9c04d04d3f..d71f5735ec 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2369,6 +2369,24 @@
"gpr",
"gpr_test_util",
"grpc",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "bm_closure",
+ "src": [
+ "test/cpp/microbenchmarks/bm_closure.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
+ "benchmark",
+ "gpr",
+ "gpr_test_util",
+ "grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util"
@@ -7799,7 +7817,8 @@
"test/core/util/passthru_endpoint.h",
"test/core/util/port.h",
"test/core/util/port_server_client.h",
- "test/core/util/slice_splitter.h"
+ "test/core/util/slice_splitter.h",
+ "test/core/util/trickle_endpoint.h"
],
"is_filegroup": true,
"language": "c",
@@ -7834,7 +7853,9 @@
"test/core/util/port_uv.c",
"test/core/util/port_windows.c",
"test/core/util/slice_splitter.c",
- "test/core/util/slice_splitter.h"
+ "test/core/util/slice_splitter.h",
+ "test/core/util/trickle_endpoint.c",
+ "test/core/util/trickle_endpoint.h"
],
"third_party": false,
"type": "filegroup"
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 4aa8917e51..0c3f98c459 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -2484,6 +2484,28 @@
"flaky": false,
"gtest": false,
"language": "c++",
+ "name": "bm_closure",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ]
+ },
+ {
+ "args": [
+ "--benchmark_min_time=0"
+ ],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c++",
"name": "bm_cq",
"platforms": [
"linux",
diff --git a/tools/run_tests/run_microbenchmark.py b/tools/run_tests/run_microbenchmark.py
index 6bba29d74b..3bb4a9547c 100755
--- a/tools/run_tests/run_microbenchmark.py
+++ b/tools/run_tests/run_microbenchmark.py
@@ -149,9 +149,12 @@ def collect_summary(bm_name, args):
subprocess.check_call(
['make', bm_name,
'CONFIG=counters', '-j', '%d' % multiprocessing.cpu_count()])
- text(subprocess.check_output(['bins/counters/%s' % bm_name,
- '--benchmark_out=out.json',
- '--benchmark_out_format=json']))
+ cmd = ['bins/counters/%s' % bm_name,
+ '--benchmark_out=out.json',
+ '--benchmark_out_format=json']
+ if args.summary_time is not None:
+ cmd += ['--benchmark_min_time=%d' % args.summary_time]
+ text(subprocess.check_output(cmd))
if args.bigquery_upload:
with open('out.csv', 'w') as f:
f.write(subprocess.check_output(['tools/profiling/microbenchmarks/bm2bq.py', 'out.json']))
@@ -179,6 +182,10 @@ argp.add_argument('--bigquery_upload',
action='store_const',
const=True,
help='Upload results from summary collection to bigquery')
+argp.add_argument('--summary_time',
+ default=None,
+ type=int,
+ help='Minimum time to run benchmarks for the summary collection')
args = argp.parse_args()
for bm_name in args.benchmarks:
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
index 57cbb2c496..504a3fc927 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
@@ -193,6 +193,7 @@
<ClInclude Include="$(SolutionDir)\..\test\core\util\port.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\util\port_server_client.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\util\slice_splitter.h" />
+ <ClInclude Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\channel_args.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\channel_stack.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\channel_stack_builder.h" />
@@ -343,6 +344,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\util\slice_splitter.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\channel_args.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\channel_stack.c">
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
index 620649a06c..8e51a641f1 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
@@ -64,6 +64,9 @@
<ClCompile Include="$(SolutionDir)\..\test\core\util\slice_splitter.c">
<Filter>test\core\util</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.c">
+ <Filter>test\core\util</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\channel_args.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@@ -554,6 +557,9 @@
<ClInclude Include="$(SolutionDir)\..\test\core\util\slice_splitter.h">
<Filter>test\core\util</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.h">
+ <Filter>test\core\util</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\channel_args.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj
index daf92305c4..1ea64654e5 100644
--- a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj
@@ -161,6 +161,7 @@
<ClInclude Include="$(SolutionDir)\..\test\core\util\port.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\util\port_server_client.h" />
<ClInclude Include="$(SolutionDir)\..\test\core\util\slice_splitter.h" />
+ <ClInclude Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.c">
@@ -195,6 +196,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\util\slice_splitter.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.c">
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
diff --git a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters
index c9a1b4c10d..e2ad88c96e 100644
--- a/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_test_util_unsecure/grpc_test_util_unsecure.vcxproj.filters
@@ -49,6 +49,9 @@
<ClCompile Include="$(SolutionDir)\..\test\core\util\slice_splitter.c">
<Filter>test\core\util</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.c">
+ <Filter>test\core\util</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="$(SolutionDir)\..\test\core\end2end\cq_verifier.h">
@@ -93,6 +96,9 @@
<ClInclude Include="$(SolutionDir)\..\test\core\util\slice_splitter.h">
<Filter>test\core\util</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\test\core\util\trickle_endpoint.h">
+ <Filter>test\core\util</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>